merged
authorwenzelm
Sun, 09 Jun 2024 22:40:13 +0200
changeset 80314 594356f16810
parent 80285 8678986d9af5 (diff)
parent 80313 a828e47c867c (current diff)
child 80315 a82db14570cd
child 80316 82c20eaad94a
child 80325 dca37c6479e3
merged
--- a/NEWS	Sun Jun 09 21:16:38 2024 +0200
+++ b/NEWS	Sun Jun 09 22:40:13 2024 +0200
@@ -7,6 +7,10 @@
 New in this Isabelle version
 ----------------------------
 
+* Theory "HOL.Wellfounded":
+  - Renamed lemmas. Minor INCOMPATIBILITY.
+      wfP_if_convertible_to_nat ~> wfp_if_convertible_to_nat
+      wfP_if_convertible_to_wfP ~> wfp_if_convertible_to_wfp
 
 
 New in Isabelle2024 (May 2024)
--- a/etc/options	Sun Jun 09 21:16:38 2024 +0200
+++ b/etc/options	Sun Jun 09 22:40:13 2024 +0200
@@ -243,6 +243,8 @@
 option build_manager_identifier : string = "build_manager"
   -- "isabelle identifier for build manager processes"
 
+option build_manager_cluster : string = "cluster.default"
+
 option build_manager_delay : real = 1.0
   -- "delay build manager loop"
 
--- a/src/HOL/Library/FSet.thy	Sun Jun 09 21:16:38 2024 +0200
+++ b/src/HOL/Library/FSet.thy	Sun Jun 09 22:40:13 2024 +0200
@@ -1335,7 +1335,7 @@
 subsubsection \<open>@{term fsubset}\<close>
 
 lemma wfP_pfsubset: "wfP (|\<subset>|)"
-proof (rule wfP_if_convertible_to_nat)
+proof (rule wfp_if_convertible_to_nat)
   show "\<And>x y. x |\<subset>| y \<Longrightarrow> fcard x < fcard y"
     by (rule pfsubset_fcard_mono)
 qed
--- a/src/HOL/Library/Multiset.thy	Sun Jun 09 21:16:38 2024 +0200
+++ b/src/HOL/Library/Multiset.thy	Sun Jun 09 22:40:13 2024 +0200
@@ -1548,7 +1548,7 @@
 text \<open>Well-foundedness of strict subset relation\<close>
 
 lemma wf_subset_mset_rel: "wf {(M, N :: 'a multiset). M \<subset># N}"
-  using mset_subset_size wfP_def wfP_if_convertible_to_nat by blast
+  using mset_subset_size wfP_def wfp_if_convertible_to_nat by blast
 
 lemma wfP_subset_mset[simp]: "wfP (\<subset>#)"
   by (rule wf_subset_mset_rel[to_pred])
--- a/src/HOL/Wellfounded.thy	Sun Jun 09 21:16:38 2024 +0200
+++ b/src/HOL/Wellfounded.thy	Sun Jun 09 22:40:13 2024 +0200
@@ -1065,16 +1065,16 @@
     using convertible .
 qed
 
-lemma wfP_if_convertible_to_wfP: "wfP S \<Longrightarrow> (\<And>x y. R x y \<Longrightarrow> S (f x) (f y)) \<Longrightarrow> wfP R"
+lemma wfp_if_convertible_to_wfp: "wfP S \<Longrightarrow> (\<And>x y. R x y \<Longrightarrow> S (f x) (f y)) \<Longrightarrow> wfP R"
   using wf_if_convertible_to_wf[to_pred, of S R f] by simp
 
 text \<open>Converting to @{typ nat} is a very common special case that might be found more easily by
   Sledgehammer.\<close>
 
-lemma wfP_if_convertible_to_nat:
+lemma wfp_if_convertible_to_nat:
   fixes f :: "_ \<Rightarrow> nat"
   shows "(\<And>x y. R x y \<Longrightarrow> f x < f y) \<Longrightarrow> wfP R"
-  by (rule wfP_if_convertible_to_wfP[of "(<) :: nat \<Rightarrow> nat \<Rightarrow> bool", simplified])
+  by (rule wfp_if_convertible_to_wfp[of "(<) :: nat \<Rightarrow> nat \<Rightarrow> bool", simplified])
 
 
 subsubsection \<open>Measure functions into \<^typ>\<open>nat\<close>\<close>
--- a/src/Pure/Admin/ci_build.scala	Sun Jun 09 21:16:38 2024 +0200
+++ b/src/Pure/Admin/ci_build.scala	Sun Jun 09 22:40:13 2024 +0200
@@ -7,6 +7,8 @@
 package isabelle
 
 
