merged
authorwenzelm
Sun, 19 Feb 2023 13:51:49 +0100
changeset 77302 632a92fcb673
parent 77283 98dab34ed72d (current diff)
parent 77301 c6d724692603 (diff)
child 77304 aea11797247b
child 77310 6754b5eceb12
merged
--- a/src/Pure/General/logger.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/General/logger.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -35,7 +35,8 @@
 }
 
 class System_Logger extends Logger {
-  def apply(msg: => String): Unit =
+  def apply(msg: => String): Unit = synchronized {
     if (Platform.is_windows) System.out.println(msg)
     else System.console.writer.println(msg)
+  }
 }
--- a/src/Pure/PIDE/document.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/PIDE/document.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -371,14 +371,16 @@
 
   object Nodes {
     val empty: Nodes = new Nodes(Graph.empty(Node.Name.Ordering))
+
+    private def init(graph: Graph[Node.Name, Node], name: Node.Name): Graph[Node.Name, Node] =
+      graph.default_node(name, Node.empty)
   }
 
   final class Nodes private(graph: Graph[Node.Name, Node]) {
-    def apply(name: Node.Name): Node =
-      graph.default_node(name, Node.empty).get_node(name)
+    def apply(name: Node.Name): Node = Nodes.init(graph, name).get_node(name)
 
     def is_suppressed(name: Node.Name): Boolean = {
-      val graph1 = graph.default_node(name, Node.empty)
+      val graph1 = Nodes.init(graph, name)
       graph1.is_maximal(name) && graph1.get_node(name).is_empty
     }
 
@@ -391,10 +393,7 @@
     def + (entry: (Node.Name, Node)): Nodes = {
       val (name, node) = entry
       val imports = node.header.imports
-      val graph1 =
-        imports.foldLeft(graph.default_node(name, Node.empty)) {
-          case (g, p) => g.default_node(p, Node.empty)
-        }
+      val graph1 = (name :: imports).foldLeft(graph)(Nodes.init)
       val graph2 =
         graph1.imm_preds(name).foldLeft(graph1) { case (g, dep) => g.del_edge(dep, name) }
       val graph3 = imports.foldLeft(graph2) { case (g, dep) => g.add_edge(dep, name) }
@@ -417,8 +416,8 @@
         if name == file_name
       } yield cmd).toList
 
-    def descendants(names: List[Node.Name]): List[Node.Name] = graph.all_succs(names)
-    def requirements(names: List[Node.Name]): List[Node.Name] = graph.all_preds_rev(names)
+    def descendants(names: List[Node.Name]): List[Node.Name] =
+      names.foldLeft(graph)(Nodes.init).all_succs(names)
     def topological_order: List[Node.Name] = graph.topological_order
 
     override def toString: String = topological_order.mkString("Nodes(", ",", ")")
--- a/src/Pure/Thy/sessions.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Thy/sessions.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -1331,6 +1331,18 @@
 
   /** persistent store **/
 
