add cluster/hosts configurations to build manager: allows running jobs in parallel on distinct hardware;
authorFabian Huch <huch@in.tum.de>
Fri, 07 Jun 2024 15:47:19 +0200
changeset 80281 17d2f775907a
parent 80280 7987b33fb6c5
child 80282 3c3a9154c107
add cluster/hosts configurations to build manager: allows running jobs in parallel on distinct hardware;
etc/options
src/Pure/Admin/ci_build.scala
src/Pure/Build/build_manager.scala
--- a/etc/options	Fri Jun 07 15:04:07 2024 +0200
+++ b/etc/options	Fri Jun 07 15:47:19 2024 +0200
@@ -243,6 +243,8 @@
 option build_manager_identifier : string = "build_manager"
   -- "isabelle identifier for build manager processes"
 
+option build_manager_cluster : string = "cluster.default"
+
 option build_manager_delay : real = 1.0
   -- "delay build manager loop"
 
--- a/src/Pure/Admin/ci_build.scala	Fri Jun 07 15:04:07 2024 +0200
+++ b/src/Pure/Admin/ci_build.scala	Fri Jun 07 15:47:19 2024 +0200
@@ -7,6 +7,8 @@
 package isabelle
 
 
+import scala.collection.mutable
+
 import java.time.ZoneId
 import java.time.format.DateTimeFormatter
 import java.util.{Properties => JProperties, Map => JMap}
@@ -23,21 +25,6 @@
   }
 
 
-  /* executor profile */
-
-  case class Profile(threads: Int, jobs: Int, numa: Boolean)
-
-  object Profile {
-    def from_host: Profile = {
-      Isabelle_System.hostname() match {
-        case "hpcisabelle" => Profile(8, 8, numa = true)
-        case "lxcisa1" => Profile(4, 10, numa = false)
-        case _ => Profile(2, 2, numa = false)
-      }
-    }
-  }
-
-
   /* build config */
 
   case class Build_Config(
@@ -62,8 +49,31 @@
       password = options.string("ci_mail_password"))
   }
 
+
   /* ci build jobs */
 
+  sealed trait Hosts {
+    def hosts_spec: String
+    def max_jobs: Option[Int]
+    def prefs: List[Options.Spec]
+    def numa_shuffling: Boolean
+    def build_cluster: Boolean
+  }
+
+  case class Local(host_spec: String, jobs: Int, threads: Int, numa_shuffling: Boolean = true)
+    extends Hosts {
+    def hosts_spec: String = host_spec
+    def max_jobs: Option[Int] = Some(jobs)
+    def prefs: List[Options.Spec] = List(Options.Spec.eq("threads", threads.toString))
+    def build_cluster: Boolean = false
+  }
+
+  case class Cluster(hosts_spec: String, numa_shuffling: Boolean = true) extends Hosts {
+    def max_jobs: Option[Int] = None
+    def prefs: List[Options.Spec] = Nil
+    def build_cluster: Boolean = true
+  }
+
   sealed trait Trigger
   case object On_Commit extends Trigger
 
@@ -76,13 +86,13 @@
           (before.time < start1.time && start1.time <= now.time)
       }
   }