+import scala.collection.mutable
+
 import java.time.ZoneId
 import java.time.format.DateTimeFormatter
 import java.util.{Properties => JProperties, Map => JMap}
@@ -23,21 +25,6 @@
   }
 
 
-  /* executor profile */
-
-  case class Profile(threads: Int, jobs: Int, numa: Boolean)
-
-  object Profile {
-    def from_host: Profile = {
-      Isabelle_System.hostname() match {
-        case "hpcisabelle" => Profile(8, 8, numa = true)
-        case "lxcisa1" => Profile(4, 10, numa = false)
-        case _ => Profile(2, 2, numa = false)
-      }
-    }
-  }
-
-
   /* build config */
 
   case class Build_Config(
@@ -62,8 +49,31 @@
       password = options.string("ci_mail_password"))
   }
 
+
   /* ci build jobs */
 
+  sealed trait Hosts {
+    def hosts_spec: String
+    def max_jobs: Option[Int]
+    def prefs: List[Options.Spec]
+    def numa_shuffling: Boolean
+    def build_cluster: Boolean
+  }
+
+  case class Local(host_spec: String, jobs: Int, threads: Int, numa_shuffling: Boolean = true)
+    extends Hosts {
+    def hosts_spec: String = host_spec
+    def max_jobs: Option[Int] = Some(jobs)
+    def prefs: List[Options.Spec] = List(Options.Spec.eq("threads", threads.toString))
+    def build_cluster: Boolean = false
+  }
+
+  case class Cluster(hosts_spec: String, numa_shuffling: Boolean = true) extends Hosts {
+    def max_jobs: Option[Int] = None
+    def prefs: List[Options.Spec] = Nil
+    def build_cluster: Boolean = true
+  }
+
   sealed trait Trigger
   case object On_Commit extends Trigger
 
@@ -76,13 +86,13 @@
           (before.time < start1.time && start1.time <= now.time)
       }
   }