+  /** auxiliary **/
+
+  sealed case class Build_Info(
+    sources: SHA1.Shasum,
+    input_heaps: SHA1.Shasum,
+    output_heap: SHA1.Shasum,
+    return_code: Int,
+    uuid: String
+  ) {
+    def ok: Boolean = return_code == 0
+  }
+
   object Session_Info {
     val session_name = SQL.Column.string("session_name").make_primary_key
 
@@ -1345,7 +1357,7 @@
       List(session_name, session_timing, command_timings, theory_timings,
         ml_statistics, task_statistics, errors)
 
-    // Build.Session_Info
+    // Build_Info
     val sources = SQL.Column.string("sources")
     val input_heaps = SQL.Column.string("input_heaps")
     val output_heap = SQL.Column.string("output_heap")
@@ -1553,7 +1565,7 @@
       session_name: String,
       sources: Sources,
       build_log: Build_Log.Session_Info,
-      build: Build.Session_Info
+      build: Build_Info
     ): Unit = {
       db.transaction {
         write_sources(db, session_name, sources)
@@ -1596,7 +1608,7 @@
     def read_errors(db: SQL.Database, name: String): List[String] =
       Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache)
 
-    def read_build(db: SQL.Database, name: String): Option[Build.Session_Info] = {
+    def read_build(db: SQL.Database, name: String): Option[Build_Info] = {
       if (db.tables.contains(Session_Info.table.name)) {
         db.using_statement(Session_Info.table.select(Nil,
           Session_Info.session_name.where_equal(name))) { stmt =>
@@ -1607,7 +1619,7 @@
               try { Option(res.string(Session_Info.uuid)).getOrElse("") }
               catch { case _: SQLException => "" }
             Some(
-              Build.Session_Info(
+              Build_Info(
                 SHA1.fake_shasum(res.string(Session_Info.sources)),
                 SHA1.fake_shasum(res.string(Session_Info.input_heaps)),
                 SHA1.fake_shasum(res.string(Session_Info.output_heap)),
--- a/src/Pure/Tools/build.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -9,22 +9,6 @@
 
 
 object Build {
-  /** auxiliary **/
-
-  /* persistent build info */
-
-  sealed case class Session_Info(
-    sources: SHA1.Shasum,
-    input_heaps: SHA1.Shasum,
-    output_heap: SHA1.Shasum,
-    return_code: Int,
-    uuid: String
-  ) {
-    def ok: Boolean = return_code == 0
-  }
-
-
-
   /** build with results **/
 
   class Results private[Build](
--- a/src/Pure/Tools/build_job.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build_job.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -11,7 +11,373 @@
 import scala.util.matching.Regex
 
 
+trait Build_Job {
+  def session_name: String
+  def numa_node: Option[Int] = None
+  def start(): Unit = ()
+  def terminate(): Unit = ()
+  def is_finished: Boolean = false
+  def join: Process_Result = Process_Result.undefined
+}
+
 object Build_Job {
+  class Build_Session(progress: Progress,
+    session_background: Sessions.Background,
+    store: Sessions.Store,
+    val do_store: Boolean,
+    resources: Resources,
+    session_setup: (String, Session) => Unit,
+    val input_heaps: SHA1.Shasum,
+    override val numa_node: Option[Int]
+  ) extends Build_Job {
+    def session_name: String = session_background.session_name
+    val info: Sessions.Info = session_background.sessions_structure(session_name)
+    val options: Options = NUMA.policy_options(info.options, numa_node)
+
+    val session_sources: Sessions.Sources =
+      Sessions.Sources.load(session_background.base, cache = store.cache.compress)
+
+    private lazy val future_result: Future[Process_Result] =
+      Future.thread("build", uninterruptible = true) {
+        val parent = info.parent.getOrElse("")
+
+        val env =
+          Isabelle_System.settings(
+            List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
+
+        val is_pure = Sessions.is_pure(session_name)
+
+        val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil
+
+        val eval_store =
+          if (do_store) {
+            (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
+            List("ML_Heap.save_child " +
+              ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
+          }
+          else Nil
+
+        def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
+          session_background.base.theory_load_commands.get(node_name.theory) match {
+            case None => Nil
+            case Some(spans) =>
+              val syntax = session_background.base.theory_syntax(node_name)
+              val master_dir = Path.explode(node_name.master_dir)
+              for (span <- spans; file <- span.loaded_files(syntax).files)
+                yield {
+                  val src_path = Path.explode(file)
+                  val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
+
+                  val bytes = session_sources(blob_name.node).bytes
+                  val text = bytes.text
+                  val chunk = Symbol.Text_Chunk(text)
+
+                  Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
+                    Document.Blobs.Item(bytes, text, chunk, changed = false)
+                }
+          }
+
+        val session =
+          new Session(options, resources) {
+            override val cache: Term.Cache = store.cache
+
+            override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
+              Command.Blobs_Info.make(session_blobs(node_name))
+
+            override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
+              Document.Blobs.make(session_blobs(node_name))
+          }
+
+        object Build_Session_Errors {
+          private val promise: Promise[List[String]] = Future.promise
+
+          def result: Exn.Result[List[String]] = promise.join_result
+          def cancel(): Unit = promise.cancel()
+          def apply(errs: List[String]): Unit = {
+            try { promise.fulfill(errs) }
+            catch { case _: IllegalStateException => }
+          }
+        }
+
+        val export_consumer =
+          Export.consumer(store.open_database(session_name, output = true), store.cache,
+            progress = progress)
+
+        val stdout = new StringBuilder(1000)
+        val stderr = new StringBuilder(1000)
+        val command_timings = new mutable.ListBuffer[Properties.T]
+        val theory_timings = new mutable.ListBuffer[Properties.T]
+        val session_timings = new mutable.ListBuffer[Properties.T]
+        val runtime_statistics = new mutable.ListBuffer[Properties.T]
+        val task_statistics = new mutable.ListBuffer[Properties.T]
+
+        def fun(
+          name: String,
+          acc: mutable.ListBuffer[Properties.T],
+          unapply: Properties.T => Option[Properties.T]
+        ): (String, Session.Protocol_Function) = {
+          name -> ((msg: Prover.Protocol_Output) =>
+            unapply(msg.properties) match {
+              case Some(props) => acc += props; true
+              case _ => false
+            })
+        }
+
+        session.init_protocol_handler(new Session.Protocol_Handler {
+            override def exit(): Unit = Build_Session_Errors.cancel()
+
+            private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
+              val (rc, errors) =
+                try {
+                  val (rc, errs) = {
+                    import XML.Decode._
+                    pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
+                  }
+                  val errors =
+                    for (err <- errs) yield {
+                      val prt = Protocol_Message.expose_no_reports(err)
+                      Pretty.string_of(prt, metric = Symbol.Metric)
+                    }
+                  (rc, errors)
+                }
+                catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
+
+              session.protocol_command("Prover.stop", rc.toString)
+              Build_Session_Errors(errors)
+              true
+            }
+
+            private def loading_theory(msg: Prover.Protocol_Output): Boolean =
+              msg.properties match {
+                case Markup.Loading_Theory(Markup.Name(name)) =>
+                  progress.theory(Progress.Theory(name, session = session_name))
+                  false
+                case _ => false
+              }
+
+            private def export_(msg: Prover.Protocol_Output): Boolean =
+              msg.properties match {
+                case Protocol.Export(args) =>
+                  export_consumer.make_entry(session_name, args, msg.chunk)
+                  true
+                case _ => false
+              }
+
+            override val functions: Session.Protocol_Functions =
+              List(
+                Markup.Build_Session_Finished.name -> build_session_finished,
+                Markup.Loading_Theory.name -> loading_theory,
+                Markup.EXPORT -> export_,
+                fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
+                fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
+                fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
+          })
+
+        session.command_timings += Session.Consumer("command_timings") {
+          case Session.Command_Timing(props) =>
+            for {
+              elapsed <- Markup.Elapsed.unapply(props)
+              elapsed_time = Time.seconds(elapsed)
+              if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+            } command_timings += props.filter(Markup.command_timing_property)
+        }
+
+        session.runtime_statistics += Session.Consumer("ML_statistics") {
+          case Session.Runtime_Statistics(props) => runtime_statistics += props
+        }
+
+        session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
+          case snapshot =>
+            if (!progress.stopped) {
+              def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
+                if (!progress.stopped) {
+                  val theory_name = snapshot.node_name.theory
+                  val args =
+                    Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+                  val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
+                  export_consumer.make_entry(session_name, args, body)
+                }
+              }
+              def export_text(name: String, text: String, compress: Boolean = true): Unit =
+                export_(name, List(XML.Text(text)), compress = compress)
+
+              for (command <- snapshot.snippet_command) {
+                export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
+              }
+
+              export_text(Export.FILES,
+                cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
+                compress = false)
+
+              for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
+                val xml = snapshot.switch(blob_name).xml_markup()
+                export_(Export.MARKUP + (i + 1), xml)
+              }
+              export_(Export.MARKUP, snapshot.xml_markup())
+              export_(Export.MESSAGES, snapshot.messages.map(_._1))
+            }
+        }
+
+        session.all_messages += Session.Consumer[Any]("build_session_output") {
+          case msg: Prover.Output =>
+            val message = msg.message
+            if (msg.is_system) resources.log(Protocol.message_text(message))
+
+            if (msg.is_stdout) {
+              stdout ++= Symbol.encode(XML.content(message))
+            }
+            else if (msg.is_stderr) {
+              stderr ++= Symbol.encode(XML.content(message))
+            }
+            else if (msg.is_exit) {
+              val err =
+                "Prover terminated" +
+                  (msg.properties match {
+                    case Markup.Process_Result(result) => ": " + result.print_rc
+                    case _ => ""
+                  })
+              Build_Session_Errors(List(err))
+            }
+          case _ =>
+        }
+
+        session_setup(session_name, session)
+
+        val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
+
+        val process =
+          Isabelle_Process.start(store, options, session, session_background,
+            logic = parent, raw_ml_system = is_pure,
+            use_prelude = use_prelude, eval_main = eval_main,
+            cwd = info.dir.file, env = env)
+
+        val build_errors =
+          Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
+            Exn.capture { process.await_startup() } match {
+              case Exn.Res(_) =>
+                val resources_yxml = resources.init_session_yxml
+                val encode_options: XML.Encode.T[Options] =
+                  options => session.prover_options(options).encode
+                val args_yxml =
+                  YXML.string_of_body(
+                    {
+                      import XML.Encode._
+                      pair(string, list(pair(encode_options, list(pair(string, properties)))))(
+                        (session_name, info.theories))
+                    })
+                session.protocol_command("build_session", resources_yxml, args_yxml)
+                Build_Session_Errors.result
+              case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+            }
+          }
+
+        val process_result =
+          Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
+
+        session.stop()
+
+        val export_errors =
+          export_consumer.shutdown(close = true).map(Output.error_message_text)
+
+        val (document_output, document_errors) =
+          try {
+            if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) {
+              using(Export.open_database_context(store)) { database_context =>
+                val documents =
+                  using(database_context.open_session(session_background)) {
+                    session_context =>
+                      Document_Build.build_documents(
+                        Document_Build.context(session_context, progress = progress),
+                        output_sources = info.document_output,
+                        output_pdf = info.document_output)
+                  }
+                using(database_context.open_database(session_name, output = true))(session_database =>
+                  documents.foreach(_.write(session_database.db, session_name)))
+                (documents.flatMap(_.log_lines), Nil)
+              }
+            }
+            else (Nil, Nil)
+          }
+          catch {
+            case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
+            case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+          }
+
+        val result = {
+          val theory_timing =
+            theory_timings.iterator.flatMap(
+              {
+                case props @ Markup.Name(name) => Some(name -> props)
+                case _ => None
+              }).toMap
+          val used_theory_timings =
+            for { (name, _) <- session_background.base.used_theories }
+              yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
+
+          val more_output =
+            Library.trim_line(stdout.toString) ::
+              command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
+              used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
+              session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
+              runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
+              task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
+              document_output
+
+          process_result.output(more_output)
+            .error(Library.trim_line(stderr.toString))
+            .errors_rc(export_errors ::: document_errors)
+        }
+
+        build_errors match {
+          case Exn.Res(build_errs) =>
+            val errs = build_errs ::: document_errors
+            if (errs.nonEmpty) {
+              result.error_rc.output(
+                errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
+                  errs.map(Protocol.Error_Message_Marker.apply))
+            }
+            else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt)
+            else result
+          case Exn.Exn(Exn.Interrupt()) =>
+            if (result.ok) result.copy(rc = Process_Result.RC.interrupt)
+            else result
+          case Exn.Exn(exn) => throw exn
+        }
+      }
+
+    override def start(): Unit = future_result
+    override def terminate(): Unit = future_result.cancel()
+    override def is_finished: Boolean = future_result.is_finished
+
+    private val timeout_request: Option[Event_Timer.Request] = {
+      if (info.timeout_ignored) None
+      else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
+    }
+
+    override def join: Process_Result = {
+      val result = future_result.join
+
+      val was_timeout =
+        timeout_request match {
+          case None => false
+          case Some(request) => !request.cancel()
+        }
+
+      if (result.ok) result
+      else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
+      else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
+      else result
+    }
+
+    lazy val finish: SHA1.Shasum = {
+      require(is_finished, "Build job not finished: " + quote(session_name))
+      if (join.ok && do_store && store.output_heap(session_name).is_file) {
+        SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
+      }
+      else SHA1.no_shasum
+    }
+  }
+
   /* theory markup/messages from session database */
 
   def read_theory(
@@ -234,350 +600,3 @@
       }
     })
 }
-
-class Build_Job(progress: Progress,
-  session_background: Sessions.Background,
-  store: Sessions.Store,
-  val do_store: Boolean,
-  resources: Resources,
-  session_setup: (String, Session) => Unit,
-  val numa_node: Option[Int]
-) {
-  def session_name: String = session_background.session_name
-  val info: Sessions.Info = session_background.sessions_structure(session_name)
-  val options: Options = NUMA.policy_options(info.options, numa_node)
-
-  val session_sources: Sessions.Sources =
-    Sessions.Sources.load(session_background.base, cache = store.cache.compress)
-
-  private val future_result: Future[Process_Result] =
-    Future.thread("build", uninterruptible = true) {
-      val parent = info.parent.getOrElse("")
-
-      val env =
-        Isabelle_System.settings(
-          List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
-
-      val is_pure = Sessions.is_pure(session_name)
-
-      val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil
-
-      val eval_store =
-        if (do_store) {
-          (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
-          List("ML_Heap.save_child " +
-            ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
-        }
-        else Nil
-
-      def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
-        session_background.base.theory_load_commands.get(node_name.theory) match {
-          case None => Nil
-          case Some(spans) =>
-            val syntax = session_background.base.theory_syntax(node_name)
-            val master_dir = Path.explode(node_name.master_dir)
-            for (span <- spans; file <- span.loaded_files(syntax).files)
-              yield {
-                val src_path = Path.explode(file)
-                val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
-
-                val bytes = session_sources(blob_name.node).bytes
-                val text = bytes.text
-                val chunk = Symbol.Text_Chunk(text)
-
-                Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
-                  Document.Blobs.Item(bytes, text, chunk, changed = false)
-              }
-        }
-
-      val session =
-        new Session(options, resources) {
-          override val cache: Term.Cache = store.cache
-
-          override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
-            Command.Blobs_Info.make(session_blobs(node_name))
-
-          override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
-            Document.Blobs.make(session_blobs(node_name))
-        }
-
-      object Build_Session_Errors {
-        private val promise: Promise[List[String]] = Future.promise
-
-        def result: Exn.Result[List[String]] = promise.join_result
-        def cancel(): Unit = promise.cancel()
-        def apply(errs: List[String]): Unit = {
-          try { promise.fulfill(errs) }
-          catch { case _: IllegalStateException => }
-        }
-      }
-
-      val export_consumer =
-        Export.consumer(store.open_database(session_name, output = true), store.cache,
-          progress = progress)
-
-      val stdout = new StringBuilder(1000)
-      val stderr = new StringBuilder(1000)
-      val command_timings = new mutable.ListBuffer[Properties.T]
-      val theory_timings = new mutable.ListBuffer[Properties.T]
-      val session_timings = new mutable.ListBuffer[Properties.T]
-      val runtime_statistics = new mutable.ListBuffer[Properties.T]
-      val task_statistics = new mutable.ListBuffer[Properties.T]
-
-      def fun(
-        name: String,
-        acc: mutable.ListBuffer[Properties.T],
-        unapply: Properties.T => Option[Properties.T]
-      ): (String, Session.Protocol_Function) = {
-        name -> ((msg: Prover.Protocol_Output) =>
-          unapply(msg.properties) match {
-            case Some(props) => acc += props; true
-            case _ => false
-          })
-      }
-
-      session.init_protocol_handler(new Session.Protocol_Handler {
-          override def exit(): Unit = Build_Session_Errors.cancel()
-
-          private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
-            val (rc, errors) =
-              try {
-                val (rc, errs) = {
-                  import XML.Decode._
-                  pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
-                }
-                val errors =
-                  for (err <- errs) yield {
-                    val prt = Protocol_Message.expose_no_reports(err)
-                    Pretty.string_of(prt, metric = Symbol.Metric)
-                  }
-                (rc, errors)
-              }
-              catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
-
-            session.protocol_command("Prover.stop", rc.toString)
-            Build_Session_Errors(errors)
-            true
-          }
-
-          private def loading_theory(msg: Prover.Protocol_Output): Boolean =
-            msg.properties match {
-              case Markup.Loading_Theory(Markup.Name(name)) =>
-                progress.theory(Progress.Theory(name, session = session_name))
-                false
-              case _ => false
-            }
-
-          private def export_(msg: Prover.Protocol_Output): Boolean =
-            msg.properties match {
-              case Protocol.Export(args) =>
-                export_consumer.make_entry(session_name, args, msg.chunk)
-                true
-              case _ => false
-            }
-
-          override val functions: Session.Protocol_Functions =
-            List(
-              Markup.Build_Session_Finished.name -> build_session_finished,
-              Markup.Loading_Theory.name -> loading_theory,
-              Markup.EXPORT -> export_,
-              fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
-              fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
-              fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
-        })
-
-      session.command_timings += Session.Consumer("command_timings") {
-        case Session.Command_Timing(props) =>
-          for {
-            elapsed <- Markup.Elapsed.unapply(props)
-            elapsed_time = Time.seconds(elapsed)
-            if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
-          } command_timings += props.filter(Markup.command_timing_property)
-      }
-
-      session.runtime_statistics += Session.Consumer("ML_statistics") {
-        case Session.Runtime_Statistics(props) => runtime_statistics += props
-      }
-
-      session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
-        case snapshot =>
-          if (!progress.stopped) {
-            def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
-              if (!progress.stopped) {
-                val theory_name = snapshot.node_name.theory
-                val args =
-                  Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
-                val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
-                export_consumer.make_entry(session_name, args, body)
-              }
-            }
-            def export_text(name: String, text: String, compress: Boolean = true): Unit =
-              export_(name, List(XML.Text(text)), compress = compress)
-
-            for (command <- snapshot.snippet_command) {
-              export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
-            }
-
-            export_text(Export.FILES,
-              cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
-              compress = false)
-
-            for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
-              val xml = snapshot.switch(blob_name).xml_markup()
-              export_(Export.MARKUP + (i + 1), xml)
-            }
-            export_(Export.MARKUP, snapshot.xml_markup())
-            export_(Export.MESSAGES, snapshot.messages.map(_._1))
-          }
-      }
-
-      session.all_messages += Session.Consumer[Any]("build_session_output") {
-        case msg: Prover.Output =>
-          val message = msg.message
-          if (msg.is_system) resources.log(Protocol.message_text(message))
-
-          if (msg.is_stdout) {
-            stdout ++= Symbol.encode(XML.content(message))
-          }
-          else if (msg.is_stderr) {
-            stderr ++= Symbol.encode(XML.content(message))
-          }
-          else if (msg.is_exit) {
-            val err =
-              "Prover terminated" +
-                (msg.properties match {
-                  case Markup.Process_Result(result) => ": " + result.print_rc
-                  case _ => ""
-                })
-            Build_Session_Errors(List(err))
-          }
-        case _ =>
-      }
-
-      session_setup(session_name, session)
-
-      val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
-
-      val process =
-        Isabelle_Process.start(store, options, session, session_background,
-          logic = parent, raw_ml_system = is_pure,
-          use_prelude = use_prelude, eval_main = eval_main,
-          cwd = info.dir.file, env = env)
-
-      val build_errors =
-        Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
-          Exn.capture { process.await_startup() } match {
-            case Exn.Res(_) =>
-              val resources_yxml = resources.init_session_yxml
-              val encode_options: XML.Encode.T[Options] =
-                options => session.prover_options(options).encode
-              val args_yxml =
-                YXML.string_of_body(
-                  {
-                    import XML.Encode._
-                    pair(string, list(pair(encode_options, list(pair(string, properties)))))(
-                      (session_name, info.theories))
-                  })
-              session.protocol_command("build_session", resources_yxml, args_yxml)
-              Build_Session_Errors.result
-            case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
-          }
-        }
-
-      val process_result =
-        Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
-
-      session.stop()
-
-      val export_errors =
-        export_consumer.shutdown(close = true).map(Output.error_message_text)
-
-      val (document_output, document_errors) =
-        try {
-          if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) {
-            using(Export.open_database_context(store)) { database_context =>
-              val documents =
-                using(database_context.open_session(session_background)) {
-                  session_context =>
-                    Document_Build.build_documents(
-                      Document_Build.context(session_context, progress = progress),
-                      output_sources = info.document_output,
-                      output_pdf = info.document_output)
-                }
-              using(database_context.open_database(session_name, output = true))(session_database =>
-                documents.foreach(_.write(session_database.db, session_name)))
-              (documents.flatMap(_.log_lines), Nil)
-            }
-          }
-          else (Nil, Nil)
-        }
-        catch {
-          case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
-          case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
-        }
-
-      val result = {
-        val theory_timing =
-          theory_timings.iterator.flatMap(
-            {
-              case props @ Markup.Name(name) => Some(name -> props)
-              case _ => None
-            }).toMap
-        val used_theory_timings =
-          for { (name, _) <- session_background.base.used_theories }
-            yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
-
-        val more_output =
-          Library.trim_line(stdout.toString) ::
-            command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
-            used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
-            session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
-            runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
-            task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
-            document_output
-
-        process_result.output(more_output)
-          .error(Library.trim_line(stderr.toString))
-          .errors_rc(export_errors ::: document_errors)
-      }
-
-      build_errors match {
-        case Exn.Res(build_errs) =>
-          val errs = build_errs ::: document_errors
-          if (errs.nonEmpty) {
-            result.error_rc.output(
-              errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
-                errs.map(Protocol.Error_Message_Marker.apply))
-          }
-          else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt)
-          else result
-        case Exn.Exn(Exn.Interrupt()) =>
-          if (result.ok) result.copy(rc = Process_Result.RC.interrupt)
-          else result
-        case Exn.Exn(exn) => throw exn
-      }
-    }
-
-  def terminate(): Unit = future_result.cancel()
-  def is_finished: Boolean = future_result.is_finished
-
-  private val timeout_request: Option[Event_Timer.Request] = {
-    if (info.timeout_ignored) None
-    else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
-  }
-
-  def join: Process_Result = {
-    val result = future_result.join
-
-    val was_timeout =
-      timeout_request match {
-        case None => false
-        case Some(request) => !request.cancel()
-      }
-
-    if (result.ok) result
-    else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
-    else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
-    else result
-  }
-}
--- a/src/Pure/Tools/build_process.scala	Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build_process.scala	Sun Feb 19 13:51:49 2023 +0100
@@ -160,13 +160,6 @@
   private val build_deps = build_context.deps
   private val progress = build_context.progress
 
-  // global state
-  private val numa_nodes = new NUMA.Nodes(numa_shuffling)
-  private var build_graph = build_context.sessions_structure.build_graph
-  private var build_order = SortedSet.from(build_graph.keys)(build_context.ordering)
-  private var running = Map.empty[String, (SHA1.Shasum, Build_Job)]
-  private var results = Map.empty[String, Build_Process.Result]
-
   private val log =
     build_options.string("system_log") match {
       case "" => No_Logger
@@ -174,19 +167,65 @@
       case log_file => Logger.make(Some(Path.explode(log_file)))
     }
 
-  private def remove_pending(name: String): Unit = {
-    build_graph = build_graph.del_node(name)
-    build_order = build_order - name
+  // global state
+  private val _numa_nodes = new NUMA.Nodes(numa_shuffling)
+  private var _build_graph = build_context.sessions_structure.build_graph
+  private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering)
+  private var _running = Map.empty[String, Build_Job]
+  private var _results = Map.empty[String, Build_Process.Result]
+
+  private def remove_pending(name: String): Unit = synchronized {
+    _build_graph = _build_graph.del_node(name)
+    _build_order = _build_order - name
+  }
+
+  private def next_pending(): Option[String] = synchronized {
+    if (_running.size < (max_jobs max 1)) {
+      _build_order.iterator
+        .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name))
+        .nextOption()
+    }
+    else None
+  }
+
+  private def next_numa_node(): Option[Int] = synchronized {
+    _numa_nodes.next(used =
+      Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i))
   }
 