-  
+
   case class Timed(in_interval: (Date, Date) => Boolean) extends Trigger
 
   sealed case class Job(
     name: String,
     description: String,
-    profile: Profile,
+    hosts: Hosts,
     config: Build_Config,
     components: List[String] = Nil,
     trigger: Trigger = On_Commit
@@ -102,7 +112,7 @@
   val timing =
     Job(
       "benchmark", "runs benchmark and timing sessions",
-      Profile(threads = 6, jobs = 1, numa = false),
+      Local("benchmark", jobs = 1, threads = 6, numa_shuffling = false),
       Build_Config(
         documents = false,
         select = List(
@@ -161,8 +171,8 @@
   def print_section(title: String): Unit =
     println("\n=== " + title + " ===\n")
 
-  def ci_build(options: Options, job: Job): Unit = {
-    val profile = job.profile
+  def ci_build(options: Options, build_hosts: List[Build_Cluster.Host], job: Job): Unit = {
+    val hosts = job.hosts
     val config = job.config
 
     val isabelle_home = Path.explode(Isabelle_System.getenv_strict("ISABELLE_HOME"))
@@ -175,12 +185,9 @@
     print_section("CONFIGURATION")
     println(Build_Log.Settings.show())
 
-    val build_options =
-      with_documents(options, config).int.update("threads", profile.threads) +
-        "parallel_proofs=1" + "system_heaps"
+    val build_options = with_documents(options, config) + "parallel_proofs=1" + "system_heaps"
 
-    println(
-      "jobs = " + profile.jobs + ", threads = " + profile.threads + ", numa = " + profile.numa)
+    println(hosts)
 
     print_section("BUILD")
     println("Build started at " + formatted_time)
@@ -193,12 +200,13 @@
       val start_time = Time.now()
       val results = progress.interrupt_handler {
         Build.build(
-          build_options,
+          build_options ++ hosts.prefs,
+          build_hosts = build_hosts,
           selection = config.selection,
           progress = progress,
           clean_build = config.clean,
-          numa_shuffling = profile.numa,
-          max_jobs = Some(profile.jobs),
+          numa_shuffling = hosts.numa_shuffling,
+          max_jobs = hosts.max_jobs,
           dirs = config.include,
           select_dirs = config.select)
       }
@@ -241,17 +249,23 @@
       /* arguments */
 
       var options = Options.init()
+      val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
 
       val getopts = Getopts("""
 Usage: isabelle ci_build [OPTIONS] JOB
 
   Options are:
+    -H HOSTS     host specifications of the form NAMES:PARAMETERS (separated by commas)
     -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
 
-  Runs Isabelle builds in ci environment, with the following build jobs:
+  Runs Isabelle builds in ci environment. For cluster builds, build hosts must
+  be passed explicitly (no registry).
+
+  The following build jobs are available:
 
 """ + Library.indent_lines(4, show_jobs) + "\n",
-        "o:" -> (arg => options = options + arg))
+        "o:" -> (arg => options = options + arg),
+        "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.load(Nil), arg)))
 
       val more_args = getopts(args)
 
@@ -260,7 +274,7 @@
         case _ => getopts.usage()
       }
 
-      ci_build(options, job)
+      ci_build(options, build_hosts.toList, job)
     })
 }
 
--- a/src/Pure/Build/build_manager.scala	Fri Jun 07 15:04:07 2024 +0200
+++ b/src/Pure/Build/build_manager.scala	Fri Jun 07 15:47:19 2024 +0200
@@ -42,7 +42,10 @@
 
   case class CI_Build(name: String, components: List[Component]) extends Build_Config {
     def fresh_build: Boolean = true
-    def command(build_hosts: List[Build_Cluster.Host]): String = " ci_build " + name
+    def command(build_hosts: List[Build_Cluster.Host]): String =
+      " ci_build" +
+      build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
+      " " + name
   }
 
   object User_Build {
@@ -72,7 +75,7 @@
       " build" +
         if_proper(afp_rev, " -A:") +
         base_sessions.map(session => " -B " + Bash.string(session)).mkString +
-        if_proper(build_hosts, build_hosts.map(host => " -H " + Bash.string(host.print)).mkString) +
+        build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
         if_proper(presentation, " -P:") +
         if_proper(requirements, " -R") +
         if_proper(all_sessions, " -a") +
@@ -92,6 +95,8 @@
 
   sealed case class Task(
     build_config: Build_Config,
+    build_cluster: Boolean,
+    hosts_spec: String,
     id: UUID.T = UUID.random(),
     submit_date: Date = Date.now(),
     priority: Priority = Priority.normal,
@@ -100,6 +105,9 @@
     def name: String = id.toString
     def kind: String = build_config.name
     def components: List[Component] = build_config.components
+
+    def build_hosts: List[Build_Cluster.Host] =
+      Build_Cluster.Host.parse(Registry.global, hosts_spec)
   }
 
   sealed case class Job(
@@ -107,6 +115,8 @@
     kind: String,
     number: Long,
     isabelle_rev: String,
+    build_cluster: Boolean,
+    hostnames: List[String],
     components: List[Component],
     start_date: Date = Date.now(),
     cancelled: Boolean = false
@@ -156,12 +166,22 @@
 
     def num_builds = running.size + finished.size
 
-    def next: List[Task] =
-      if (pending.isEmpty) Nil
+    def next(build_hosts: List[Build_Cluster.Host]): Option[Task] = {
+      val cluster_running = running.values.exists(_.build_cluster)
+      val available = build_hosts.map(_.hostname).toSet - running.values.flatMap(_.hostnames).toSet
+      val ready =
+        for {
+          (_, task) <- pending
+          if !task.build_cluster || !cluster_running
+          if task.build_hosts.map(_.hostname).forall(available.contains)
+        } yield task
+
+      if (ready.isEmpty) None
       else {
-        val priority = pending.values.map(_.priority).maxBy(_.ordinal)
-        pending.values.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering)
+        val priority = ready.map(_.priority).maxBy(_.ordinal)
+        ready.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering).headOption
       }
+    }
 
     def add_running(job: Job): State = copy(running = running + (job.name -> job))
     def remove_running(name: String): State = copy(running = running - name)
@@ -255,6 +275,8 @@
 
     object Pending {
       val kind = SQL.Column.string("kind")
+      val build_cluster = SQL.Column.bool("build_cluster")
+      val hosts_spec = SQL.Column.string("hosts_spec")
       val id = SQL.Column.string("id").make_primary_key
       val submit_date = SQL.Column.date("submit_date")
       val priority = SQL.Column.string("priority")
@@ -277,10 +299,10 @@
       val verbose = SQL.Column.bool("verbose")
 
       val table =
-        make_table(List(kind, id, submit_date, priority, isabelle_rev, components, prefs,
-          requirements, all_sessions, base_sessions, exclude_session_groups, exclude_sessions,
-          session_groups, sessions, build_heap, clean_build, export_files, fresh_build,
-          presentation, verbose),
+        make_table(List(kind, build_cluster, hosts_spec, id, submit_date, priority, isabelle_rev,
+          components, prefs, requirements, all_sessions, base_sessions, exclude_session_groups,
+          exclude_sessions, session_groups, sessions, build_heap, clean_build, export_files,
+          fresh_build, presentation, verbose),
         name = "pending")
     }
 
@@ -288,6 +310,8 @@
       db.execute_query_statement(Pending.table.select(), Map.from[String, Task], get =
         { res =>
           val kind = res.string(Pending.kind)
+          val build_cluster = res.bool(Pending.build_cluster)
+          val hosts_spec = res.string(Pending.hosts_spec)
           val id = res.string(Pending.id)
           val submit_date = res.date(Pending.submit_date)
           val priority = Priority.valueOf(res.string(Pending.priority))
@@ -319,7 +343,8 @@
                 clean_build, export_files, fresh_build, presentation, verbose)
             }
 
-          val task = Task(build_config, UUID.make(id), submit_date, priority, isabelle_rev)
+          val task = Task(build_config, build_cluster, hosts_spec, UUID.make(id), submit_date,
+            priority, isabelle_rev)
 
           task.name -> task
         })
@@ -340,11 +365,13 @@
           for (name <- update.insert) yield { (stmt: SQL.Statement) =>
             val task = pending(name)
             stmt.string(1) = task.kind
-            stmt.string(2) = task.id.toString
-            stmt.date(3) = task.submit_date
-            stmt.string(4) = task.priority.toString
-            stmt.string(5) = task.isabelle_rev
-            stmt.string(6) = task.components.mkString(",")
+            stmt.bool(2) = task.build_cluster
+            stmt.string(3) = task.hosts_spec
+            stmt.string(4) = task.id.toString
+            stmt.date(5) = task.submit_date
+            stmt.string(6) = task.priority.toString
+            stmt.string(7) = task.isabelle_rev
+            stmt.string(8) = task.components.mkString(",")
 
             def get[A](f: User_Build => A): Option[A] =
               task.build_config match {
@@ -352,20 +379,20 @@
                 case _ => None
               }
 
-            stmt.string(7) = get(user_build => user_build.prefs.map(_.print).mkString(","))
-            stmt.bool(8) = get(_.requirements)
-            stmt.bool(9) = get(_.all_sessions)
-            stmt.string(10) = get(_.base_sessions.mkString(","))
-            stmt.string(11) = get(_.exclude_session_groups.mkString(","))
-            stmt.string(12) = get(_.exclude_sessions.mkString(","))
-            stmt.string(13) = get(_.session_groups.mkString(","))
-            stmt.string(14) = get(_.sessions.mkString(","))
-            stmt.bool(15) = get(_.build_heap)
-            stmt.bool(16) = get(_.clean_build)
-            stmt.bool(17) = get(_.export_files)
-            stmt.bool(18) = get(_.fresh_build)
-            stmt.bool(19) = get(_.presentation)
-            stmt.bool(20) = get(_.verbose)
+            stmt.string(9) = get(user_build => user_build.prefs.map(_.print).mkString(","))
+            stmt.bool(10) = get(_.requirements)
+            stmt.bool(11) = get(_.all_sessions)
+            stmt.string(12) = get(_.base_sessions.mkString(","))
+            stmt.string(13) = get(_.exclude_session_groups.mkString(","))
+            stmt.string(14) = get(_.exclude_sessions.mkString(","))
+            stmt.string(15) = get(_.session_groups.mkString(","))
+            stmt.string(16) = get(_.sessions.mkString(","))
+            stmt.bool(17) = get(_.build_heap)
+            stmt.bool(18) = get(_.clean_build)
+            stmt.bool(19) = get(_.export_files)
+            stmt.bool(20) = get(_.fresh_build)
+            stmt.bool(21) = get(_.presentation)
+            stmt.bool(22) = get(_.verbose)
           })
       }
 