-  
+
   case class Timed(in_interval: (Date, Date) => Boolean) extends Trigger
 
   sealed case class Job(
     name: String,
     description: String,
-    profile: Profile,
+    hosts: Hosts,
     config: Build_Config,
     components: List[String] = Nil,
     trigger: Trigger = On_Commit
@@ -102,7 +112,7 @@
   val timing =
     Job(
       "benchmark", "runs benchmark and timing sessions",
-      Profile(threads = 6, jobs = 1, numa = false),
+      Local("benchmark", jobs = 1, threads = 6, numa_shuffling = false),
       Build_Config(
         documents = false,
         select = List(
@@ -161,8 +171,8 @@
   def print_section(title: String): Unit =
     println("\n=== " + title + " ===\n")
 
-  def ci_build(options: Options, job: Job): Unit = {
-    val profile = job.profile
+  def ci_build(options: Options, build_hosts: List[Build_Cluster.Host], job: Job): Unit = {
+    val hosts = job.hosts
     val config = job.config
 
     val isabelle_home = Path.explode(Isabelle_System.getenv_strict("ISABELLE_HOME"))
@@ -175,12 +185,9 @@
     print_section("CONFIGURATION")
     println(Build_Log.Settings.show())
 
-    val build_options =
-      with_documents(options, config).int.update("threads", profile.threads) +
-        "parallel_proofs=1" + "system_heaps"
+    val build_options = with_documents(options, config) + "parallel_proofs=1" + "system_heaps"
 
-    println(
-      "jobs = " + profile.jobs + ", threads = " + profile.threads + ", numa = " + profile.numa)
+    println(hosts)
 
     print_section("BUILD")
     println("Build started at " + formatted_time)
@@ -193,12 +200,13 @@
       val start_time = Time.now()
       val results = progress.interrupt_handler {
         Build.build(
-          build_options,
+          build_options ++ hosts.prefs,
+          build_hosts = build_hosts,
           selection = config.selection,
           progress = progress,
           clean_build = config.clean,
-          numa_shuffling = profile.numa,
-          max_jobs = Some(profile.jobs),
+          numa_shuffling = hosts.numa_shuffling,
+          max_jobs = hosts.max_jobs,
           dirs = config.include,
           select_dirs = config.select)
       }
@@ -241,17 +249,23 @@
       /* arguments */
 
       var options = Options.init()
+      val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
 
       val getopts = Getopts("""
 Usage: isabelle ci_build [OPTIONS] JOB
 
   Options are:
+    -H HOSTS     host specifications of the form NAMES:PARAMETERS (separated by commas)
     -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
 
-  Runs Isabelle builds in ci environment, with the following build jobs:
+  Runs Isabelle builds in ci environment. For cluster builds, build hosts must
+  be passed explicitly (no registry).
+
+  The following build jobs are available:
 
 """ + Library.indent_lines(4, show_jobs) + "\n",
-        "o:" -> (arg => options = options + arg))
+        "o:" -> (arg => options = options + arg),
+        "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.load(Nil), arg)))
 
       val more_args = getopts(args)
 
@@ -260,7 +274,7 @@
         case _ => getopts.usage()
       }
 
-      ci_build(options, job)
+      ci_build(options, build_hosts.toList, job)
     })
 }
 
--- a/src/Pure/Build/build_manager.scala	Sun Jun 09 21:16:38 2024 +0200
+++ b/src/Pure/Build/build_manager.scala	Sun Jun 09 22:40:13 2024 +0200
@@ -42,7 +42,10 @@
 
   case class CI_Build(name: String, components: List[Component]) extends Build_Config {
     def fresh_build: Boolean = true
-    def command(build_hosts: List[Build_Cluster.Host]): String = " ci_build " + name
+    def command(build_hosts: List[Build_Cluster.Host]): String =
+      " ci_build" +
+      build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
+      " " + name
   }
 
   object User_Build {
@@ -72,7 +75,7 @@
       " build" +
         if_proper(afp_rev, " -A:") +
         base_sessions.map(session => " -B " + Bash.string(session)).mkString +
-        if_proper(build_hosts, build_hosts.map(host => " -H " + Bash.string(host.print)).mkString) +
+        build_hosts.map(host => " -H " + Bash.string(host.print)).mkString +
         if_proper(presentation, " -P:") +
         if_proper(requirements, " -R") +
         if_proper(all_sessions, " -a") +
@@ -92,6 +95,8 @@
 
   sealed case class Task(
     build_config: Build_Config,
+    build_cluster: Boolean,
+    hosts_spec: String,
     id: UUID.T = UUID.random(),
     submit_date: Date = Date.now(),
     priority: Priority = Priority.normal,
@@ -100,6 +105,9 @@
     def name: String = id.toString
     def kind: String = build_config.name
     def components: List[Component] = build_config.components
+
+    def build_hosts: List[Build_Cluster.Host] =
+      Build_Cluster.Host.parse(Registry.global, hosts_spec)
   }
 
   sealed case class Job(
@@ -107,6 +115,8 @@
     kind: String,
     number: Long,
     isabelle_rev: String,
+    build_cluster: Boolean,
+    hostnames: List[String],
     components: List[Component],
     start_date: Date = Date.now(),
     cancelled: Boolean = false
@@ -156,12 +166,22 @@
 
     def num_builds = running.size + finished.size
 
-    def next: List[Task] =
-      if (pending.isEmpty) Nil
+    def next(build_hosts: List[Build_Cluster.Host]): Option[Task] = {
+      val cluster_running = running.values.exists(_.build_cluster)
+      val available = build_hosts.map(_.hostname).toSet - running.values.flatMap(_.hostnames).toSet
+      val ready =
+        for {
+          (_, task) <- pending
+          if !task.build_cluster || !cluster_running
+          if task.build_hosts.map(_.hostname).forall(available.contains)
+        } yield task
+
+      if (ready.isEmpty) None
       else {
-        val priority = pending.values.map(_.priority).maxBy(_.ordinal)
-        pending.values.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering)
+        val priority = ready.map(_.priority).maxBy(_.ordinal)
+        ready.filter(_.priority == priority).toList.sortBy(_.submit_date)(Date.Ordering).headOption
       }
+    }
 
     def add_running(job: Job): State = copy(running = running + (job.name -> job))
     def remove_running(name: String): State = copy(running = running - name)
@@ -255,6 +275,8 @@
 
     object Pending {
       val kind = SQL.Column.string("kind")
+      val build_cluster = SQL.Column.bool("build_cluster")
+      val hosts_spec = SQL.Column.string("hosts_spec")
       val id = SQL.Column.string("id").make_primary_key
       val submit_date = SQL.Column.date("submit_date")
       val priority = SQL.Column.string("priority")
@@ -277,10 +299,10 @@
       val verbose = SQL.Column.bool("verbose")
 
       val table =
-        make_table(List(kind, id, submit_date, priority, isabelle_rev, components, prefs,
-          requirements, all_sessions, base_sessions, exclude_session_groups, exclude_sessions,
-          session_groups, sessions, build_heap, clean_build, export_files, fresh_build,
-          presentation, verbose),
+        make_table(List(kind, build_cluster, hosts_spec, id, submit_date, priority, isabelle_rev,
+          components, prefs, requirements, all_sessions, base_sessions, exclude_session_groups,
+          exclude_sessions, session_groups, sessions, build_heap, clean_build, export_files,
+          fresh_build, presentation, verbose),
         name = "pending")
     }
 
@@ -288,6 +310,8 @@
       db.execute_query_statement(Pending.table.select(), Map.from[String, Task], get =
         { res =>
           val kind = res.string(Pending.kind)
+          val build_cluster = res.bool(Pending.build_cluster)
+          val hosts_spec = res.string(Pending.hosts_spec)
           val id = res.string(Pending.id)
           val submit_date = res.date(Pending.submit_date)
           val priority = Priority.valueOf(res.string(Pending.priority))
@@ -319,7 +343,8 @@
                 clean_build, export_files, fresh_build, presentation, verbose)
             }
 
-          val task = Task(build_config, UUID.make(id), submit_date, priority, isabelle_rev)
+          val task = Task(build_config, build_cluster, hosts_spec, UUID.make(id), submit_date,
+            priority, isabelle_rev)
 
           task.name -> task
         })
@@ -340,11 +365,13 @@
           for (name <- update.insert) yield { (stmt: SQL.Statement) =>
             val task = pending(name)
             stmt.string(1) = task.kind
-            stmt.string(2) = task.id.toString
-            stmt.date(3) = task.submit_date
-            stmt.string(4) = task.priority.toString
-            stmt.string(5) = task.isabelle_rev
-            stmt.string(6) = task.components.mkString(",")
+            stmt.bool(2) = task.build_cluster
+            stmt.string(3) = task.hosts_spec
+            stmt.string(4) = task.id.toString
+            stmt.date(5) = task.submit_date
+            stmt.string(6) = task.priority.toString
+            stmt.string(7) = task.isabelle_rev
+            stmt.string(8) = task.components.mkString(",")
 
             def get[A](f: User_Build => A): Option[A] =
               task.build_config match {
@@ -352,20 +379,20 @@
                 case _ => None
               }
 
-            stmt.string(7) = get(user_build => user_build.prefs.map(_.print).mkString(","))
-            stmt.bool(8) = get(_.requirements)
-            stmt.bool(9) = get(_.all_sessions)
-            stmt.string(10) = get(_.base_sessions.mkString(","))
-            stmt.string(11) = get(_.exclude_session_groups.mkString(","))
-            stmt.string(12) = get(_.exclude_sessions.mkString(","))
-            stmt.string(13) = get(_.session_groups.mkString(","))
-            stmt.string(14) = get(_.sessions.mkString(","))
-            stmt.bool(15) = get(_.build_heap)
-            stmt.bool(16) = get(_.clean_build)
-            stmt.bool(17) = get(_.export_files)
-            stmt.bool(18) = get(_.fresh_build)
-            stmt.bool(19) = get(_.presentation)
-            stmt.bool(20) = get(_.verbose)
+            stmt.string(9) = get(user_build => user_build.prefs.map(_.print).mkString(","))
+            stmt.bool(10) = get(_.requirements)
+            stmt.bool(11) = get(_.all_sessions)
+            stmt.string(12) = get(_.base_sessions.mkString(","))
+            stmt.string(13) = get(_.exclude_session_groups.mkString(","))
+            stmt.string(14) = get(_.exclude_sessions.mkString(","))
+            stmt.string(15) = get(_.session_groups.mkString(","))
+            stmt.string(16) = get(_.sessions.mkString(","))
+            stmt.bool(17) = get(_.build_heap)
+            stmt.bool(18) = get(_.clean_build)
+            stmt.bool(19) = get(_.export_files)
+            stmt.bool(20) = get(_.fresh_build)
+            stmt.bool(21) = get(_.presentation)
+            stmt.bool(22) = get(_.verbose)
           })
       }
 