-  private def next_pending(): Option[String] =
-    build_order.iterator
-      .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name))
-      .nextOption()
+  private def test_running(): Boolean = synchronized { !_build_graph.is_empty }
+
+  private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) }
+
+  private def finished_running(): List[Build_Job.Build_Session] = synchronized {
+    List.from(
+      _running.valuesIterator.flatMap {
+        case job: Build_Job.Build_Session if job.is_finished => Some(job)
+        case _ => None
+      })
+  }
+
+  private def job_running(name: String, job: Build_Job): Build_Job = synchronized {
+    _running += (name -> job)
+    job
+  }
 
-  private def used_node(i: Int): Boolean =
-    running.iterator.exists(
-      { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
+  private def remove_running(name: String): Unit = synchronized {
+    _running -= name
+  }
+
+  private def add_result(
+    name: String,
+    current: Boolean,
+    output_heap: SHA1.Shasum,
+    process_result: Process_Result
+  ): Unit = synchronized {
+    _results += (name -> Build_Process.Result(current, output_heap, process_result))
+  }
+
+  private def get_results(names: List[String]): List[Build_Process.Result] = synchronized {
+    names.map(_results.apply)
+  }
 
   private def session_finished(session_name: String, process_result: Process_Result): String =
     "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
@@ -198,14 +237,10 @@
     "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
   }
 
-  private def finish_job(session_name: String, input_heaps: SHA1.Shasum, job: Build_Job): Unit = {
+  private def finish_job(job: Build_Job.Build_Session): Unit = {
+    val session_name = job.session_name
     val process_result = job.join
-
-    val output_heap =
-      if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) {
-        SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
-      }
-      else SHA1.no_shasum
+    val output_heap = job.finish
 
     val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
     val process_result_tail = {
@@ -236,7 +271,7 @@
         build_log =
           if (process_result.timeout) build_log.error("Timeout") else build_log,
         build =
-          Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps,
+          Sessions.Build_Info(build_deps.sources_shasum(session_name), job.input_heaps,
             output_heap, process_result.rc, UUID.random().toString)))
 
     // messages
@@ -251,15 +286,18 @@
       if (!process_result.interrupted) progress.echo(process_result_tail.out)
     }
 
