merged
author wenzelm
date Sun, 12 Feb 2023 22:05:02 +0100
changeset 77262 9a60a2d19a4c
parent 77235 d19c45c7195b (current diff)
parent 77261 9dc3986721a3 (diff)
child 77263 27be31d7ad88
child 77265 bafdc56654cf
child 77273 f82317de6f28
child 77284 2bf321758333
--- a/etc/build.props	Sun Feb 12 20:49:39 2023 +0000
+++ b/etc/build.props	Sun Feb 12 22:05:02 2023 +0100
@@ -186,6 +186,7 @@
   src/Pure/Tools/build.scala \
   src/Pure/Tools/build_docker.scala \
   src/Pure/Tools/build_job.scala \
+  src/Pure/Tools/build_process.scala \
   src/Pure/Tools/check_keywords.scala \
   src/Pure/Tools/debugger.scala \
   src/Pure/Tools/doc.scala \
--- a/src/Pure/PIDE/prover.scala	Sun Feb 12 20:49:39 2023 +0000
+++ b/src/Pure/PIDE/prover.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -138,7 +138,7 @@
       terminate_process()
       process_result.join
       stdout.join
-      exit_message(Process_Result(127))
+      exit_message(Process_Result.startup_failure)
     }
     else {
       val (command_stream, message_stream) = channel.rendezvous()
--- a/src/Pure/PIDE/session.scala	Sun Feb 12 20:49:39 2023 +0000
+++ b/src/Pure/PIDE/session.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -727,7 +727,7 @@
         {
           case Session.Startup | Session.Shutdown => None
           case Session.Terminated(_) => Some((false, phase))
-          case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0)))))
+          case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result.ok))))
           case Session.Ready => Some((true, post_phase(Session.Shutdown)))
         })
     if (was_ready) manager.send(Stop)
--- a/src/Pure/System/process_result.scala	Sun Feb 12 20:49:39 2023 +0000
+++ b/src/Pure/System/process_result.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -10,15 +10,18 @@
   /* return code */
 
   object RC {
+    val undefined = -1
     val ok = 0
     val error = 1
     val failure = 2
+    val startup_failure = 127
     val interrupt = 130
     val timeout = 142
 
     private def text(rc: Int): String = {
       val txt =
         rc match {
+          case `undefined` => "UNDEFINED"
           case `ok` => "OK"
           case `error` => "ERROR"
           case `failure` => "FAILURE"
@@ -38,6 +41,11 @@
     def print_long(rc: Int): String = "Return code: " + rc + text(rc)
     def print(rc: Int): String = "return code " + rc + text(rc)
   }
+
+  val undefined: Process_Result = Process_Result(RC.undefined)
+  val ok: Process_Result = Process_Result(RC.ok)
+  val error: Process_Result = Process_Result(RC.error)
+  val startup_failure: Process_Result = Process_Result(RC.startup_failure)
 }
 
 final case class Process_Result(
@@ -59,6 +67,9 @@
   def ok: Boolean = rc == Process_Result.RC.ok
   def interrupted: Boolean = rc == Process_Result.RC.interrupt
 
+  def defined: Boolean = rc > Process_Result.RC.undefined
+  def strict: Process_Result = if (defined) this else copy(rc = Process_Result.RC.error)
+
   def timeout_rc: Process_Result = copy(rc = Process_Result.RC.timeout)
   def timeout: Boolean = rc == Process_Result.RC.timeout
 
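For orientation, a minimal sketch of how the new Process_Result constants and the defined/strict helpers above compose; the session names and values are illustrative only, not part of the changeset.

// Illustrative only: exercising the extended Process_Result API.
val cancelled = Process_Result.undefined                 // rc = -1: no result was ever produced
assert(!cancelled.defined)
assert(cancelled.strict.rc == Process_Result.RC.error)   // strict turns an undefined result into ERROR

val finished = Process_Result.ok
assert(finished.defined && finished.strict == finished)  // defined results pass through strict unchanged

// Overall return code of a build: maximum of the strict rc values (as in Build.Results.rc below).
val results = Map("HOL" -> Process_Result.ok, "Doc" -> cancelled)
val rc = results.valuesIterator.map(_.strict.rc).foldLeft(Process_Result.RC.ok)(_ max _)
assert(rc == Process_Result.RC.error)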
--- a/src/Pure/Tools/build.scala	Sun Feb 12 20:49:39 2023 +0000
+++ b/src/Pure/Tools/build.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -8,10 +8,6 @@
 package isabelle
 
 
-import scala.collection.immutable.SortedSet
-import scala.annotation.tailrec
-
-
 object Build {
   /** auxiliary **/
 
@@ -28,111 +24,6 @@
   }
 
 
-  /* queue with scheduling information */
-
-  private object Queue {
-    type Timings = (List[Properties.T], Double)
-
-    def load_timings(progress: Progress, store: Sessions.Store, session_name: String): Timings = {
-      val no_timings: Timings = (Nil, 0.0)
-
-      store.try_open_database(session_name) match {
-        case None => no_timings
-        case Some(db) =>
-          def ignore_error(msg: String) = {
-            progress.echo_warning("Ignoring bad database " + db +
-              " for session " + quote(session_name) + (if (msg == "") "" else ":\n" + msg))
-            no_timings
-          }
-          try {
-            val command_timings = store.read_command_timings(db, session_name)
-            val session_timing =
-              store.read_session_timing(db, session_name) match {
-                case Markup.Elapsed(t) => t
-                case _ => 0.0
-              }
-            (command_timings, session_timing)
-          }
-          catch {
-            case ERROR(msg) => ignore_error(msg)
-            case exn: java.lang.Error => ignore_error(Exn.message(exn))
-            case _: XML.Error => ignore_error("XML.Error")
-          }
-          finally { db.close() }
-      }
-    }
-
-    def make_session_timing(
-      sessions_structure: Sessions.Structure,
-      timing: Map[String, Double]
-    ) : Map[String, Double] = {
-      val maximals = sessions_structure.build_graph.maximals.toSet
-      def desc_timing(session_name: String): Double = {
-        if (maximals.contains(session_name)) timing(session_name)
-        else {
-          val descendants = sessions_structure.build_descendants(List(session_name)).toSet
-          val g = sessions_structure.build_graph.restrict(descendants)
-          (0.0 :: g.maximals.flatMap { desc =>
-            val ps = g.all_preds(List(desc))
-            if (ps.exists(p => !timing.isDefinedAt(p))) None
-            else Some(ps.map(timing(_)).sum)
-          }).max
-        }
-      }
-      timing.keySet.iterator.map(name => (name -> desc_timing(name))).toMap.withDefaultValue(0.0)
-    }
-
-    def apply(
-      progress: Progress,
-      sessions_structure: Sessions.Structure,
-      store: Sessions.Store
-    ) : Queue = {
-      val graph = sessions_structure.build_graph
-      val names = graph.keys
-
-      val timings = names.map(name => (name, load_timings(progress, store, name)))
-      val command_timings =
-        timings.map({ case (name, (ts, _)) => (name, ts) }).toMap.withDefaultValue(Nil)
-      val session_timing =
-        make_session_timing(sessions_structure,
-          timings.map({ case (name, (_, t)) => (name, t) }).toMap)
-
-      object Ordering extends scala.math.Ordering[String] {
-        def compare(name1: String, name2: String): Int =
-          session_timing(name2) compare session_timing(name1) match {
-            case 0 =>
-              sessions_structure(name2).timeout compare sessions_structure(name1).timeout match {
-                case 0 => name1 compare name2
-                case ord => ord
-              }
-            case ord => ord
-          }
-      }
-
-      new Queue(graph, SortedSet(names: _*)(Ordering), command_timings)
-    }
-  }
-
-  private class Queue(
-    graph: Graph[String, Sessions.Info],
-    order: SortedSet[String],
-    val command_timings: String => List[Properties.T]
-  ) {
-    def is_inner(name: String): Boolean = !graph.is_maximal(name)
-
-    def is_empty: Boolean = graph.is_empty
-
-    def - (name: String): Queue =
-      new Queue(graph.del_node(name), order - name, command_timings)
-
-    def dequeue(skip: String => Boolean): Option[(String, Sessions.Info)] = {
-      val it = order.iterator.dropWhile(name => skip(name) || !graph.is_minimal(name))
-      if (it.hasNext) { val name = it.next(); Some((name, graph.get_node(name))) }
-      else None
-    }
-  }
-
-
 
   /** build with results **/
 
@@ -140,17 +31,15 @@
     val store: Sessions.Store,
     val deps: Sessions.Deps,
     val sessions_ok: List[String],
-    results: Map[String, (Option[Process_Result], Sessions.Info)]
+    results: Map[String, Process_Result]
   ) {
     def cache: Term.Cache = store.cache
 
+    def info(name: String): Sessions.Info = deps.sessions_structure(name)
     def sessions: Set[String] = results.keySet
-    def cancelled(name: String): Boolean = results(name)._1.isEmpty
-    def info(name: String): Sessions.Info = results(name)._2
-    def apply(name: String): Process_Result = results(name)._1.getOrElse(Process_Result(1))
-    val rc: Int =
-      results.iterator.map({ case (_, (Some(r), _)) => r.rc case (_, (None, _)) => 1 }).
-        foldLeft(Process_Result.RC.ok)(_ max _)
+    def cancelled(name: String): Boolean = !results(name).defined
+    def apply(name: String): Process_Result = results(name).strict
+    val rc: Int = results.valuesIterator.map(_.strict.rc).foldLeft(Process_Result.RC.ok)(_ max _)
     def ok: Boolean = rc == Process_Result.RC.ok
 
     def unfinished: List[String] = sessions.iterator.filterNot(apply(_).ok).toList.sorted
@@ -158,16 +47,6 @@
     override def toString: String = rc.toString
   }
 
-  def session_finished(session_name: String, process_result: Process_Result): String =
-    "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
-
-  def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
-    val props = build_log.session_timing
-    val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-    val timing = Markup.Timing_Properties.get(props)
-    "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
-  }
-
   def build(
     options: Options,
     selection: Sessions.Selection = Sessions.Selection.empty,
@@ -252,9 +131,9 @@
     }
 
 
-    /* main build process */
+    /* build process and results */
 
-    val queue = Queue(progress, build_deps.sessions_structure, store)
+    val build_context = Build_Process.Context(store, build_deps, progress = progress)
 
     store.prepare_output_dir()
 
@@ -268,187 +147,21 @@
       }
     }
 
-    // scheduler loop
-    case class Result(
-      current: Boolean,
-      output_heap: SHA1.Shasum,
-      process: Option[Process_Result],
-      info: Sessions.Info
-    ) {
-      def ok: Boolean =
-        process match {
-          case None => false
-          case Some(res) => res.ok
-        }
-    }
-
-    def sleep(): Unit =
-      Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
-        build_options.seconds("editor_input_delay").sleep()
-      }
-
-    val log =
-      build_options.string("system_log") match {
-        case "" => No_Logger
-        case "-" => Logger.make(progress)
-        case log_file => Logger.make(Some(Path.explode(log_file)))
-      }
-
-    val numa_nodes = new NUMA.Nodes(numa_shuffling)
-
-    @tailrec def loop(
-      pending: Queue,
-      running: Map[String, (SHA1.Shasum, Build_Job)],
-      results: Map[String, Result]
-    ): Map[String, Result] = {
-      def used_node(i: Int): Boolean =
-        running.iterator.exists(
-          { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
-
-      if (pending.is_empty) results
-      else {
-        if (progress.stopped) {
-          for ((_, (_, job)) <- running) job.terminate()
-        }
-
-        running.find({ case (_, (_, job)) => job.is_finished }) match {
-          case Some((session_name, (input_heaps, job))) =>
-            //{{{ finish job
-
-            val (process_result, output_heap) = job.join
-
-            val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
-            val process_result_tail = {
-              val tail = job.info.options.int("process_output_tail")
-              process_result.copy(
-                out_lines =
-                  "(more details via \"isabelle log -H Error " + session_name + "\")" ::
-                  (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)))
-            }
-
-            val build_log =
-              Build_Log.Log_File(session_name, process_result.out_lines).
-                parse_session_info(
-                  command_timings = true,
-                  theory_timings = true,
-                  ml_statistics = true,
-                  task_statistics = true)
-
-            // write log file
-            if (process_result.ok) {
-              File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
-            }
-            else File.write(store.output_log(session_name), terminate_lines(log_lines))
-
-            // write database
-            using(store.open_database(session_name, output = true))(db =>
-              store.write_session_info(db, session_name, job.session_sources,
-                build_log =
-                  if (process_result.timeout) build_log.error("Timeout") else build_log,
-                build =
-                  Session_Info(build_deps.sources_shasum(session_name), input_heaps, output_heap,
-                    process_result.rc, UUID.random().toString)))
-
-            // messages
-            process_result.err_lines.foreach(progress.echo)
-
-            if (process_result.ok) {
-              if (verbose) progress.echo(session_timing(session_name, build_log))
-              progress.echo(session_finished(session_name, process_result))
-            }
-            else {
-              progress.echo(session_name + " FAILED")
-              if (!process_result.interrupted) progress.echo(process_result_tail.out)
-            }
-
-            loop(pending - session_name, running - session_name,
-              results +
-                (session_name -> Result(false, output_heap, Some(process_result_tail), job.info)))
-            //}}}
-          case None if running.size < (max_jobs max 1) =>
-            //{{{ check/start next job
-            pending.dequeue(running.isDefinedAt) match {
-              case Some((session_name, info)) =>
-                val ancestor_results =
-                  build_deps.sessions_structure.build_requirements(List(session_name)).
-                    filterNot(_ == session_name).map(results(_))
-                val input_heaps =
-                  if (ancestor_results.isEmpty) {
-                    SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
-                  }
-                  else SHA1.flat_shasum(ancestor_results.map(_.output_heap))
-
-                val do_store =
-                  build_heap || Sessions.is_pure(session_name) || queue.is_inner(session_name)
-
-                val (current, output_heap) = {
-                  store.try_open_database(session_name) match {
-                    case Some(db) =>
-                      using(db)(store.read_build(_, session_name)) match {
-                        case Some(build) =>
-                          val output_heap = store.find_heap_shasum(session_name)
-                          val current =
-                            !fresh_build &&
-                            build.ok &&
-                            build.sources == build_deps.sources_shasum(session_name) &&
-                            build.input_heaps == input_heaps &&
-                            build.output_heap == output_heap &&
-                            !(do_store && output_heap.is_empty)
-                          (current, output_heap)
-                        case None => (false, SHA1.no_shasum)
-                      }
-                    case None => (false, SHA1.no_shasum)
-                  }
-                }
-                val all_current = current && ancestor_results.forall(_.current)
-
-                if (all_current) {
-                  loop(pending - session_name, running,
-                    results +
-                      (session_name -> Result(true, output_heap, Some(Process_Result(0)), info)))
-                }
-                else if (no_build) {
-                  progress.echo_if(verbose, "Skipping " + session_name + " ...")
-                  loop(pending - session_name, running,
-                    results +
-                      (session_name -> Result(false, output_heap, Some(Process_Result(1)), info)))
-                }
-                else if (ancestor_results.forall(_.ok) && !progress.stopped) {
-                  progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
-
-                  store.clean_output(session_name)
-                  using(store.open_database(session_name, output = true))(
-                    store.init_session_info(_, session_name))
-
-                  val numa_node = numa_nodes.next(used_node)
-                  val job =
-                    new Build_Job(progress, build_deps.background(session_name), store, do_store,
-                      log, session_setup, numa_node, queue.command_timings(session_name))
-                  loop(pending, running + (session_name -> (input_heaps, job)), results)
-                }
-                else {
-                  progress.echo(session_name + " CANCELLED")
-                  loop(pending - session_name, running,
-                    results + (session_name -> Result(false, output_heap, None, info)))
-                }
-              case None => sleep(); loop(pending, running, results)
-            }
-            ///}}}
-          case None => sleep(); loop(pending, running, results)
-        }
-      }
-    }
-
-
-    /* build results */
-
     val results = {
       val build_results =
         if (build_deps.is_empty) {
           progress.echo_warning("Nothing to build")
-          Map.empty[String, Result]
+          Map.empty[String, Build_Process.Result]
         }
-        else Isabelle_Thread.uninterruptible { loop(queue, Map.empty, Map.empty) }
+        else {
+          Isabelle_Thread.uninterruptible {
+            val build_process =
+              new Build_Process(build_context, build_heap = build_heap,
+                numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
+                no_build = no_build, verbose = verbose, session_setup = session_setup)
+            build_process.run()
+          }
+        }
 
       val sessions_ok: List[String] =
         (for {
@@ -459,7 +172,7 @@
 
       val results =
         (for ((name, result) <- build_results.iterator)
-          yield (name, (result.process, result.info))).toMap
+          yield (name, result.process_result)).toMap
 
       new Results(store, build_deps, sessions_ok, results)
     }
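A hedged sketch of how the streamlined Build.Results is queried after a build; the selected session name, the Console_Progress instance, and the Sessions.Selection(sessions = ...) form are assumptions for illustration, not taken from this changeset.

// Illustrative only: consuming the reshaped Results API.
val progress = new Console_Progress()
val results: Build.Results =
  Build.build(options, selection = Sessions.Selection(sessions = List("HOL")), progress = progress)

if (!results.ok) {
  for (name <- results.unfinished) {
    if (results.cancelled(name)) progress.echo(name + " was cancelled")
    else progress.echo(name + ": " + Process_Result.RC.print_long(results(name).rc))
  }
}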
--- a/src/Pure/Tools/build_job.scala	Sun Feb 12 20:49:39 2023 +0000
+++ b/src/Pure/Tools/build_job.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -238,11 +238,10 @@
 class Build_Job(progress: Progress,
   session_background: Sessions.Background,
   store: Sessions.Store,
-  do_store: Boolean,
-  log: Logger,
+  val do_store: Boolean,
+  resources: Resources,
   session_setup: (String, Session) => Unit,
-  val numa_node: Option[Int],
-  command_timings0: List[Properties.T]
+  val numa_node: Option[Int]
 ) {
   def session_name: String = session_background.session_name
   val info: Sessions.Info = session_background.sessions_structure(session_name)
@@ -291,9 +290,6 @@
               }
         }
 
-      val resources =
-        new Resources(session_background, log = log, command_timings = command_timings0)
-
       val session =
         new Session(options, resources) {
           override val cache: Term.Cache = store.cache
@@ -570,8 +566,8 @@
     else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
   }
 
-  def join: (Process_Result, SHA1.Shasum) = {
-    val result1 = future_result.join
+  def join: Process_Result = {
+    val result = future_result.join
 
     val was_timeout =
       timeout_request match {
@@ -579,18 +575,9 @@
         case Some(request) => !request.cancel()
       }
 
-    val result2 =
-      if (result1.ok) result1
-      else if (was_timeout) result1.error(Output.error_message_text("Timeout")).timeout_rc
-      else if (result1.interrupted) result1.error(Output.error_message_text("Interrupt"))
-      else result1
-
-    val heap_shasum =
-      if (result2.ok && do_store && store.output_heap(session_name).is_file) {
-        SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
-      }
-      else SHA1.no_shasum
-
-    (result2, heap_shasum)
+    if (result.ok) result
+    else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
+    else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
+    else result
   }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Tools/build_process.scala	Sun Feb 12 22:05:02 2023 +0100
@@ -0,0 +1,350 @@
+/*  Title:      Pure/Tools/build_process.scala
+    Author:     Makarius
+
+Build process for sessions, with build database, optional heap, and
+optional presentation.
+*/
+
+package isabelle
+
+
+import scala.math.Ordering
+import scala.collection.immutable.SortedSet
+import scala.annotation.tailrec
+
+
+object Build_Process {
+  /* static information */
+
+  object Session_Context {
+    def empty(session: String, timeout: Time): Session_Context =
+      new Session_Context(session, timeout, Time.zero, Nil)
+
+    def apply(
+      session: String,
+      timeout: Time,
+      store: Sessions.Store,
+      progress: Progress = new Progress
+    ): Session_Context = {
+      store.try_open_database(session) match {
+        case None => empty(session, timeout)
+        case Some(db) =>
+          def ignore_error(msg: String) = {
+            progress.echo_warning("Ignoring bad database " + db +
+              " for session " + quote(session) + (if (msg == "") "" else ":\n" + msg))
+            empty(session, timeout)
+          }
+          try {
+            val command_timings = store.read_command_timings(db, session)
+            val elapsed =
+              store.read_session_timing(db, session) match {
+                case Markup.Elapsed(s) => Time.seconds(s)
+                case _ => Time.zero
+              }
+            new Session_Context(session, timeout, elapsed, command_timings)
+          }
+          catch {
+            case ERROR(msg) => ignore_error(msg)
+            case exn: java.lang.Error => ignore_error(Exn.message(exn))
+            case _: XML.Error => ignore_error("XML.Error")
+          }
+          finally { db.close() }
+      }
+    }
+  }
+
+  final class Session_Context(
+    val session: String,
+    val timeout: Time,
+    val old_time: Time,
+    val old_command_timings: List[Properties.T]
+  ) {
+    def is_empty: Boolean = old_time.is_zero && old_command_timings.isEmpty
+
+    override def toString: String = session
+  }
+
+  object Context {
+    def apply(
+      store: Sessions.Store,
+      deps: Sessions.Deps,
+      progress: Progress = new Progress
+    ): Context = {
+      val sessions_structure = deps.sessions_structure
+      val build_graph = sessions_structure.build_graph
+
+      val sessions =
+        Map.from(
+          for (name <- build_graph.keys_iterator)
+          yield {
+            val timeout = sessions_structure(name).timeout
+            name -> Build_Process.Session_Context(name, timeout, store, progress = progress)
+          })
+
+      val sessions_time = {
+        val maximals = build_graph.maximals.toSet
+        def descendants_time(name: String): Double = {
+          if (maximals.contains(name)) sessions(name).old_time.seconds
+          else {
+            val descendants = build_graph.all_succs(List(name)).toSet
+            val g = build_graph.restrict(descendants)
+            (0.0 :: g.maximals.flatMap { desc =>
+              val ps = g.all_preds(List(desc))
+              if (ps.exists(p => !sessions.isDefinedAt(p))) None
+              else Some(ps.map(p => sessions(p).old_time.seconds).sum)
+            }).max
+          }
+        }
+        Map.from(
+          for (name <- sessions.keysIterator)
+          yield name -> descendants_time(name)).withDefaultValue(0.0)
+      }
+
+      val ordering =
+        new Ordering[String] {
+          def compare(name1: String, name2: String): Int =
+            sessions_time(name2) compare sessions_time(name1) match {
+              case 0 =>
+                sessions(name2).timeout compare sessions(name1).timeout match {
+                  case 0 => name1 compare name2
+                  case ord => ord
+                }
+              case ord => ord
+            }
+        }
+
+      new Context(store, deps, sessions, ordering, progress)
+    }
+  }
+
+  final class Context private(
+    val store: Sessions.Store,
+    val deps: Sessions.Deps,
+    sessions: Map[String, Session_Context],
+    val ordering: Ordering[String],
+    val progress: Progress
+  ) {
+    def sessions_structure: Sessions.Structure = deps.sessions_structure
+
+    def apply(session: String): Session_Context =
+      sessions.getOrElse(session, Session_Context.empty(session, Time.zero))
+
+    def build_heap(session: String): Boolean =
+      Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
+  }
+
+
+  /* main */
+
+  case class Result(
+    current: Boolean,
+    output_heap: SHA1.Shasum,
+    process_result: Process_Result
+  ) {
+    def ok: Boolean = process_result.ok
+  }
+}
+
+class Build_Process(
+  build_context: Build_Process.Context,
+  build_heap: Boolean = false,
+  numa_shuffling: Boolean = false,
+  max_jobs: Int = 1,
+  fresh_build: Boolean = false,
+  no_build: Boolean = false,
+  verbose: Boolean = false,
+  session_setup: (String, Session) => Unit = (_, _) => ()
+) {
+  private val store = build_context.store
+  private val build_options = store.options
+  private val build_deps = build_context.deps
+  private val progress = build_context.progress
+
+  // global state
+  private val numa_nodes = new NUMA.Nodes(numa_shuffling)
+  private var build_graph = build_context.sessions_structure.build_graph
+  private var build_order = SortedSet.from(build_graph.keys)(build_context.ordering)
+  private var running = Map.empty[String, (SHA1.Shasum, Build_Job)]
+  private var results = Map.empty[String, Build_Process.Result]
+
+  private val log =
+    build_options.string("system_log") match {
+      case "" => No_Logger
+      case "-" => Logger.make(progress)
+      case log_file => Logger.make(Some(Path.explode(log_file)))
+    }
+
+  private def remove_pending(name: String): Unit = {
+    build_graph = build_graph.del_node(name)
+    build_order = build_order - name
+  }
+
+  private def next_pending(): Option[String] =
+    build_order.iterator
+      .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name))
+      .nextOption()
+
+  private def used_node(i: Int): Boolean =
+    running.iterator.exists(
+      { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
+
+  private def session_finished(session_name: String, process_result: Process_Result): String =
+    "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
+
+  private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
+    val props = build_log.session_timing
+    val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+    val timing = Markup.Timing_Properties.get(props)
+    "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
+  }
+
+  private def finish_job(session_name: String, input_heaps: SHA1.Shasum, job: Build_Job): Unit = {
+    val process_result = job.join
+
+    val output_heap =
+      if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) {
+        SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
+      }
+      else SHA1.no_shasum
+
+    val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
+    val process_result_tail = {
+      val tail = job.info.options.int("process_output_tail")
+      process_result.copy(
+        out_lines =
+          "(more details via \"isabelle log -H Error " + session_name + "\")" ::
+          (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)))
+    }
+
+    val build_log =
+      Build_Log.Log_File(session_name, process_result.out_lines).
+        parse_session_info(
+          command_timings = true,
+          theory_timings = true,
+          ml_statistics = true,
+          task_statistics = true)
+
+    // write log file
+    if (process_result.ok) {
+      File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
+    }
+    else File.write(store.output_log(session_name), terminate_lines(log_lines))
+
+    // write database
+    using(store.open_database(session_name, output = true))(db =>
+      store.write_session_info(db, session_name, job.session_sources,
+        build_log =
+          if (process_result.timeout) build_log.error("Timeout") else build_log,
+        build =
+          Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps,
+            output_heap, process_result.rc, UUID.random().toString)))
+
+    // messages
+    process_result.err_lines.foreach(progress.echo)
+
+    if (process_result.ok) {
+      if (verbose) progress.echo(session_timing(session_name, build_log))
+      progress.echo(session_finished(session_name, process_result))
+    }
+    else {
+      progress.echo(session_name + " FAILED")
+      if (!process_result.interrupted) progress.echo(process_result_tail.out)
+    }
+
+    remove_pending(session_name)
+    running -= session_name
+    results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail))
+  }
+
+  private def start_job(session_name: String): Unit = {
+    val ancestor_results =
+      build_deps.sessions_structure.build_requirements(List(session_name)).
+        filterNot(_ == session_name).map(results(_))
+    val input_heaps =
+      if (ancestor_results.isEmpty) {
+        SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
+      }
+      else SHA1.flat_shasum(ancestor_results.map(_.output_heap))
+
+    val do_store = build_heap || build_context.build_heap(session_name)
+    val (current, output_heap) = {
+      store.try_open_database(session_name) match {
+        case Some(db) =>
+          using(db)(store.read_build(_, session_name)) match {
+            case Some(build) =>
+              val output_heap = store.find_heap_shasum(session_name)
+              val current =
+                !fresh_build &&
+                build.ok &&
+                build.sources == build_deps.sources_shasum(session_name) &&
+                build.input_heaps == input_heaps &&
+                build.output_heap == output_heap &&
+                !(do_store && output_heap.is_empty)
+              (current, output_heap)
+            case None => (false, SHA1.no_shasum)
+          }
+        case None => (false, SHA1.no_shasum)
+      }
+    }
+    val all_current = current && ancestor_results.forall(_.current)
+
+    if (all_current) {
+      remove_pending(session_name)
+      results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok))
+    }
+    else if (no_build) {
+      progress.echo_if(verbose, "Skipping " + session_name + " ...")
+      remove_pending(session_name)
+      results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error))
+    }
+    else if (ancestor_results.forall(_.ok) && !progress.stopped) {
+      progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
+
+      store.clean_output(session_name)
+      using(store.open_database(session_name, output = true))(
+        store.init_session_info(_, session_name))
+
+      val session_background = build_deps.background(session_name)
+      val resources =
+        new Resources(session_background, log = log,
+          command_timings = build_context(session_name).old_command_timings)
+
+      val numa_node = numa_nodes.next(used_node)
+      val job =
+        new Build_Job(progress, session_background, store, do_store,
+          resources, session_setup, numa_node)
+      running += (session_name -> (input_heaps, job))
+    }
+    else {
+      progress.echo(session_name + " CANCELLED")
+      remove_pending(session_name)
+      results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined))
+    }
+  }
+
+  private def sleep(): Unit =
+    Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+      build_options.seconds("editor_input_delay").sleep()
+    }
+
+  def run(): Map[String, Build_Process.Result] = {
+    while (!build_graph.is_empty) {
+      if (progress.stopped) {
+        for ((_, (_, job)) <- running) job.terminate()
+      }
+
+      running.find({ case (_, (_, job)) => job.is_finished }) match {
+        case Some((session_name, (input_heaps, job))) =>
+          finish_job(session_name, input_heaps, job)
+        case None if running.size < (max_jobs max 1) =>
+          next_pending() match {
+            case Some(session_name) => start_job(session_name)
+            case None => sleep()
+          }
+        case None => sleep()
+      }
+    }
+
+    results
+  }
+}
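A minimal driver sketch of the new Build_Process, mirroring its invocation in src/Pure/Tools/build.scala above; the store, build_deps, progress, build_heap, and max_jobs values are assumed to come from the usual Build.build scope and are not defined here.

// Illustrative only: construct the scheduling context, run the build process,
// and distinguish cancelled sessions (undefined result) from finished ones.
val build_context = Build_Process.Context(store, build_deps, progress = progress)
val build_process =
  new Build_Process(build_context, build_heap = build_heap, max_jobs = max_jobs)
val results: Map[String, Build_Process.Result] = build_process.run()

for ((name, result) <- results) {
  if (!result.process_result.defined) progress.echo(name + ": cancelled")
  else progress.echo(name + ": " + Process_Result.RC.print(result.process_result.rc))
}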