@@ -380,12 +407,15 @@
       val kind = SQL.Column.string("kind")
       val number = SQL.Column.long("number")
       val isabelle_rev = SQL.Column.string("isabelle_rev")
+      val build_cluster = SQL.Column.bool("build_cluster")
+      val hostnames = SQL.Column.string("hostnames")
       val components = SQL.Column.string("components")
       val start_date = SQL.Column.date("start_date")
       val cancelled = SQL.Column.bool("cancelled")
 
       val table =
-        make_table(List(id, kind, number, isabelle_rev, components, start_date, cancelled),
+        make_table(List(id, kind, number, isabelle_rev, build_cluster, hostnames, components,
+          start_date, cancelled),
         name = "running")
     }
 
@@ -396,12 +426,14 @@
           val kind = res.string(Running.kind)
           val number = res.long(Running.number)
           val isabelle_rev = res.string(Running.isabelle_rev)
+          val build_cluster = res.bool(Running.build_cluster)
+          val hostnames = space_explode(',', res.string(Running.hostnames))
           val components = space_explode(',', res.string(Running.components)).map(Component.parse)
           val start_date = res.date(Running.start_date)
           val cancelled = res.bool(Running.cancelled)
 
-          val job =
-            Job(UUID.make(id), kind, number, isabelle_rev, components, start_date, cancelled)
+          val job = Job(UUID.make(id), kind, number, isabelle_rev, build_cluster, hostnames,
+            components, start_date, cancelled)
 
           job.name -> job
         })