@@ -380,12 +407,15 @@
       val kind = SQL.Column.string("kind")
       val number = SQL.Column.long("number")
       val isabelle_rev = SQL.Column.string("isabelle_rev")
+      val build_cluster = SQL.Column.bool("build_cluster")
+      val hostnames = SQL.Column.string("hostnames")
       val components = SQL.Column.string("components")
       val start_date = SQL.Column.date("start_date")
       val cancelled = SQL.Column.bool("cancelled")
 
       val table =
-        make_table(List(id, kind, number, isabelle_rev, components, start_date, cancelled),
+        make_table(List(id, kind, number, isabelle_rev, build_cluster, hostnames, components,
+          start_date, cancelled),
         name = "running")
     }
 
@@ -396,12 +426,14 @@
           val kind = res.string(Running.kind)
           val number = res.long(Running.number)
           val isabelle_rev = res.string(Running.isabelle_rev)
+          val build_cluster = res.bool(Running.build_cluster)
+          val hostnames = space_explode(',', res.string(Running.hostnames))
           val components = space_explode(',', res.string(Running.components)).map(Component.parse)
           val start_date = res.date(Running.start_date)
           val cancelled = res.bool(Running.cancelled)
 
-          val job =
-            Job(UUID.make(id), kind, number, isabelle_rev, components, start_date, cancelled)
+          val job = Job(UUID.make(id), kind, number, isabelle_rev, build_cluster, hostnames,
+            components, start_date, cancelled)
 
           job.name -> job
         })
