# HG changeset patch # User Fabian Huch # Date 1717768039 -7200 # Node ID 17d2f775907aba5b88384885ad3cc1c0987c528a # Parent 7987b33fb6c5b9500ebab478581e61890741c25d add cluster/hosts configurations to build manager: allows running jobs in parallel on distinct hardware; diff -r 7987b33fb6c5 -r 17d2f775907a etc/options --- a/etc/options Fri Jun 07 15:04:07 2024 +0200 +++ b/etc/options Fri Jun 07 15:47:19 2024 +0200 @@ -243,6 +243,8 @@ option build_manager_identifier : string = "build_manager" -- "isabelle identifier for build manager processes" +option build_manager_cluster : string = "cluster.default" + option build_manager_delay : real = 1.0 -- "delay build manager loop" diff -r 7987b33fb6c5 -r 17d2f775907a src/Pure/Admin/ci_build.scala --- a/src/Pure/Admin/ci_build.scala Fri Jun 07 15:04:07 2024 +0200 +++ b/src/Pure/Admin/ci_build.scala Fri Jun 07 15:47:19 2024 +0200 @@ -7,6 +7,8 @@ package isabelle +import scala.collection.mutable + import java.time.ZoneId import java.time.format.DateTimeFormatter import java.util.{Properties => JProperties, Map => JMap} @@ -23,21 +25,6 @@ } - /* executor profile */ - - case class Profile(threads: Int, jobs: Int, numa: Boolean) - - object Profile { - def from_host: Profile = { - Isabelle_System.hostname() match { - case "hpcisabelle" => Profile(8, 8, numa = true) - case "lxcisa1" => Profile(4, 10, numa = false) - case _ => Profile(2, 2, numa = false) - } - } - } - - /* build config */ case class Build_Config( @@ -62,8 +49,31 @@ password = options.string("ci_mail_password")) } + /* ci build jobs */ + sealed trait Hosts { + def hosts_spec: String + def max_jobs: Option[Int] + def prefs: List[Options.Spec] + def numa_shuffling: Boolean + def build_cluster: Boolean + } + + case class Local(host_spec: String, jobs: Int, threads: Int, numa_shuffling: Boolean = true) + extends Hosts { + def hosts_spec: String = host_spec + def max_jobs: Option[Int] = Some(jobs) + def prefs: List[Options.Spec] = List(Options.Spec.eq("threads", threads.toString)) + def build_cluster: Boolean = false + } + + case class Cluster(hosts_spec: String, numa_shuffling: Boolean = true) extends Hosts { + def max_jobs: Option[Int] = None + def prefs: List[Options.Spec] = Nil + def build_cluster: Boolean = true + } + sealed trait Trigger case object On_Commit extends Trigger @@ -76,13 +86,13 @@ (before.time < start1.time && start1.time <= now.time) } } - + case class Timed(in_interval: (Date, Date) => Boolean) extends Trigger sealed case class Job( name: String, description: String, - profile: Profile, + hosts: Hosts, config: Build_Config, components: List[String] = Nil, trigger: Trigger = On_Commit @@ -102,7 +112,7 @@ val timing = Job( "benchmark", "runs benchmark and timing sessions", - Profile(threads = 6, jobs = 1, numa = false), + Local("benchmark", jobs = 1, threads = 6, numa_shuffling = false), Build_Config( documents = false, select = List( @@ -161,8 +171,8 @@ def print_section(title: String): Unit = println("\n=== " + title + " ===\n") - def ci_build(options: Options, job: Job): Unit = { - val profile = job.profile + def ci_build(options: Options, build_hosts: List[Build_Cluster.Host], job: Job): Unit = { + val hosts = job.hosts val config = job.config val isabelle_home = Path.explode(Isabelle_System.getenv_strict("ISABELLE_HOME")) @@ -175,12 +185,9 @@ print_section("CONFIGURATION") println(Build_Log.Settings.show()) - val build_options = - with_documents(options, config).int.update("threads", profile.threads) + - "parallel_proofs=1" + "system_heaps" + val build_options = with_documents(options, config) + "parallel_proofs=1" + "system_heaps" - println( - "jobs = " + profile.jobs + ", threads = " + profile.threads + ", numa = " + profile.numa) + println(hosts) print_section("BUILD") println("Build started at " + formatted_time) @@ -193,12 +200,13 @@ val start_time = Time.now() val results = progress.interrupt_handler { Build.build( - build_options, + build_options ++ hosts.prefs, + build_hosts = build_hosts, selection = config.selection, progress = progress, clean_build = config.clean, - numa_shuffling = profile.numa, - max_jobs = Some(profile.jobs), + numa_shuffling = hosts.numa_shuffling, + max_jobs = hosts.max_jobs, dirs = config.include, select_dirs = config.select) } @@ -241,17 +249,23 @@ /* arguments */ var options = Options.init() + val build_hosts = new mutable.ListBuffer[Build_Cluster.Host] val getopts = Getopts(""" Usage: isabelle ci_build [OPTIONS] JOB Options are: + -H HOSTS host specifications of the form NAMES:PARAMETERS (separated by commas) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) - Runs Isabelle builds in ci environment, with the following build jobs: + Runs Isabelle builds in ci environment. For cluster builds, build hosts must + be passed explicitly (no registry). + + The following build jobs are available: """ + Library.indent_lines(4, show_jobs) + "\n", - "o:" -> (arg => options = options + arg)) + "o:" -> (arg => options = options + arg), + "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.load(Nil), arg))) val more_args = getopts(args) @@ -260,7 +274,7 @@ case _ => getopts.usage() } - ci_build(options, job) + ci_build(options, build_hosts.toList, job) }) } diff -r 7987b33fb6c5 -r 17d2f775907a src/Pure/Build/build_manager.scala --- a/src/Pure/Build/build_manager.scala Fri Jun 07 15:04:07 2024 +0200 +++ b/src/Pure/Build/build_manager.scala Fri Jun 07 15:47:19 2024 +0200 @@ -42,7 +42,10 @@ case class CI_Build(name: String, components: List[Component]) extends Build_Config { def fresh_build: Boolean = true - def command(build_hosts: List[Build_Cluster.Host]): String = " ci_build " + name + def command(build_hosts: List[Build_Cluster.Host]): String = + " ci_build" + + build_hosts.map(host => " -H " + Bash.string(host.print)).mkString + + " " + name } object User_Build { @@ -72,7 +75,7 @@ " build" + if_proper(afp_rev, " -A:") + base_sessions.map(session => " -B " + Bash.string(session)).mkString + - if_proper(build_hosts, build_hosts.map(host => " -H " + Bash.string(host.print)).mkString) + + build_hosts.map(host => " -H " + Bash.string(host.print)).mkString + if_proper(presentation, " -P:") + if_proper(requirements, " -R") + if_proper(all_sessions, " -a") + @@ -92,6 +95,8 @@ sealed case class Task( build_config: Build_Config, + build_cluster: Boolean, + hosts_spec: String, id: UUID.T = UUID.random(), submit_date: Date = Date.now(), priority: Priority = Priority.normal, @@ -100,6 +105,9 @@ def name: String = id.toString def kind: String = build_config.name def components: List[Component] = build_config.components + + def build_hosts: List[Build_Cluster.Host] = + Build_Cluster.Host.parse(Registry.global, hosts_spec) } sealed case class Job( @@ -107,6 +115,8 @@ kind: String, number: Long, isabelle_rev: String, + build_cluster: Boolean, + hostnames: List[String], components: List[Component], start_date: Date = Date.now(), cancelled: Boolean = false @@ -156,12 +166,22 @@ def num_builds = running.size + finished.size - def next: List[Task] = - if (pending.isEmpty) Nil + def next(build_hosts: List[Build_Cluster.Host]): Option[Task] = { + val cluster_running = running.values.exists(_.build_cluster) + val available = build_hosts.map(_.hostname).toSet - running.values.flatMap(_.hostnames).toSet + val ready = + for { + (_, task) <- pending + if !task.build_cluster || !cluster_running + if task.build_hosts.map(_.hostname).forall(available.contains) + } yield task + + if (ready.isEmpty) None else { - val priority = pending.values.map(_.priority).maxBy(_.ordinal) - pending.values.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering) + val priority = ready.map(_.priority).maxBy(_.ordinal) + ready.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering).headOption } + } def add_running(job: Job): State = copy(running = running + (job.name -> job)) def remove_running(name: String): State = copy(running = running - name) @@ -255,6 +275,8 @@ object Pending { val kind = SQL.Column.string("kind") + val build_cluster = SQL.Column.bool("build_cluster") + val hosts_spec = SQL.Column.string("hosts_spec") val id = SQL.Column.string("id").make_primary_key val submit_date = SQL.Column.date("submit_date") val priority = SQL.Column.string("priority") @@ -277,10 +299,10 @@ val verbose = SQL.Column.bool("verbose") val table = - make_table(List(kind, id, submit_date, priority, isabelle_rev, components, prefs, - requirements, all_sessions, base_sessions, exclude_session_groups, exclude_sessions, - session_groups, sessions, build_heap, clean_build, export_files, fresh_build, - presentation, verbose), + make_table(List(kind, build_cluster, hosts_spec, id, submit_date, priority, isabelle_rev, + components, prefs, requirements, all_sessions, base_sessions, exclude_session_groups, + exclude_sessions, session_groups, sessions, build_heap, clean_build, export_files, + fresh_build, presentation, verbose), name = "pending") } @@ -288,6 +310,8 @@ db.execute_query_statement(Pending.table.select(), Map.from[String, Task], get = { res => val kind = res.string(Pending.kind) + val build_cluster = res.bool(Pending.build_cluster) + val hosts_spec = res.string(Pending.hosts_spec) val id = res.string(Pending.id) val submit_date = res.date(Pending.submit_date) val priority = Priority.valueOf(res.string(Pending.priority)) @@ -319,7 +343,8 @@ clean_build, export_files, fresh_build, presentation, verbose) } - val task = Task(build_config, UUID.make(id), submit_date, priority, isabelle_rev) + val task = Task(build_config, build_cluster, hosts_spec, UUID.make(id), submit_date, + priority, isabelle_rev) task.name -> task }) @@ -340,11 +365,13 @@ for (name <- update.insert) yield { (stmt: SQL.Statement) => val task = pending(name) stmt.string(1) = task.kind - stmt.string(2) = task.id.toString - stmt.date(3) = task.submit_date - stmt.string(4) = task.priority.toString - stmt.string(5) = task.isabelle_rev - stmt.string(6) = task.components.mkString(",") + stmt.bool(2) = task.build_cluster + stmt.string(3) = task.hosts_spec + stmt.string(4) = task.id.toString + stmt.date(5) = task.submit_date + stmt.string(6) = task.priority.toString + stmt.string(7) = task.isabelle_rev + stmt.string(8) = task.components.mkString(",") def get[A](f: User_Build => A): Option[A] = task.build_config match { @@ -352,20 +379,20 @@ case _ => None } - stmt.string(7) = get(user_build => user_build.prefs.map(_.print).mkString(",")) - stmt.bool(8) = get(_.requirements) - stmt.bool(9) = get(_.all_sessions) - stmt.string(10) = get(_.base_sessions.mkString(",")) - stmt.string(11) = get(_.exclude_session_groups.mkString(",")) - stmt.string(12) = get(_.exclude_sessions.mkString(",")) - stmt.string(13) = get(_.session_groups.mkString(",")) - stmt.string(14) = get(_.sessions.mkString(",")) - stmt.bool(15) = get(_.build_heap) - stmt.bool(16) = get(_.clean_build) - stmt.bool(17) = get(_.export_files) - stmt.bool(18) = get(_.fresh_build) - stmt.bool(19) = get(_.presentation) - stmt.bool(20) = get(_.verbose) + stmt.string(9) = get(user_build => user_build.prefs.map(_.print).mkString(",")) + stmt.bool(10) = get(_.requirements) + stmt.bool(11) = get(_.all_sessions) + stmt.string(12) = get(_.base_sessions.mkString(",")) + stmt.string(13) = get(_.exclude_session_groups.mkString(",")) + stmt.string(14) = get(_.exclude_sessions.mkString(",")) + stmt.string(15) = get(_.session_groups.mkString(",")) + stmt.string(16) = get(_.sessions.mkString(",")) + stmt.bool(17) = get(_.build_heap) + stmt.bool(18) = get(_.clean_build) + stmt.bool(19) = get(_.export_files) + stmt.bool(20) = get(_.fresh_build) + stmt.bool(21) = get(_.presentation) + stmt.bool(22) = get(_.verbose) }) } @@ -380,12 +407,15 @@ val kind = SQL.Column.string("kind") val number = SQL.Column.long("number") val isabelle_rev = SQL.Column.string("isabelle_rev") + val build_cluster = SQL.Column.bool("build_cluster") + val hostnames = SQL.Column.string("hostnames") val components = SQL.Column.string("components") val start_date = SQL.Column.date("start_date") val cancelled = SQL.Column.bool("cancelled") val table = - make_table(List(id, kind, number, isabelle_rev, components, start_date, cancelled), + make_table(List(id, kind, number, isabelle_rev, build_cluster, hostnames, components, + start_date, cancelled), name = "running") } @@ -396,12 +426,14 @@ val kind = res.string(Running.kind) val number = res.long(Running.number) val isabelle_rev = res.string(Running.isabelle_rev) + val build_cluster = res.bool(Running.build_cluster) + val hostnames = space_explode(',', res.string(Running.hostnames)) val components = space_explode(',', res.string(Running.components)).map(Component.parse) val start_date = res.date(Running.start_date) val cancelled = res.bool(Running.cancelled) - val job = - Job(UUID.make(id), kind, number, isabelle_rev, components, start_date, cancelled) + val job = Job(UUID.make(id), kind, number, isabelle_rev, build_cluster, hostnames, + components, start_date, cancelled) job.name -> job }) @@ -425,9 +457,11 @@ stmt.string(2) = job.kind stmt.long(3) = job.number stmt.string(4) = job.isabelle_rev - stmt.string(5) = job.components.mkString(",") - stmt.date(6) = job.start_date - stmt.bool(7) = job.cancelled + stmt.bool(5) = job.build_cluster + stmt.string(6) = job.hostnames.mkString(",") + stmt.string(7) = job.components.mkString(",") + stmt.date(8) = job.start_date + stmt.bool(9) = job.cancelled }) } update @@ -631,13 +665,13 @@ private def start_next(): Option[Context] = synchronized_database("start_next") { - _state.next.headOption.flatMap { task => + _state.next(build_hosts).flatMap { task => progress.echo("Initializing " + task.name) _state = _state.remove_pending(task.name) val number = _state.next_number(task.kind) - val context = Context(store, task, number, build_hosts) + val context = Context(store, task, number) Exn.capture { context.init() @@ -646,6 +680,8 @@ val isabelle_rev = sync(isabelle_repository, task.isabelle_rev, context.task_dir) + val hostnames = task.build_hosts.map(_.hostname).distinct + val components = for (component <- task.components) yield sync_dirs.find(_.name == component.name) match { @@ -657,7 +693,7 @@ else error("Unknown component " + component) } - Job(task.id, task.kind, number, isabelle_rev, components) + Job(task.id, task.kind, number, isabelle_rev, task.build_cluster, hostnames, components) } match { case Exn.Res(job) => _state = _state.add_running(job) @@ -704,17 +740,18 @@ def init: Runner.State = Runner.State.empty def loop_body(state: Runner.State): Runner.State = { - if (state.is_empty && !progress.stopped) { - start_next() match { - case None => state - case Some(context) => state.init(context) + val state1 = + if (progress.stopped) state + else { + start_next() match { + case None => state + case Some(context) => state.init(context) + } } - } - else { - val (state1, results) = stop_cancelled(state).update - results.foreach(finish_job) - state1 - } + val state2 = stop_cancelled(state1) + val (state3, results) = state2.update + results.foreach(finish_job) + state3 } } @@ -761,7 +798,8 @@ if isabelle_updated || ci_job.components.exists(updated_components.contains) if !_state.pending.values.exists(_.kind == ci_job.name) } { - val task = Task(CI_Build(ci_job), priority = Priority.low, isabelle_rev = "default") + val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec, + priority = Priority.low, isabelle_rev = "default") _state = _state.add_pending(task) } } @@ -796,7 +834,8 @@ for (ci_job <-ci_jobs) ci_job.trigger match { case isabelle.CI_Build.Timed(in_interval) if in_interval(previous, next) => - val task = Task(CI_Build(ci_job), isabelle_rev = "default") + val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec, + isabelle_rev = "default") _state = _state.add_pending(task) case _ => } @@ -1063,21 +1102,21 @@ /* context */ - case class Context( - store: Store, - task: Task, - number: Long, - build_hosts: List[Build_Cluster.Host] - ) { - def name: String = task.kind + "/" + number + case class Context(store: Store, task: Task, number: Long) { + def name = task.kind + "/" + number + def progress: Progress = new File_Progress(store.log_file(name)) + + def task_dir: Path = store.task_dir(task) + def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir) - def task_dir: Path = store.task_dir(task) - def components: List[Component] = task.build_config.components - def isabelle_identifier: String = store.identifier - def fresh_build: Boolean = task.build_config.fresh_build - def command: String = task.build_config.command(build_hosts) - def progress: Progress = new File_Progress(store.log_file(name)) - def open_ssh(): SSH.Session = store.open_ssh() + + def isabelle_identifier: String = + if (task.build_cluster) store.options.string("build_cluster_identifier") else store.identifier + + def open_ssh(): SSH.System = { + if (task.build_cluster) store.open_ssh() + else Library.the_single(task.build_hosts).open_ssh(store.options) + } } @@ -1087,10 +1126,14 @@ def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context) } - class Build_Process(ssh: SSH.Session, context: Context) { + class Build_Process(ssh: SSH.System, context: Context) { + private val task = context.task + private val progress = context.progress + + + /* resources with cleanup operations */ + private val _dir = ssh.tmp_dir() - private val _progress = context.progress - private val _isabelle = try { val rsync_context = Rsync.Context(ssh = ssh) @@ -1099,7 +1142,7 @@ rsync_context.target(_dir))).check Isabelle_System.rm_tree(context.task_dir) - Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress) + Other_Isabelle(_dir, context.isabelle_identifier, ssh, progress) } catch { case exn: Throwable => close(); throw exn } @@ -1107,30 +1150,23 @@ try { val init_components = for { - component <- context.components + component <- task.build_config.components target = _dir + Sync.DIRS + Path.basic(component.name) if Components.is_component_dir(target) } yield "init_component " + quote(target.absolute.implode) _isabelle.init( other_settings = _isabelle.init_components() ::: init_components, - fresh = context.fresh_build, echo = true) + fresh = task.build_config.fresh_build, echo = true) - val cmd = context.command - _progress.echo("isabelle" + cmd) + val cmd = task.build_config.command(task.build_hosts) + progress.echo("isabelle" + cmd) val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd ssh.bash_process(_isabelle.bash_context(script), settings = false) } catch { case exn: Throwable => close(); throw exn } - def run(): Process_Result = { - val process_result = - _process.result(progress_stdout = _progress.echo(_), progress_stderr = _progress.echo(_)) - close() - process_result - } - def cancel(): Unit = Option(_process).foreach(_.interrupt()) def close(): Unit = { @@ -1138,6 +1174,16 @@ Isabelle_System.rm_tree(context.task_dir) ssh.close() } + + + /* execution */ + + def run(): Process_Result = { + val process_result = + _process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_)) + close() + process_result + } } @@ -1250,10 +1296,11 @@ val id = UUID.random() val afp_rev = if (afp_root.nonEmpty) Some("") else None - val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, base_sessions, - exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, clean_build, - export_files, fresh_build, presentation, verbose) - val task = Task(build_config, id, Date.now(), Priority.high) + val hosts_spec = options.string("build_manager_cluster") + val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, + base_sessions, exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, + clean_build, export_files, fresh_build, presentation, verbose) + val task = Task(build_config, true, hosts_spec, id, Date.now(), Priority.high) val dir = store.task_dir(task) @@ -1315,7 +1362,7 @@ Options are: -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """) -D DIR include extra component in given directory - -H HOSTS additional cluster host specifications of the form + -H HOSTS host specifications for all available hosts of the form NAMES:PARAMETERS (separated by commas) -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME) -p PORT explicit web server port