@@ -425,9 +457,11 @@
             stmt.string(2) = job.kind
             stmt.long(3) = job.number
             stmt.string(4) = job.isabelle_rev
-            stmt.string(5) = job.components.mkString(",")
-            stmt.date(6) = job.start_date
-            stmt.bool(7) = job.cancelled
+            stmt.bool(5) = job.build_cluster
+            stmt.string(6) = job.hostnames.mkString(",")
+            stmt.string(7) = job.components.mkString(",")
+            stmt.date(8) = job.start_date
+            stmt.bool(9) = job.cancelled
           })
       }
       update
@@ -631,13 +665,13 @@
 
     private def start_next(): Option[Context] =
       synchronized_database("start_next") {
-        _state.next.headOption.flatMap { task =>
+        _state.next(build_hosts).flatMap { task =>
           progress.echo("Initializing " + task.name)
 
           _state = _state.remove_pending(task.name)
 
           val number = _state.next_number(task.kind)
-          val context = Context(store, task, number, build_hosts)
+          val context = Context(store, task, number)
 
           Exn.capture {
             context.init()
@@ -646,6 +680,8 @@
             val isabelle_rev =
               sync(isabelle_repository, task.isabelle_rev, context.task_dir)
 
+            val hostnames = task.build_hosts.map(_.hostname).distinct
+
             val components =
               for (component <- task.components)
               yield sync_dirs.find(_.name == component.name) match {
@@ -657,7 +693,7 @@
                   else error("Unknown component " + component)
               }
 
-            Job(task.id, task.kind, number, isabelle_rev, components)
+            Job(task.id, task.kind, number, isabelle_rev, task.build_cluster, hostnames, components)
           } match {
             case Exn.Res(job) =>
               _state = _state.add_running(job)
@@ -704,17 +740,18 @@
 
     def init: Runner.State = Runner.State.empty
     def loop_body(state: Runner.State): Runner.State = {
-      if (state.is_empty && !progress.stopped) {
-        start_next() match {
-          case None => state
-          case Some(context) => state.init(context)
+      val state1 =
+        if (progress.stopped) state
+        else {
+          start_next() match {
+            case None => state
+            case Some(context) => state.init(context)
+          }
         }
-      }
-      else {
-        val (state1, results) = stop_cancelled(state).update
-        results.foreach(finish_job)
-        state1
-      }
+      val state2 = stop_cancelled(state1)
+      val (state3, results) = state2.update
+      results.foreach(finish_job)
+      state3
     }
   }
 
@@ -761,7 +798,8 @@
           if isabelle_updated || ci_job.components.exists(updated_components.contains)
           if !_state.pending.values.exists(_.kind == ci_job.name)
         } {
-          val task = Task(CI_Build(ci_job), priority = Priority.low, isabelle_rev = "default")
+          val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+            priority = Priority.low, isabelle_rev = "default")
           _state = _state.add_pending(task)
         }
       }
@@ -796,7 +834,8 @@
       for (ci_job <-ci_jobs)
         ci_job.trigger match {
           case isabelle.CI_Build.Timed(in_interval) if in_interval(previous, next) =>
-            val task = Task(CI_Build(ci_job), isabelle_rev = "default")
+            val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+              isabelle_rev = "default")
             _state = _state.add_pending(task)
           case _ =>
         }
