reuse SSH.Server connection database server;
author      wenzelm
date        Sun, 16 Jul 2023 21:01:33 +0200
changeset   78372 30d3faa6c245
parent      78371 928e031b7c52
child       78373 2deecde7f1f6
src/Pure/Thy/store.scala
src/Pure/Tools/build.scala
src/Pure/Tools/build_job.scala
src/Pure/Tools/build_process.scala
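
The change threads an explicit SSH.Server through Store, Build_Process and Build_Job
operations, so that a single SSH connection to the database server can be opened up
front and reused instead of reconnecting for each database access. A minimal sketch of
the resulting calling convention, condensed from the build.scala hunk below
(illustrative only, not part of the patch; most Build_Process.Context arguments are
omitted here):

    // open the SSH server once and reuse it for all database connections
    using(store.open_server()) { server =>
      using_optional(store.maybe_open_database_server(server = server)) { database_server =>
        // in the actual change, database_server is also used for store.clean_output
        val build_context =
          Build_Process.Context(store, build_deps, hostname = hostname(build_options),
            master = true)
        val results = build_engine.run(build_context, progress, server)
        results
      }
    }
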
--- a/src/Pure/Thy/store.scala	Sun Jul 16 19:38:12 2023 +0200
+++ b/src/Pure/Thy/store.scala	Sun Jul 16 21:01:33 2023 +0200
@@ -313,26 +313,30 @@
       ssh_port = options.int("build_database_ssh_port"),
       ssh_user = options.string("build_database_ssh_user"))
 
-  def maybe_open_database_server(): Option[SQL.Database] =
-    if (build_database_server) Some(open_database_server()) else None
+  def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
+    if (build_database_server) Some(open_database_server(server = server)) else None
 
-  def open_build_database(path: Path): SQL.Database =
-    if (build_database_server) open_database_server()
+  def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
+    if (build_database_server) open_database_server(server = server)
     else SQLite.open_database(path, restrict = true)
 
   def maybe_open_build_database(
-    path: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
-  ): Option[SQL.Database] = if (build_database_test) Some(open_build_database(path)) else None
+    path: Path = Path.explode("$ISABELLE_HOME_USER/build.db"),
+    server: SSH.Server = SSH.no_server
+  ): Option[SQL.Database] = {
+    if (build_database_test) Some(open_build_database(path, server = server)) else None
+  }
 
   def try_open_database(
     name: String,
     output: Boolean = false,
+    server: SSH.Server = SSH.no_server,
     server_mode: Boolean = build_database_server
   ): Option[SQL.Database] = {
     def check(db: SQL.Database): Option[SQL.Database] =
       if (output || session_info_exists(db)) Some(db) else { db.close(); None }
 
-    if (server_mode) check(open_database_server())
+    if (server_mode) check(open_database_server(server = server))
     else if (output) Some(SQLite.open_database(output_database(name)))
     else {
       (for {
@@ -346,8 +350,13 @@
   def error_database(name: String): Nothing =
     error("Missing build database for session " + quote(name))
 
-  def open_database(name: String, output: Boolean = false): SQL.Database =
-    try_open_database(name, output = output) getOrElse error_database(name)
+  def open_database(
+    name: String,
+    output: Boolean = false,
+    server: SSH.Server = SSH.no_server
+  ): SQL.Database = {
+    try_open_database(name, output = output, server = server) getOrElse error_database(name)
+  }
 
   def clean_output(
     database_server: Option[SQL.Database],
@@ -378,6 +387,7 @@
   }
 
   def check_output(
+    server: SSH.Server,
     name: String,
     session_options: Options,
     sources_shasum: SHA1.Shasum,
@@ -385,7 +395,7 @@
     fresh_build: Boolean,
     store_heap: Boolean
   ): (Boolean, SHA1.Shasum) = {
-    try_open_database(name) match {
+    try_open_database(name, server = server) match {
       case Some(db) =>
         using(db) { _ =>
           read_build(db, name) match {
--- a/src/Pure/Tools/build.scala	Sun Jul 16 19:38:12 2023 +0200
+++ b/src/Pure/Tools/build.scala	Sun Jul 16 21:01:33 2023 +0200
@@ -72,8 +72,11 @@
     def build_options(options: Options): Options =
       options + "completion_limit=0" + "editor_tracing_messages=0"
 
-    def build_process(build_context: Build_Process.Context, build_progress: Progress): Build_Process =
-      new Build_Process(build_context, build_progress)
+    def build_process(
+      build_context: Build_Process.Context,
+      build_progress: Progress,
+      server: SSH.Server
+    ): Build_Process = new Build_Process(build_context, build_progress, server)
 
     final def build_store(options: Options, cache: Term.Cache = Term.Cache.make()): Store = {
       val store = Store(build_options(options), cache = cache)
@@ -85,9 +88,13 @@
       store
     }
 
-    final def run(context: Build_Process.Context, progress: Progress): Results =
+    final def run(
+      context: Build_Process.Context,
+      progress: Progress,
+      server: SSH.Server
+    ): Results =
       Isabelle_Thread.uninterruptible {
-        using(build_process(context, progress)) { build_process =>
+        using(build_process(context, progress, server)) { build_process =>
           Results(context, build_process.run())
         }
       }
@@ -126,108 +133,111 @@
     val store = build_engine.build_store(options, cache = cache)
     val build_options = store.options
 
+    using(store.open_server()) { server =>
+      using_optional(store.maybe_open_database_server(server = server)) { database_server =>
 
-    /* session selection and dependencies */
+
+        /* session selection and dependencies */
 
-    val full_sessions =
-      Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos,
-        augment_options = augment_options)
-    val full_sessions_selection = full_sessions.imports_selection(selection)
+        val full_sessions =
+          Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos,
+            augment_options = augment_options)
+        val full_sessions_selection = full_sessions.imports_selection(selection)
 
-    val build_deps = {
-      val deps0 =
-        Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true,
-          list_files = list_files, check_keywords = check_keywords).check_errors
+        val build_deps = {
+          val deps0 =
+            Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true,
+              list_files = list_files, check_keywords = check_keywords).check_errors
 
-      if (soft_build && !fresh_build) {
-        val outdated =
-          deps0.sessions_structure.build_topological_order.flatMap(name =>
-            store.try_open_database(name) match {
-              case Some(db) =>
-                using(db)(store.read_build(_, name)) match {
-                  case Some(build) if build.ok =>
-                    val session_options = deps0.sessions_structure(name).options
-                    val session_sources = deps0.sources_shasum(name)
-                    if (Sessions.eq_sources(session_options, build.sources, session_sources)) None
-                    else Some(name)
-                  case _ => Some(name)
-                }
-              case None => Some(name)
-            })
+          if (soft_build && !fresh_build) {
+            val outdated =
+              deps0.sessions_structure.build_topological_order.flatMap(name =>
+                store.try_open_database(name, server = server) match {
+                  case Some(db) =>
+                    using(db)(store.read_build(_, name)) match {
+                      case Some(build) if build.ok =>
+                        val session_options = deps0.sessions_structure(name).options
+                        val session_sources = deps0.sources_shasum(name)
+                        if (Sessions.eq_sources(session_options, build.sources, session_sources)) None
+                        else Some(name)
+                      case _ => Some(name)
+                    }
+                  case None => Some(name)
+                })
 
-        Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)),
-          progress = progress, inlined_files = true).check_errors
-      }
-      else deps0
-    }
+            Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)),
+              progress = progress, inlined_files = true).check_errors
+          }
+          else deps0
+        }
 
 
-    /* check unknown files */
+        /* check unknown files */
+
+        if (check_unknown_files) {
+          val source_files =
+            (for {
+              (_, base) <- build_deps.session_bases.iterator
+              (path, _) <- base.session_sources.iterator
+            } yield path).toList
+          Mercurial.check_files(source_files)._2 match {
+            case Nil =>
+            case unknown_files =>
+              progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" +
+                unknown_files.map(File.standard_path).sorted.mkString("\n  ", "\n  ", ""))
+          }
+        }
+
+
+        /* build process and results */
+
+        val build_context =
+          Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+            build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+            fresh_build = fresh_build, no_build = no_build, session_setup = session_setup,
+            master = true)
 
-    if (check_unknown_files) {
-      val source_files =
-        (for {
-          (_, base) <- build_deps.session_bases.iterator
-          (path, _) <- base.session_sources.iterator
-        } yield path).toList
-      Mercurial.check_files(source_files)._2 match {
-        case Nil =>
-        case unknown_files =>
-          progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" +
-            unknown_files.map(File.standard_path).sorted.mkString("\n  ", "\n  ", ""))
+        if (clean_build) {
+          for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
+            store.clean_output(database_server, name) match {
+              case None =>
+              case Some(true) => progress.echo("Cleaned " + name)
+              case Some(false) => progress.echo(name + " FAILED to clean")
+            }
+          }
+        }
+
+        val results = build_engine.run(build_context, progress, server)
+
+        if (export_files) {
+          for (name <- full_sessions_selection.iterator if results(name).ok) {
+            val info = results.info(name)
+            if (info.export_files.nonEmpty) {
+              progress.echo("Exporting " + info.name + " ...")
+              for ((dir, prune, pats) <- info.export_files) {
+                Export.export_files(store, name, info.dir + dir,
+                  progress = if (progress.verbose) progress else new Progress,
+                  export_prune = prune,
+                  export_patterns = pats)
+              }
+            }
+          }
+        }
+
+        val presentation_sessions =
+          results.sessions_ok.filter(name => browser_info.enabled(results.info(name)))
+        if (presentation_sessions.nonEmpty && !progress.stopped) {
+          Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions,
+            progress = progress)
+        }
+
+        if (!results.ok && (progress.verbose || !no_build)) {
+          progress.echo("Unfinished session(s): " + commas(results.unfinished))
+        }
+
+        results
       }
     }
-
-
-    /* build process and results */
-
-    val build_context =
-      Build_Process.Context(store, build_deps, hostname = hostname(build_options),
-        build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
-        fresh_build = fresh_build, no_build = no_build, session_setup = session_setup,
-        master = true)
-
-    if (clean_build) {
-      using_optional(store.maybe_open_database_server()) { database_server =>
-        for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
-          store.clean_output(database_server, name) match {
-            case None =>
-            case Some(true) => progress.echo("Cleaned " + name)
-            case Some(false) => progress.echo(name + " FAILED to clean")
-          }
-        }
-      }
-    }
-
-    val results = build_engine.run(build_context, progress)
-
-    if (export_files) {
-      for (name <- full_sessions_selection.iterator if results(name).ok) {
-        val info = results.info(name)
-        if (info.export_files.nonEmpty) {
-          progress.echo("Exporting " + info.name + " ...")
-          for ((dir, prune, pats) <- info.export_files) {
-            Export.export_files(store, name, info.dir + dir,
-              progress = if (progress.verbose) progress else new Progress,
-              export_prune = prune,
-              export_patterns = pats)
-          }
-        }
-      }
-    }
-
-    val presentation_sessions =
-      results.sessions_ok.filter(name => browser_info.enabled(results.info(name)))
-    if (presentation_sessions.nonEmpty && !progress.stopped) {
-      Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions,
-        progress = progress)
-    }
-
-    if (!results.ok && (progress.verbose || !no_build)) {
-      progress.echo("Unfinished session(s): " + commas(results.unfinished))
-    }
-
-    results
   }
 
 
@@ -450,29 +460,31 @@
     val store = build_engine.build_store(options)
     val build_options = store.options
 
-    using_optional(store.maybe_open_build_database()) { build_database =>
-      val builds = read_builds(build_database)
+    using(store.open_server()) { server =>
+      using_optional(store.maybe_open_build_database(server = server)) { build_database =>
+        val builds = read_builds(build_database)
 
-      if (list_builds) progress.echo(print_builds(build_database, builds))
+        if (list_builds) progress.echo(print_builds(build_database, builds))
 
-      if (!list_builds || build_id.nonEmpty) {
-        val build_master = id_builds(build_database, build_id, builds)
+        if (!list_builds || build_id.nonEmpty) {
+          val build_master = id_builds(build_database, build_id, builds)
 
-        val sessions_structure =
-          Sessions.load_structure(build_options, dirs = dirs).
-            selection(Sessions.Selection(sessions = build_master.sessions))
+          val sessions_structure =
+            Sessions.load_structure(build_options, dirs = dirs).
+              selection(Sessions.Selection(sessions = build_master.sessions))
 
-        val build_deps =
-          Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
+          val build_deps =
+            Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
 
-        val build_context =
-          Build_Process.Context(store, build_deps, hostname = hostname(build_options),
-            numa_shuffling = numa_shuffling, max_jobs = max_jobs,
-            build_uuid = build_master.build_uuid)
+          val build_context =
+            Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+              numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+              build_uuid = build_master.build_uuid)
 
-        Some(build_engine.run(build_context, progress))
+          Some(build_engine.run(build_context, progress, server))
+        }
+        else None
       }
-      else None
     }
   }
 
--- a/src/Pure/Tools/build_job.scala	Sun Jul 16 19:38:12 2023 +0200
+++ b/src/Pure/Tools/build_job.scala	Sun Jul 16 21:01:33 2023 +0200
@@ -24,14 +24,14 @@
     session_context: Session_Context,
     progress: Progress,
     log: Logger,
-    database_server: Option[SQL.Database],
+    server: SSH.Server,
     session_background: Sessions.Background,
     sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
     node_info: Host.Node_Info,
     store_heap: Boolean
   ): Session_Job = {
-    new Session_Job(build_context, session_context, progress, log, database_server,
+    new Session_Job(build_context, session_context, progress, log, server,
       session_background, sources_shasum, input_shasum, node_info, store_heap)
   }
 
@@ -45,14 +45,15 @@
       sources_shasum: SHA1.Shasum,
       timeout: Time,
       store: Store,
-      progress: Progress = new Progress
+      progress: Progress = new Progress,
+      server: SSH.Server = SSH.no_server
     ): Session_Context = {
       def default: Session_Context =
         Session_Context(
           name, deps, ancestors, session_prefs, sources_shasum, timeout,
           Time.zero, Bytes.empty, build_uuid)
 
-      store.try_open_database(name) match {
+      store.try_open_database(name, server = server) match {
         case None => default
         case Some(db) =>
           def ignore_error(msg: String) = {
@@ -99,7 +100,7 @@
     session_context: Session_Context,
     progress: Progress,
     log: Logger,
-    database_server: Option[SQL.Database],
+    server: SSH.Server,
     session_background: Sessions.Background,
     sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
@@ -115,414 +116,417 @@
 
         val store = build_context.store
 
-        store.clean_output(database_server, session_name, session_init = true)
-
-        val session_sources =
-          Store.Sources.load(session_background.base, cache = store.cache.compress)
-
-        val env =
-          Isabelle_System.settings(
-            List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
-
-        val session_heaps =
-          session_background.info.parent match {
-            case None => Nil
-            case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic)
-          }
-
-        val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil
-
-        val eval_store =
-          if (store_heap) {
-            (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
-            List("ML_Heap.save_child " +
-              ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
-          }
-          else Nil
+        using_optional(store.maybe_open_database_server(server = server)) { database_server =>
 
-        def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
-          session_background.base.theory_load_commands.get(node_name.theory) match {
-            case None => Nil
-            case Some(spans) =>
-              val syntax = session_background.base.theory_syntax(node_name)
-              val master_dir = Path.explode(node_name.master_dir)
-              for (span <- spans; file <- span.loaded_files(syntax).files)
-                yield {
-                  val src_path = Path.explode(file)
-                  val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
-
-                  val bytes = session_sources(blob_name.node).bytes
-                  val text = bytes.text
-                  val chunk = Symbol.Text_Chunk(text)
+          store.clean_output(database_server, session_name, session_init = true)
 
-                  Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
-                    Document.Blobs.Item(bytes, text, chunk, changed = false)
-                }
-          }
-
-
-        /* session */
-
-        val resources =
-          new Resources(session_background, log = log,
-            command_timings =
-              Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
-
-        val session =
-          new Session(options, resources) {
-            override val cache: Term.Cache = store.cache
-
-            override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
-              Command.Blobs_Info.make(session_blobs(node_name))
+          val session_sources =
+            Store.Sources.load(session_background.base, cache = store.cache.compress)
 
-            override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
-              Document.Blobs.make(session_blobs(node_name))
-          }
-
-        object Build_Session_Errors {
-          private val promise: Promise[List[String]] = Future.promise
-
-          def result: Exn.Result[List[String]] = promise.join_result
-          def cancel(): Unit = promise.cancel()
-          def apply(errs: List[String]): Unit = {
-            try { promise.fulfill(errs) }
-            catch { case _: IllegalStateException => }
-          }
-        }
-
-        val export_consumer =
-          Export.consumer(store.open_database(session_name, output = true), store.cache,
-            progress = progress)
-
-        val stdout = new StringBuilder(1000)
-        val stderr = new StringBuilder(1000)
-        val command_timings = new mutable.ListBuffer[Properties.T]
-        val theory_timings = new mutable.ListBuffer[Properties.T]
-        val session_timings = new mutable.ListBuffer[Properties.T]
-        val runtime_statistics = new mutable.ListBuffer[Properties.T]
-        val task_statistics = new mutable.ListBuffer[Properties.T]
+          val env =
+            Isabelle_System.settings(
+              List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
 
-        def fun(
-          name: String,
-          acc: mutable.ListBuffer[Properties.T],
-          unapply: Properties.T => Option[Properties.T]
-        ): (String, Session.Protocol_Function) = {
-          name -> ((msg: Prover.Protocol_Output) =>
-            unapply(msg.properties) match {
-              case Some(props) => acc += props; true
-              case _ => false
-            })
-        }
-
-        session.init_protocol_handler(new Session.Protocol_Handler {
-            override def exit(): Unit = Build_Session_Errors.cancel()
-
-            private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
-              val (rc, errors) =
-                try {
-                  val (rc, errs) = {
-                    import XML.Decode._
-                    pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
-                  }
-                  val errors =
-                    for (err <- errs) yield {
-                      val prt = Protocol_Message.expose_no_reports(err)
-                      Pretty.string_of(prt, metric = Symbol.Metric)
-                    }
-                  (rc, errors)
-                }
-                catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
-
-              session.protocol_command("Prover.stop", rc.toString)
-              Build_Session_Errors(errors)
-              true
+          val session_heaps =
+            session_background.info.parent match {
+              case None => Nil
+              case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic)
             }
 
-            private def loading_theory(msg: Prover.Protocol_Output): Boolean =
-              msg.properties match {
-                case Markup.Loading_Theory(Markup.Name(name)) =>
-                  progress.theory(Progress.Theory(name, session = session_name))
-                  false
-                case _ => false
-              }
-
-            private def export_(msg: Prover.Protocol_Output): Boolean =
-              msg.properties match {
-                case Protocol.Export(args) =>
-                  export_consumer.make_entry(session_name, args, msg.chunk)
-                  true
-                case _ => false
-              }
+          val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil
 
-            override val functions: Session.Protocol_Functions =
-              List(
-                Markup.Build_Session_Finished.name -> build_session_finished,
-                Markup.Loading_Theory.name -> loading_theory,
-                Markup.EXPORT -> export_,
-                fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
-                fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
-                fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
-          })
-
-        session.command_timings += Session.Consumer("command_timings") {
-          case Session.Command_Timing(props) =>
-            for {
-              elapsed <- Markup.Elapsed.unapply(props)
-              elapsed_time = Time.seconds(elapsed)
-              if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
-            } command_timings += props.filter(Markup.command_timing_property)
-        }
-
-        session.runtime_statistics += Session.Consumer("ML_statistics") {
-          case Session.Runtime_Statistics(props) => runtime_statistics += props
-        }
+          val eval_store =
+            if (store_heap) {
+              (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
+              List("ML_Heap.save_child " +
+                ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
+            }
+            else Nil
 
-        session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
-          case snapshot =>
-            if (!progress.stopped) {
-              def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
-                if (!progress.stopped) {
-                  val theory_name = snapshot.node_name.theory
-                  val args =
-                    Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
-                  val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
-                  export_consumer.make_entry(session_name, args, body)
-                }
-              }
-              def export_text(name: String, text: String, compress: Boolean = true): Unit =
-                export_(name, List(XML.Text(text)), compress = compress)
-
-              for (command <- snapshot.snippet_command) {
-                export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
-              }
-
-              export_text(Export.FILES,
-                cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
-                compress = false)
+          def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
+            session_background.base.theory_load_commands.get(node_name.theory) match {
+              case None => Nil
+              case Some(spans) =>
+                val syntax = session_background.base.theory_syntax(node_name)
+                val master_dir = Path.explode(node_name.master_dir)
+                for (span <- spans; file <- span.loaded_files(syntax).files)
+                  yield {
+                    val src_path = Path.explode(file)
+                    val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
 
-              for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
-                val xml = snapshot.switch(blob_name).xml_markup()
-                export_(Export.MARKUP + (i + 1), xml)
-              }
-              export_(Export.MARKUP, snapshot.xml_markup())
-              export_(Export.MESSAGES, snapshot.messages.map(_._1))
-            }
-        }
+                    val bytes = session_sources(blob_name.node).bytes
+                    val text = bytes.text
+                    val chunk = Symbol.Text_Chunk(text)
 
-        session.all_messages += Session.Consumer[Any]("build_session_output") {
-          case msg: Prover.Output =>
-            val message = msg.message
-            if (msg.is_system) resources.log(Protocol.message_text(message))
-
-            if (msg.is_stdout) {
-              stdout ++= Symbol.encode(XML.content(message))
+                    Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
+                      Document.Blobs.Item(bytes, text, chunk, changed = false)
+                  }
             }
-            else if (msg.is_stderr) {
-              stderr ++= Symbol.encode(XML.content(message))
-            }
-            else if (msg.is_exit) {
-              val err =
-                "Prover terminated" +
-                  (msg.properties match {
-                    case Markup.Process_Result(result) => ": " + result.print_rc
-                    case _ => ""
-                  })
-              Build_Session_Errors(List(err))
-            }
-          case _ =>
-        }
-
-        build_context.session_setup(session_name, session)
-
-        val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
 
 
-        /* process */
+          /* session */
 
-        val process =
-          Isabelle_Process.start(options, session, session_background, session_heaps,
-            use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env)
+          val resources =
+            new Resources(session_background, log = log,
+              command_timings =
+                Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
 
-        val timeout_request: Option[Event_Timer.Request] =
-          if (info.timeout_ignored) None
-          else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() })
+          val session =
+            new Session(options, resources) {
+              override val cache: Term.Cache = store.cache
 
-        val build_errors =
-          Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
-            Exn.capture { process.await_startup() } match {
-              case Exn.Res(_) =>
-                val resources_yxml = resources.init_session_yxml
-                val encode_options: XML.Encode.T[Options] =
-                  options => session.prover_options(options).encode
-                val args_yxml =
-                  YXML.string_of_body(
-                    {
-                      import XML.Encode._
-                      pair(string, list(pair(encode_options, list(pair(string, properties)))))(
-                        (session_name, info.theories))
-                    })
-                session.protocol_command("build_session", resources_yxml, args_yxml)
-                Build_Session_Errors.result
-              case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+              override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
+                Command.Blobs_Info.make(session_blobs(node_name))
+
+              override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
+                Document.Blobs.make(session_blobs(node_name))
+            }
+
+          object Build_Session_Errors {
+            private val promise: Promise[List[String]] = Future.promise
+
+            def result: Exn.Result[List[String]] = promise.join_result
+            def cancel(): Unit = promise.cancel()
+            def apply(errs: List[String]): Unit = {
+              try { promise.fulfill(errs) }
+              catch { case _: IllegalStateException => }
             }
           }
 
-        val result0 =
-          Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
+          val export_consumer =
+            Export.consumer(store.open_database(session_name, output = true, server = server),
+              store.cache, progress = progress)
 
-        val was_timeout =
-          timeout_request match {
-            case None => false
-            case Some(request) => !request.cancel()
+          val stdout = new StringBuilder(1000)
+          val stderr = new StringBuilder(1000)
+          val command_timings = new mutable.ListBuffer[Properties.T]
+          val theory_timings = new mutable.ListBuffer[Properties.T]
+          val session_timings = new mutable.ListBuffer[Properties.T]
+          val runtime_statistics = new mutable.ListBuffer[Properties.T]
+          val task_statistics = new mutable.ListBuffer[Properties.T]
+
+          def fun(
+            name: String,
+            acc: mutable.ListBuffer[Properties.T],
+            unapply: Properties.T => Option[Properties.T]
+          ): (String, Session.Protocol_Function) = {
+            name -> ((msg: Prover.Protocol_Output) =>
+              unapply(msg.properties) match {
+                case Some(props) => acc += props; true
+                case _ => false
+              })
           }
 
-        session.stop()
+          session.init_protocol_handler(new Session.Protocol_Handler {
+              override def exit(): Unit = Build_Session_Errors.cancel()
+
+              private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
+                val (rc, errors) =
+                  try {
+                    val (rc, errs) = {
+                      import XML.Decode._
+                      pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
+                    }
+                    val errors =
+                      for (err <- errs) yield {
+                        val prt = Protocol_Message.expose_no_reports(err)
+                        Pretty.string_of(prt, metric = Symbol.Metric)
+                      }
+                    (rc, errors)
+                  }
+                  catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
+
+                session.protocol_command("Prover.stop", rc.toString)
+                Build_Session_Errors(errors)
+                true
+              }
 
-        val export_errors =
-          export_consumer.shutdown(close = true).map(Output.error_message_text)
+              private def loading_theory(msg: Prover.Protocol_Output): Boolean =
+                msg.properties match {
+                  case Markup.Loading_Theory(Markup.Name(name)) =>
+                    progress.theory(Progress.Theory(name, session = session_name))
+                    false
+                  case _ => false
+                }
+
+              private def export_(msg: Prover.Protocol_Output): Boolean =
+                msg.properties match {
+                  case Protocol.Export(args) =>
+                    export_consumer.make_entry(session_name, args, msg.chunk)
+                    true
+                  case _ => false
+                }
+
+              override val functions: Session.Protocol_Functions =
+                List(
+                  Markup.Build_Session_Finished.name -> build_session_finished,
+                  Markup.Loading_Theory.name -> loading_theory,
+                  Markup.EXPORT -> export_,
+                  fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
+                  fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
+                  fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
+            })
+
+          session.command_timings += Session.Consumer("command_timings") {
+            case Session.Command_Timing(props) =>
+              for {
+                elapsed <- Markup.Elapsed.unapply(props)
+                elapsed_time = Time.seconds(elapsed)
+                if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+              } command_timings += props.filter(Markup.command_timing_property)
+          }
 
-        val (document_output, document_errors) =
-          try {
-            if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) {
-              using(Export.open_database_context(store)) { database_context =>
-                val documents =
-                  using(database_context.open_session(session_background)) {
-                    session_context =>
-                      Document_Build.build_documents(
-                        Document_Build.context(session_context, progress = progress),
-                        output_sources = info.document_output,
-                        output_pdf = info.document_output)
+          session.runtime_statistics += Session.Consumer("ML_statistics") {
+            case Session.Runtime_Statistics(props) => runtime_statistics += props
+          }
+
+          session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
+            case snapshot =>
+              if (!progress.stopped) {
+                def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
+                  if (!progress.stopped) {
+                    val theory_name = snapshot.node_name.theory
+                    val args =
+                      Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+                    val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
+                    export_consumer.make_entry(session_name, args, body)
                   }
-                using(database_context.open_database(session_name, output = true))(session_database =>
-                  documents.foreach(_.write(session_database.db, session_name)))
-                (documents.flatMap(_.log_lines), Nil)
+                }
+                def export_text(name: String, text: String, compress: Boolean = true): Unit =
+                  export_(name, List(XML.Text(text)), compress = compress)
+
+                for (command <- snapshot.snippet_command) {
+                  export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
+                }
+
+                export_text(Export.FILES,
+                  cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
+                  compress = false)
+
+                for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
+                  val xml = snapshot.switch(blob_name).xml_markup()
+                  export_(Export.MARKUP + (i + 1), xml)
+                }
+                export_(Export.MARKUP, snapshot.xml_markup())
+                export_(Export.MESSAGES, snapshot.messages.map(_._1))
               }
-            }
-            else (Nil, Nil)
           }
-          catch {
-            case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
-            case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+
+          session.all_messages += Session.Consumer[Any]("build_session_output") {
+            case msg: Prover.Output =>
+              val message = msg.message
+              if (msg.is_system) resources.log(Protocol.message_text(message))
+
+              if (msg.is_stdout) {
+                stdout ++= Symbol.encode(XML.content(message))
+              }
+              else if (msg.is_stderr) {
+                stderr ++= Symbol.encode(XML.content(message))
+              }
+              else if (msg.is_exit) {
+                val err =
+                  "Prover terminated" +
+                    (msg.properties match {
+                      case Markup.Process_Result(result) => ": " + result.print_rc
+                      case _ => ""
+                    })
+                Build_Session_Errors(List(err))
+              }
+            case _ =>
           }
 
+          build_context.session_setup(session_name, session)
+
+          val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
+
 
-        /* process result */
+          /* process */
+
+          val process =
+            Isabelle_Process.start(options, session, session_background, session_heaps,
+              use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env)
+
+          val timeout_request: Option[Event_Timer.Request] =
+            if (info.timeout_ignored) None
+            else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() })
 
-        val result1 = {
-          val theory_timing =
-            theory_timings.iterator.flatMap(
-              {
-                case props @ Markup.Name(name) => Some(name -> props)
-                case _ => None
-              }).toMap
-          val used_theory_timings =
-            for { (name, _) <- session_background.base.used_theories }
-              yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
+          val build_errors =
+            Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
+              Exn.capture { process.await_startup() } match {
+                case Exn.Res(_) =>
+                  val resources_yxml = resources.init_session_yxml
+                  val encode_options: XML.Encode.T[Options] =
+                    options => session.prover_options(options).encode
+                  val args_yxml =
+                    YXML.string_of_body(
+                      {
+                        import XML.Encode._
+                        pair(string, list(pair(encode_options, list(pair(string, properties)))))(
+                          (session_name, info.theories))
+                      })
+                  session.protocol_command("build_session", resources_yxml, args_yxml)
+                  Build_Session_Errors.result
+                case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+              }
+            }
 
-          val more_output =
-            Library.trim_line(stdout.toString) ::
-              command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
-              used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
-              session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
-              runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
-              task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
-              document_output
+          val result0 =
+            Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
 
-          result0.output(more_output)
-            .error(Library.trim_line(stderr.toString))
-            .errors_rc(export_errors ::: document_errors)
-        }
+          val was_timeout =
+            timeout_request match {
+              case None => false
+              case Some(request) => !request.cancel()
+            }
+
+          session.stop()
+
+          val export_errors =
+            export_consumer.shutdown(close = true).map(Output.error_message_text)
 
-        val result2 =
-          build_errors match {
-            case Exn.Res(build_errs) =>
-              val errs = build_errs ::: document_errors
-              if (errs.nonEmpty) {
-                result1.error_rc.output(
-                  errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
-                    errs.map(Protocol.Error_Message_Marker.apply))
+          val (document_output, document_errors) =
+            try {
+              if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) {
+                using(Export.open_database_context(store)) { database_context =>
+                  val documents =
+                    using(database_context.open_session(session_background)) {
+                      session_context =>
+                        Document_Build.build_documents(
+                          Document_Build.context(session_context, progress = progress),
+                          output_sources = info.document_output,
+                          output_pdf = info.document_output)
+                    }
+                  using(database_context.open_database(session_name, output = true))(session_database =>
+                    documents.foreach(_.write(session_database.db, session_name)))
+                  (documents.flatMap(_.log_lines), Nil)
+                }
               }
-              else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-              else result1
-            case Exn.Exn(Exn.Interrupt()) =>
-              if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-              else result1
-            case Exn.Exn(exn) => throw exn
-          }
-
-        val process_result =
-          if (result2.ok) result2
-          else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc
-          else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
-          else result2
+              else (Nil, Nil)
+            }
+            catch {
+              case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
+              case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+            }
 
 
-        /* output heap */
+          /* process result */
 
-        val output_shasum = {
-          val heap = store.output_heap(session_name)
-          if (process_result.ok && store_heap && heap.is_file) {
-            val slice = Space.MiB(options.real("build_database_slice")).bytes
-            val digest = ML_Heap.store(database_server, session_name, heap, slice)
-            SHA1.shasum(digest, session_name)
-          }
-          else SHA1.no_shasum
-        }
+          val result1 = {
+            val theory_timing =
+              theory_timings.iterator.flatMap(
+                {
+                  case props @ Markup.Name(name) => Some(name -> props)
+                  case _ => None
+                }).toMap
+            val used_theory_timings =
+              for { (name, _) <- session_background.base.used_theories }
+                yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
 
-        val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
+            val more_output =
+              Library.trim_line(stdout.toString) ::
+                command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
+                used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
+                session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
+                runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
+                task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
+                document_output
 
-        val build_log =
-          Build_Log.Log_File(session_name, process_result.out_lines).
-            parse_session_info(
-              command_timings = true,
-              theory_timings = true,
-              ml_statistics = true,
-              task_statistics = true)
+            result0.output(more_output)
+              .error(Library.trim_line(stderr.toString))
+              .errors_rc(export_errors ::: document_errors)
+          }
 
-        // write log file
-        if (process_result.ok) {
-          File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
-        }
-        else File.write(store.output_log(session_name), terminate_lines(log_lines))
+          val result2 =
+            build_errors match {
+              case Exn.Res(build_errs) =>
+                val errs = build_errs ::: document_errors
+                if (errs.nonEmpty) {
+                  result1.error_rc.output(
+                    errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
+                      errs.map(Protocol.Error_Message_Marker.apply))
+                }
+                else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+                else result1
+              case Exn.Exn(Exn.Interrupt()) =>
+                if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+                else result1
+              case Exn.Exn(exn) => throw exn
+            }
+
+          val process_result =
+            if (result2.ok) result2
+            else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc
+            else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
+            else result2
+
+
+          /* output heap */
 
-        // write database
-        using(store.open_database(session_name, output = true))(db =>
-          store.write_session_info(db, session_name, session_sources,
-            build_log =
-              if (process_result.timeout) build_log.error("Timeout") else build_log,
-            build =
-              Store.Build_Info(
-                sources = sources_shasum,
-                input_heaps = input_shasum,
-                output_heap = output_shasum,
-                process_result.rc,
-                build_context.build_uuid)))
+          val output_shasum = {
+            val heap = store.output_heap(session_name)
+            if (process_result.ok && store_heap && heap.is_file) {
+              val slice = Space.MiB(options.real("build_database_slice")).bytes
+              val digest = ML_Heap.store(database_server, session_name, heap, slice)
+              SHA1.shasum(digest, session_name)
+            }
+            else SHA1.no_shasum
+          }
+
+          val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
 
-        // messages
-        process_result.err_lines.foreach(progress.echo(_))
+          val build_log =
+            Build_Log.Log_File(session_name, process_result.out_lines).
+              parse_session_info(
+                command_timings = true,
+                theory_timings = true,
+                ml_statistics = true,
+                task_statistics = true)
+
+          // write log file
+          if (process_result.ok) {
+            File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
+          }
+          else File.write(store.output_log(session_name), terminate_lines(log_lines))
 
-        if (process_result.ok) {
-          val props = build_log.session_timing
-          val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-          val timing = Markup.Timing_Properties.get(props)
-          progress.echo(
-            "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")",
-            verbose = true)
-          progress.echo(
-            "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
+          // write database
+          using(store.open_database(session_name, output = true, server = server))(db =>
+            store.write_session_info(db, session_name, session_sources,
+              build_log =
+                if (process_result.timeout) build_log.error("Timeout") else build_log,
+              build =
+                Store.Build_Info(
+                  sources = sources_shasum,
+                  input_heaps = input_shasum,
+                  output_heap = output_shasum,
+                  process_result.rc,
+                  build_context.build_uuid)))
+
+          // messages
+          process_result.err_lines.foreach(progress.echo(_))
+
+          if (process_result.ok) {
+            val props = build_log.session_timing
+            val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+            val timing = Markup.Timing_Properties.get(props)
+            progress.echo(
+              "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")",
+              verbose = true)
+            progress.echo(
+              "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
+          }
+          else {
+            progress.echo(
+              session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
+            if (!process_result.interrupted) {
+              val tail = info.options.int("process_output_tail")
+              val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
+              val prefix = if (log_lines.length == suffix.length) Nil else List("...")
+              progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
+            }
+          }
+
+          (process_result.copy(out_lines = log_lines), output_shasum)
         }
-        else {
-          progress.echo(
-            session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
-          if (!process_result.interrupted) {
-            val tail = info.options.int("process_output_tail")
-            val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
-            val prefix = if (log_lines.length == suffix.length) Nil else List("...")
-            progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
-          }
-        }
-
-        (process_result.copy(out_lines = log_lines), output_shasum)
       }
 
     override def cancel(): Unit = future_result.cancel()
--- a/src/Pure/Tools/build_process.scala	Sun Jul 16 19:38:12 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Sun Jul 16 21:01:33 2023 +0200
@@ -128,7 +128,11 @@
       make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) })
     }
 
-    def init(build_context: Context, progress: Progress = new Progress): Sessions = {
+    def init(
+      build_context: Context,
+      progress: Progress = new Progress,
+      server: SSH.Server = SSH.no_server
+    ): Sessions = {
       val sessions_structure = build_context.sessions_structure
       make(
         sessions_structure.build_graph.iterator.foldLeft(graph) {
@@ -167,7 +171,7 @@
               val session =
                 Build_Job.Session_Context.load(
                   build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum,
-                  info.timeout, build_context.store, progress = progress)
+                  info.timeout, build_context.store, progress = progress, server = server)
               graph0.new_node(name, session)
             }
         }
@@ -835,7 +839,8 @@
 
 class Build_Process(
   protected final val build_context: Build_Process.Context,
-  protected final val build_progress: Progress
+  protected final val build_progress: Progress,
+  protected final val server: SSH.Server
 )
 extends AutoCloseable {
   /* context */
@@ -850,12 +855,12 @@
   /* progress backed by database */
 
   private val _database_server: Option[SQL.Database] =
-    try { store.maybe_open_database_server() }
+    try { store.maybe_open_database_server(server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
   private val _build_database: Option[SQL.Database] =
     try {
-      for (db <- store.maybe_open_build_database()) yield {
+      for (db <- store.maybe_open_build_database(server = server)) yield {
         val store_tables = if (db.is_postgresql) Store.Data.tables else SQL.Tables.empty
         Build_Process.Data.transaction_lock(db,
           create = true,
@@ -871,7 +876,7 @@
     catch { case exn: Throwable => close(); throw exn }
 
   private val _host_database: Option[SQL.Database] =
-    try { store.maybe_open_build_database(path = Host.Data.database) }
+    try { store.maybe_open_build_database(path = Host.Data.database, server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
   protected val (progress, worker_uuid) = synchronized {
@@ -879,7 +884,7 @@
       case None => (build_progress, UUID.random().toString)
       case Some(db) =>
         try {
-          val progress_db = store.open_build_database(Progress.Data.database)
+          val progress_db = store.open_build_database(Progress.Data.database, server = server)
           val progress =
             new Database_Progress(progress_db, build_progress,
               hostname = hostname,
@@ -966,7 +971,7 @@
       state.sessions.iterator.exists(_.ancestors.contains(session_name))
 
     val (current, output_shasum) =
-      store.check_output(session_name,
+      store.check_output(server, session_name,
         session_options = build_context.sessions_structure(session_name).options,
         sources_shasum = sources_shasum,
         input_shasum = input_shasum,
@@ -1018,7 +1023,7 @@
       val session = state.sessions(session_name)
 
       val build =
-        Build_Job.start_session(build_context, session, progress, log, _database_server,
+        Build_Job.start_session(build_context, session, progress, log, server,
           build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap)
 
       val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))