@@ -425,9 +457,11 @@
             stmt.string(2) = job.kind
             stmt.long(3) = job.number
             stmt.string(4) = job.isabelle_rev
-            stmt.string(5) = job.components.mkString(",")
-            stmt.date(6) = job.start_date
-            stmt.bool(7) = job.cancelled
+            stmt.bool(5) = job.build_cluster
+            stmt.string(6) = job.hostnames.mkString(",")
+            stmt.string(7) = job.components.mkString(",")
+            stmt.date(8) = job.start_date
+            stmt.bool(9) = job.cancelled
           })
       }
       update
@@ -567,42 +601,49 @@
     }
 
     class State private(
-      processes: Map[String, Future[Bash.Process]],
-      results: Map[String, Future[Process_Result]]
+      process_futures: Map[String, Future[Build_Process]],
+      result_futures: Map[String, Future[Process_Result]]
     ) {
-      def is_empty = processes.isEmpty && results.isEmpty
+      def is_empty = process_futures.isEmpty && result_futures.isEmpty
 
-      def init(build_config: Build_Config, job: Job, context: Context): State = {
-        val process = Future.fork(context.process(build_config))
-        val result =
+      def init(context: Context): State = {
+        val process_future = Future.fork(Build_Process.open(context))
+        val result_future =
           Future.fork(
-            process.join_result match {
-              case Exn.Res(res) => context.run(res)
-              case Exn.Exn(_) => Process_Result(Process_Result.RC.interrupt)
+            process_future.join_result match {
+              case Exn.Res(process) => process.run()
+              case Exn.Exn(exn) => Process_Result(Process_Result.RC.interrupt).error(exn.getMessage)
             })
-        new State(processes + (job.name -> process), results + (job.name -> result))
+        new State(
+          process_futures + (context.name -> process_future),
+          result_futures + (context.name -> result_future))
       }
 
-      def running: List[String] = processes.keys.toList
+      def running: List[String] = process_futures.keys.toList
 
       def update: (State, Map[String, Process_Result]) = {
         val finished =
-          for ((name, future) <- results if future.is_finished) yield name -> future.join
+          for ((name, future) <- result_futures if future.is_finished)
+          yield name ->
+            (future.join_result match {
+              case Exn.Res(result) => result
+              case Exn.Exn(exn) => Process_Result(Process_Result.RC.interrupt).error(exn.getMessage)
+            })
 
-        val processes1 = processes.filterNot((name, _) => finished.contains(name))
-        val results1 = results.filterNot((name, _) => finished.contains(name))
+        val process_futures1 = process_futures.filterNot((name, _) => finished.contains(name))
+        val result_futures1 = result_futures.filterNot((name, _) => finished.contains(name))
 
-        (new State(processes1, results1), finished)
+        (new State(process_futures1, result_futures1), finished)
       }
 
       def cancel(cancelled: List[String]): State = {
         for (name <- cancelled) {
-          val process = processes(name)
-          if (process.is_finished) process.join.interrupt()
-          else process.cancel()
+          val process_future = process_futures(name)
+          if (process_future.is_finished) process_future.join.cancel()
+          else process_future.cancel()
         }
 
-        new State(processes.filterNot((name, _) => cancelled.contains(name)), results)
+        new State(process_futures.filterNot((name, _) => cancelled.contains(name)), result_futures)
       }
     }
   }
@@ -627,54 +668,55 @@
       }
     }
 