-    remove_pending(session_name)
-    running -= session_name
-    results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail))
+    synchronized {
+      remove_pending(session_name)
+      remove_running(session_name)
+      add_result(session_name, false, output_heap, process_result_tail)
+    }
   }
 
   private def start_job(session_name: String): Unit = {
     val ancestor_results =
-      build_deps.sessions_structure.build_requirements(List(session_name)).
-        filterNot(_ == session_name).map(results(_))
+      get_results(
+        build_deps.sessions_structure.build_requirements(List(session_name)).
+          filterNot(_ == session_name))
     val input_heaps =
       if (ancestor_results.isEmpty) {
         SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
@@ -289,13 +327,17 @@
     val all_current = current && ancestor_results.forall(_.current)
 
     if (all_current) {
-      remove_pending(session_name)
-      results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok))
+      synchronized {
+        remove_pending(session_name)
+        add_result(session_name, true, output_heap, Process_Result.ok)
+      }
     }
     else if (no_build) {
       progress.echo_if(verbose, "Skipping " + session_name + " ...")
-      remove_pending(session_name)
-      results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error))
+      synchronized {
+        remove_pending(session_name)
+        add_result(session_name, false, output_heap, Process_Result.error)
+      }
     }
     else if (ancestor_results.forall(_.ok) && !progress.stopped) {
       progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
@@ -309,16 +351,21 @@
         new Resources(session_background, log = log,
           command_timings = build_context(session_name).old_command_timings)
 
-      val numa_node = numa_nodes.next(used_node)
       val job =
-        new Build_Job(progress, session_background, store, do_store,
-          resources, session_setup, numa_node)
-      running += (session_name -> (input_heaps, job))
+        synchronized {
+          val numa_node = next_numa_node()
+          job_running(session_name,
+            new Build_Job.Build_Session(progress, session_background, store, do_store,
+              resources, session_setup, input_heaps, numa_node))
+        }
+      job.start()
     }
     else {
       progress.echo(session_name + " CANCELLED")
-      remove_pending(session_name)
-      results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined))
+      synchronized {
+        remove_pending(session_name)
+        add_result(session_name, false, output_heap, Process_Result.undefined)
+      }
     }
   }
 
@@ -328,23 +375,17 @@
     }
 
   def run(): Map[String, Build_Process.Result] = {
-    while (!build_graph.is_empty) {
-      if (progress.stopped) {
-        for ((_, (_, job)) <- running) job.terminate()
-      }
+    while (test_running()) {
+      if (progress.stopped) stop_running()
 
-      running.find({ case (_, (_, job)) => job.is_finished }) match {
-        case Some((session_name, (input_heaps, job))) =>
-          finish_job(session_name, input_heaps, job)
-        case None if running.size < (max_jobs max 1) =>
-          next_pending() match {
-            case Some(session_name) => start_job(session_name)
-            case None => sleep()
-          }
+      for (job <- finished_running()) finish_job(job)
+
+      next_pending() match {
+        case Some(session_name) => start_job(session_name)
         case None => sleep()
       }
     }
 
-    results
+    synchronized { _results }
   }
 }