@@ -1063,21 +1102,21 @@
 
   /* context */
 
-  case class Context(
-    store: Store,
-    task: Task,
-    number: Long,
-    build_hosts: List[Build_Cluster.Host]
-  ) {
-    def name: String = task.kind + "/" + number
+  case class Context(store: Store, task: Task, number: Long) {
+    def name = task.kind + "/" + number
+    def progress: Progress = new File_Progress(store.log_file(name))
+
+    def task_dir: Path = store.task_dir(task)
+
     def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
-    def task_dir: Path = store.task_dir(task)
-    def components: List[Component] = task.build_config.components
-    def isabelle_identifier: String = store.identifier
-    def fresh_build: Boolean = task.build_config.fresh_build
-    def command: String = task.build_config.command(build_hosts)
-    def progress: Progress = new File_Progress(store.log_file(name))
-    def open_ssh(): SSH.Session = store.open_ssh()
+
+    def isabelle_identifier: String =
+      if (task.build_cluster) store.options.string("build_cluster_identifier") else store.identifier
+
+    def open_ssh(): SSH.System = {
+      if (task.build_cluster) store.open_ssh()
+      else Library.the_single(task.build_hosts).open_ssh(store.options)
+    }
   }
 
 
@@ -1087,10 +1126,14 @@
     def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
   }
 