-    private def start_next(): Option[(Build_Config, Job)] =
-      synchronized_database("start_job") {
-        _state.next.headOption.flatMap { task =>
+    private def start_next(): Option[Context] =
+      synchronized_database("start_next") {
+        _state.next(build_hosts).flatMap { task =>
           progress.echo("Initializing " + task.name)
 
           _state = _state.remove_pending(task.name)
 
-          val context = Context(store, task, build_hosts)
           val number = _state.next_number(task.kind)
+          val context = Context(store, task, number)
 
           Exn.capture {
-            store.sync_permissions(context.dir)
+            context.init()
+            store.sync_permissions(context.task_dir)
 
             val isabelle_rev =
-              sync(isabelle_repository, task.isabelle_rev, context.isabelle_dir)
+              sync(isabelle_repository, task.isabelle_rev, context.task_dir)
+
+            val hostnames = task.build_hosts.map(_.hostname).distinct
 
             val components =
               for (component <- task.components)
               yield sync_dirs.find(_.name == component.name) match {
                 case Some(sync_dir) =>
-                  val target = context.isabelle_dir + sync_dir.target
+                  val target = context.task_dir + sync_dir.target
                   component.copy(rev = sync(sync_dir.hg, component.rev, target))
                 case None =>
                   if (component.rev.isEmpty) component
                   else error("Unknown component " + component)
               }
 
-            Job(task.id, task.kind, number, isabelle_rev, components)
+            Job(task.id, task.kind, number, isabelle_rev, task.build_cluster, hostnames, components)
           } match {
             case Exn.Res(job) =>
               _state = _state.add_running(job)
-              val context1 = context.move(Context(store, job))
 
               val msg = "Starting " + job.name
               echo(msg + " (id " + job.id + ")")
-              context1.progress.echo(msg)
+              context.progress.echo(msg)
 
-              Some(task.build_config, job)
+              Some(context)
             case Exn.Exn(exn) =>
               val result = Result(task.kind, number, Status.aborted)
-              val context1 = Context(store, result)
+              _state = _state.add_finished(result)
 
               val msg = "Failed to start job: " + exn.getMessage
               echo_error_message(msg)
-              context1.progress.echo_error_message(msg)
+              context.progress.echo_error_message(msg)
 
-              context.remove()
-              _state = _state.add_finished(result)
+              Isabelle_System.rm_tree(context.task_dir)
 
               None
           }
@@ -690,12 +732,11 @@
     private def finish_job(name: String, process_result: Process_Result): Unit =
       synchronized_database("finish_job") {
         val job = _state.running(name)
-        val context = Context(store, job, build_hosts)
+        val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
 
-        val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
-        context.copy_results(Context(store, result))
-        context.remove()
-        echo("Finished job " + job.id + " with status code " + process_result.rc)
+        val interrupted_error = process_result.interrupted && process_result.err.nonEmpty
+        val err_msg = if_proper(interrupted_error, ": " + process_result.err)
+        echo("Finished job " + job.id + " with status code " + process_result.rc + err_msg)
 
         _state = _state
           .remove_running(job.name)
@@ -706,18 +747,18 @@
 
     def init: Runner.State = Runner.State.empty
     def loop_body(state: Runner.State): Runner.State = {
-      if (state.is_empty && !progress.stopped) {
-        start_next() match {
-          case None => state
-          case Some((build_config, job)) =>
-            state.init(build_config, job, Context(store, job, build_hosts))
+      val state1 =
+        if (progress.stopped) state
+        else {
+          start_next() match {
+            case None => state
+            case Some(context) => state.init(context)
+          }
         }
-      }
-      else {
-        val (state1, results) = stop_cancelled(state).update
-        results.foreach(finish_job)
-        state1
-      }
+      val state2 = stop_cancelled(state1)
+      val (state3, results) = state2.update
+      results.foreach(finish_job)
+      state3
     }
   }
 
@@ -764,7 +805,8 @@
           if isabelle_updated || ci_job.components.exists(updated_components.contains)
           if !_state.pending.values.exists(_.kind == ci_job.name)
         } {
-          val task = Task(CI_Build(ci_job), priority = Priority.low, isabelle_rev = "default")
+          val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+            priority = Priority.low, isabelle_rev = "default")
           _state = _state.add_pending(task)
         }
       }
@@ -799,7 +841,8 @@
       for (ci_job <-ci_jobs)
         ci_job.trigger match {
           case isabelle.CI_Build.Timed(in_interval) if in_interval(previous, next) =>
-            val task = Task(CI_Build(ci_job), isabelle_rev = "default")
+            val task = Task(CI_Build(ci_job), ci_job.hosts.build_cluster, ci_job.hosts.hosts_spec,
+              isabelle_rev = "default")
             _state = _state.add_pending(task)
           case _ =>
         }
@@ -838,17 +881,17 @@
           for {
             (name, (time, log)) <- logs
             if time + keep > Time.now()
-          } yield name -> (time, Context(store, state.get(name).get).log)
+          } yield name -> (time, File.read(store.log_file(name)))
       }
 
-      def lookup(store: Store, elem: T): String = synchronized {
-        logs.get(elem.name) match {
+      def lookup(store: Store, name: String): String = synchronized {
+        logs.get(name) match {
           case Some((_, log)) =>
-            logs += elem.name -> (Time.now(), log)
+            logs += name -> (Time.now(), log)
           case None =>
-            logs += elem.name -> (Time.now(), Context(store, elem).log)
+            logs += name -> (Time.now(), File.read(store.log_file(name)))
         }
-        logs(elem.name)._2
+        logs(name)._2
       }
     }
   }
@@ -912,7 +955,9 @@
 
           def render_job(job: Job): XML.Body =
             par(link_build(job.name, job.number) :: text(": running since " + job.start_date)) ::
-            render_if(finished.headOption.exists(_.status != Status.ok), render_previous(finished))
+            render_if(
+              finished.headOption.exists(_.status != Status.ok) && job.kind != User_Build.name,
+              render_previous(finished))
 
           def render_result(result: Result): XML.Body =
             par(
@@ -974,11 +1019,11 @@
                 if (job.cancelled) text("Cancelling...")
                 else text("Running...") ::: render_cancel(job.id)) ::
               render_rev(job.isabelle_rev, job.components) :::
-              source(cache.lookup(store, job)) :: Nil
+              source(cache.lookup(store, job.name)) :: Nil
             case result: Result =>
               par(text("Date: " + result.date)) ::
               par(text("Status: " + result.status)) ::
-              source(cache.lookup(store, result)) :: Nil
+              source(cache.lookup(store, result.name)) :: Nil
           }
         }
 
@@ -1066,62 +1111,86 @@
 
   /* context */
 
-  object Context {
-    def apply(store: Store, elem: T, build_hosts: List[Build_Cluster.Host] = Nil): Context =
-      new Context(store, store.dir(elem), build_hosts)
+  case class Context(store: Store, task: Task, number: Long) {
+    def name = task.kind + "/" + number
+    def progress: Progress = new File_Progress(store.log_file(name))
+
+    def task_dir: Path = store.task_dir(task)
+
+    def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
+
+    def isabelle_identifier: String =
+      if (task.build_cluster) store.options.string("build_cluster_identifier") else store.identifier
+
+    def open_ssh(): SSH.System = {
+      if (task.build_cluster) store.open_ssh()
+      else Library.the_single(task.build_hosts).open_ssh(store.options)
+    }
+  }
+
+
+  /* build process */
+
+  object Build_Process {
+    def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
   }
 
-  class Context private(store: Store, val dir: Path, val build_hosts: List[Build_Cluster.Host]) {
-    def isabelle_dir: Path = dir + Path.basic("isabelle")
+  class Build_Process(ssh: SSH.System, context: Context) {
+    private val task = context.task
+    private val progress = context.progress
+
+
+    /* resources with cleanup operations */
 
-    private val log_file = dir + Path.basic("log")
-    val progress = new File_Progress(log_file, verbose = true)
-    def log: String =
-      Exn.capture(File.read(log_file)) match {
-        case Exn.Exn(_) => ""
-        case Exn.Res(res) => res
+    private val _dir = ssh.tmp_dir()
+    private val _isabelle =
+      try {
+        val rsync_context = Rsync.Context(ssh = ssh)
+        val source = File.standard_path(context.task_dir)
+        Rsync.exec(rsync_context, clean = true, args = List("--", Url.direct_path(source),
+          rsync_context.target(_dir))).check
+
+        Isabelle_System.rm_tree(context.task_dir)
+        Other_Isabelle(_dir, context.isabelle_identifier, ssh, progress)
       }
+      catch { case exn: Throwable => close(); throw exn }
 
-    def move(other: Context): Context = {
-      Isabelle_System.make_directory(other.dir.dir)
-      Isabelle_System.move_file(dir, other.dir)
-      other
-    }
+    private val _process =
+      try {
+        val init_components =
+          for {
+            component <- task.build_config.components
+            target = _dir + Sync.DIRS + Path.basic(component.name)
+            if Components.is_component_dir(target)
+          } yield "init_component " + quote(target.absolute.implode)
+
+        _isabelle.init(
+          other_settings = _isabelle.init_components() ::: init_components,
+          fresh = task.build_config.fresh_build, echo = true)
 
-    def copy_results(other: Context): Context = {
-      Isabelle_System.make_directory(other.dir)
-      Isabelle_System.copy_file(log_file, other.log_file)
-      other
+        val cmd = task.build_config.command(task.build_hosts)
+        progress.echo("isabelle" + cmd)
+
+        val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
+        ssh.bash_process(_isabelle.bash_context(script), settings = false)
+      }
+      catch { case exn: Throwable => close(); throw exn }
+
+    def cancel(): Unit = Option(_process).foreach(_.interrupt())
+
+    def close(): Unit = {
+      Option(_dir).foreach(ssh.rm_tree)
+      Isabelle_System.rm_tree(context.task_dir)
+      ssh.close()
     }
 
-    def remove(): Unit = Isabelle_System.rm_tree(dir)
 
-    lazy val ssh = store.open_ssh()
-
-    def process(build_config: Build_Config): Bash.Process = {
-      val isabelle = Other_Isabelle(isabelle_dir, store.identifier, ssh, progress)
-
-      val init_components =
-        for {
-          dir <- build_config.components
-          target = isabelle_dir + Sync.DIRS + Path.basic(dir.name)
-          if Components.is_component_dir(target)
-        } yield "init_component " + quote(target.absolute.implode)
+    /* execution */
 
-      isabelle.init(other_settings = isabelle.init_components() ::: init_components,
-        fresh = build_config.fresh_build, echo = true)
-
-      val cmd = build_config.command(build_hosts)
-      progress.echo("isabelle" + cmd)
-
-      val script = File.bash_path(Isabelle_Tool.exe(isabelle.isabelle_home)) + cmd
-      ssh.bash_process(isabelle.bash_context(script), settings = false)
-    }
-
-    def run(process: Bash.Process): Process_Result = {
+    def run(): Process_Result = {
       val process_result =
-        process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
-      ssh.close()
+        _process.result(progress_stdout = progress.echo(_), progress_stderr = progress.echo(_))
+      close()
       process_result
     }
   }
@@ -1134,15 +1203,10 @@
     val identifier = options.string("build_manager_identifier")
 
     private val pending = base_dir + Path.basic("pending")
-    private val running = base_dir + Path.basic("running")
     private val finished = base_dir + Path.basic("finished")
 
-    def dir(elem: T): Path =
-      elem match {
-        case task: Task => pending + Path.basic(task.id.toString)
-        case job: Job => running + Path.make(List(job.kind, job.number.toString))
-        case result: Result => finished + Path.make(List(result.kind, result.number.toString))
-      }
+    def task_dir(task: Task) = pending + Path.basic(task.id.toString)
+    def log_file(name: String): Path = finished + Path.explode(name)
 
     def sync_permissions(dir: Path, ssh: SSH.System = SSH.Local): Unit = {
       ssh.execute("chmod -R g+rwx " + File.bash_path(dir))
@@ -1150,8 +1214,7 @@
     }
 
     def init_dirs(): Unit =
-      List(pending, running, finished).foreach(dir =>
-        sync_permissions(Isabelle_System.make_directory(dir)))
+      List(pending, finished).foreach(dir => sync_permissions(Isabelle_System.make_directory(dir)))
 
     val ssh_group: String = options.string("build_manager_ssh_group")
 
@@ -1242,24 +1305,25 @@
     val id = UUID.random()
     val afp_rev = if (afp_root.nonEmpty) Some("") else None
 
-    val build_config = User_Build(afp_rev, prefs, requirements, all_sessions, base_sessions,
-      exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap, clean_build,
-      export_files, fresh_build, presentation, verbose)
-    val task = Task(build_config, id, Date.now(), Priority.high)
+    val hosts_spec = options.string("build_manager_cluster")
+    val build_config = User_Build(afp_rev, prefs, requirements, all_sessions,
+      base_sessions, exclude_session_groups, exclude_sessions, session_groups, sessions, build_heap,
+      clean_build, export_files, fresh_build, presentation, verbose)
+    val task = Task(build_config, true, hosts_spec, id, Date.now(), Priority.high)
 
-    val context = Context(store, task)
+    val dir = store.task_dir(task)
 
     progress.interrupt_handler {
       using(store.open_ssh()) { ssh =>
         val rsync_context = Rsync.Context(ssh = ssh)
         progress.echo("Transferring repositories...")
-        Sync.sync(store.options, rsync_context, context.isabelle_dir, preserve_jars = true,
+        Sync.sync(store.options, rsync_context, dir, preserve_jars = true,
           dirs = Sync.afp_dirs(afp_root), rev = rev)
-        store.sync_permissions(context.dir, ssh)
+        store.sync_permissions(dir, ssh)
 
         if (progress.stopped) {
           progress.echo("Cancelling submission...")
-          ssh.rm_tree(context.dir)
+          ssh.rm_tree(dir)
         } else {
           using(store.open_postgresql_server()) { server =>
             using(store.open_database(server = server)) { db =>
@@ -1307,7 +1371,7 @@
   Options are:
     -A ROOT      include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
     -D DIR       include extra component in given directory
-    -H HOSTS     additional cluster host specifications of the form
+    -H HOSTS     host specifications for all available hosts of the form
                  NAMES:PARAMETERS (separated by commas)
     -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
     -p PORT      explicit web server port