-  class Build_Process(ssh: SSH.Session, context: Context) {
+  class Build_Process(ssh: SSH.System, context: Context) {
+    private val task = context.task
+    private val progress = context.progress
+
+
+    /* resources with cleanup operations */
+
     private val _dir = ssh.tmp_dir()
-    private val _progress = context.progress
-
     private val _isabelle =
       try {
         val rsync_context = Rsync.Context(ssh = ssh)
@@ -1099,7 +1142,7 @@
           rsync_context.target(_dir))).check
 
         Isabelle_System.rm_tree(context.task_dir)
-        Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress)
+        Other_Isabelle(_dir, context.isabelle_identifier, ssh, progress)
       }
       catch { case exn: Throwable => close(); throw exn }
 
@@ -1107,30 +1150,23 @@
       try {
         val init_components =
           for {
-            component <- context.components
+            component <- task.build_config.components
             target = _dir + Sync.DIRS + Path.basic(component.name)
             if Components.is_component_dir(target)
           } yield "init_component " + quote(target.absolute.implode)
 
         _isabelle.init(
           other_settings = _isabelle.init_components() ::: init_components,
-          fresh = context.fresh_build, echo = true)
+          fresh = task.build_config.fresh_build, echo = true)
 
-        val cmd = context.command
-        _progress.echo("isabelle" + cmd)
+        val cmd = task.build_config.command(task.build_hosts)
+        progress.echo("isabelle" + cmd)
 
         val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
         ssh.bash_process(_isabelle.bash_context(script), settings = false)
       }
       catch { case exn: Throwable => close(); throw exn }
 
-    def run(): Process_Result = {
-      val process_result =
-        _process.result(progress_stdout = _progress.echo(_), progress_stderr = _progress.echo(_))
-      close()
-      process_result
-    }
-
     def cancel(): Unit = Option(_process).foreach(_.interrupt())
 
     def close(): Unit = {
@@ -1138,6 +1174,16 @@
       Isabelle_System.rm_tree(context.task_dir)
       ssh.close()
     }
+
+
+    /* execution */
+
+    def run(): Process_Result = {
+      val process_result =
+        _process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
+      close()
+      process_result
+    }
   }
 
 
@@ -1250,10 +1296,11 @@
     val id = UUID.random()
     val afp_rev = if (afp_root.nonEmpty) Some("") else None
 
-    val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, base_sessions,
-      exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, clean_build,
-      export_files, fresh_build, presentation, verbose)
-    val task = Task(build_config, id, Date.now(), Priority.high)
+    val hosts_spec = options.string("build_manager_cluster")
+    val build_config = User_Build(afp_rev, prefs, requirements, all_sessions,
+      base_sessions, exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap,
+      clean_build, export_files, fresh_build, presentation, verbose)
+    val task = Task(build_config, true, hosts_spec, id, Date.now(), Priority.high)
 
     val dir = store.task_dir(task)
 
@@ -1315,7 +1362,7 @@
   Options are:
     -A ROOT      include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
     -D DIR       include extra component in given directory
-    -H HOSTS     additional cluster host specifications of the form
+    -H HOSTS     host specifications for all available hosts of the form
                  NAMES:PARAMETERS (separated by commas)
     -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
     -p PORT      explicit web server port