merged
authorwenzelm
Mon, 17 Jul 2023 21:41:14 +0200
changeset 78387 7ecf0ee4ce9f
parent 78338 c4cc276821d4 (current diff)
parent 78386 ee588c4b5557 (diff)
child 78388 475600ef98b8
child 78458 b221d12978ee
merged
--- a/etc/options	Sun Jul 16 11:04:59 2023 +0100
+++ b/etc/options	Mon Jul 17 21:41:14 2023 +0200
@@ -198,6 +198,9 @@
 option build_database_slice : real = 300
   -- "slice size in MiB for ML heap stored within database"
 
+option build_delay : real = 0.2
+  -- "delay build process main loop"
+
 
 section "Editor Session"
 
--- a/src/Pure/Admin/build_log.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Admin/build_log.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -816,7 +816,7 @@
   def store(options: Options, cache: XML.Cache = XML.Cache.make()): Store =
     new Store(options, cache)
 
-  class Store private[Build_Log](options: Options, val cache: XML.Cache) {
+  class Store private[Build_Log](val options: Options, val cache: XML.Cache) {
     override def toString: String = {
       val s =
         Exn.capture { open_database() } match {
@@ -829,23 +829,16 @@
       "Store(" + s + ")"
     }
 
-    def open_database(
-      user: String = options.string("build_log_database_user"),
-      password: String = options.string("build_log_database_password"),
-      database: String = options.string("build_log_database_name"),
-      host: String = options.string("build_log_database_host"),
-      port: Int = options.int("build_log_database_port"),
-      ssh_host: String = options.string("build_log_ssh_host"),
-      ssh_user: String = options.string("build_log_ssh_user"),
-      ssh_port: Int = options.int("build_log_ssh_port")
-    ): PostgreSQL.Database = {
-      PostgreSQL.open_database(
-        user = user, password = password, database = database, host = host, port = port,
-        ssh =
-          if (ssh_host == "") None
-          else Some(SSH.open_session(options, host = ssh_host, user = ssh_user, port = ssh_port)),
-        ssh_close = true)
-    }
+    def open_database(server: SSH.Server = SSH.no_server): PostgreSQL.Database =
+      PostgreSQL.open_database_server(options, server = server,
+        user = options.string("build_log_database_user"),
+        password = options.string("build_log_database_password"),
+        database = options.string("build_log_database_name"),
+        host = options.string("build_log_database_host"),
+        port = options.int("build_log_database_port"),
+        ssh_host = options.string("build_log_ssh_host"),
+        ssh_port = options.int("build_log_ssh_port"),
+        ssh_user = options.string("build_log_ssh_user"))
 
     def snapshot_database(
       db: PostgreSQL.Database,
@@ -890,12 +883,13 @@
               db2.using_statement(table.insert()) { stmt2 =>
                 db.using_statement(
                   Data.recent_pull_date_table(days, afp_rev = afp_rev).query) { stmt =>
-                  val res = stmt.execute_query()
-                  while (res.next()) {
-                    for ((c, i) <- table.columns.zipWithIndex) {
-                      stmt2.string(i + 1) = res.get_string(c)
+                  using(stmt.execute_query()) { res =>
+                    while (res.next()) {
+                      for ((c, i) <- table.columns.zipWithIndex) {
+                        stmt2.string(i + 1) = res.get_string(c)
+                      }
+                      stmt2.execute()
                     }
-                    stmt2.execute()
                   }
                 }
               }
--- a/src/Pure/Admin/build_status.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Admin/build_status.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -259,90 +259,91 @@
         progress.echo(sql, verbose = true)
 
         db.using_statement(sql) { stmt =>
-          val res = stmt.execute_query()
-          while (res.next()) {
-            val session_name = res.string(Build_Log.Data.session_name)
-            val chapter = res.string(Build_Log.Data.chapter)
-            val groups = split_lines(res.string(Build_Log.Data.groups))
-            val threads = {
-              val threads1 =
-                res.string(Build_Log.Settings.ISABELLE_BUILD_OPTIONS) match {
-                  case Threads_Option(Value.Int(i)) => i
-                  case _ => 1
-                }
-              val threads2 = res.get_int(Build_Log.Data.threads).getOrElse(1)
-              threads1 max threads2
-            }
-            val ml_platform = res.string(Build_Log.Settings.ML_PLATFORM)
-            val ml_platform_64 =
-              ml_platform.startsWith("x86_64-") || ml_platform.startsWith("arm64-")
-            val data_name =
-              profile.description +
-                (if (ml_platform_64) ", 64bit" else "") +
-                (if (threads == 1) "" else ", " + threads + " threads")
+          using(stmt.execute_query()) { res =>
+            while (res.next()) {
+              val session_name = res.string(Build_Log.Data.session_name)
+              val chapter = res.string(Build_Log.Data.chapter)
+              val groups = split_lines(res.string(Build_Log.Data.groups))
+              val threads = {
+                val threads1 =
+                  res.string(Build_Log.Settings.ISABELLE_BUILD_OPTIONS) match {
+                    case Threads_Option(Value.Int(i)) => i
+                    case _ => 1
+                  }
+                val threads2 = res.get_int(Build_Log.Data.threads).getOrElse(1)
+                threads1 max threads2
+              }
+              val ml_platform = res.string(Build_Log.Settings.ML_PLATFORM)
+              val ml_platform_64 =
+                ml_platform.startsWith("x86_64-") || ml_platform.startsWith("arm64-")
+              val data_name =
+                profile.description +
+                  (if (ml_platform_64) ", 64bit" else "") +
+                  (if (threads == 1) "" else ", " + threads + " threads")
 
-            res.get_string(Build_Log.Prop.build_host).foreach(host =>
-              data_hosts += (data_name -> (get_hosts(data_name) + host)))
+              res.get_string(Build_Log.Prop.build_host).foreach(host =>
+                data_hosts += (data_name -> (get_hosts(data_name) + host)))
 
-            data_stretch += (data_name -> profile.stretch(options))
+              data_stretch += (data_name -> profile.stretch(options))
 
-            val isabelle_version = res.string(Build_Log.Prop.isabelle_version)
-            val afp_version = res.string(Build_Log.Prop.afp_version)
+              val isabelle_version = res.string(Build_Log.Prop.isabelle_version)
+              val afp_version = res.string(Build_Log.Prop.afp_version)
 
-            val ml_stats =
-              ML_Statistics(
-                if (ml_statistics) {
-                  Properties.uncompress(res.bytes(Build_Log.Data.ml_statistics), cache = store.cache)
-                }
-                else Nil,
-                domain = ml_statistics_domain,
-                heading = session_name + print_version(isabelle_version, afp_version, chapter))
+              val ml_stats =
+                ML_Statistics(
+                  if (ml_statistics) {
+                    Properties.uncompress(res.bytes(Build_Log.Data.ml_statistics), cache = store.cache)
+                  }
+                  else Nil,
+                  domain = ml_statistics_domain,
+                  heading = session_name + print_version(isabelle_version, afp_version, chapter))
 
-            val entry =
-              Entry(
-                chapter = chapter,
-                pull_date = res.date(Build_Log.Data.pull_date(afp = false)),
-                afp_pull_date =
-                  if (afp) res.get_date(Build_Log.Data.pull_date(afp = true)) else None,
-                isabelle_version = isabelle_version,
-                afp_version = afp_version,
-                timing =
-                  res.timing(
-                    Build_Log.Data.timing_elapsed,
-                    Build_Log.Data.timing_cpu,
-                    Build_Log.Data.timing_gc),
-                ml_timing =
-                  res.timing(
-                    Build_Log.Data.ml_timing_elapsed,
-                    Build_Log.Data.ml_timing_cpu,
-                    Build_Log.Data.ml_timing_gc),
-                maximum_code = Space.B(ml_stats.maximum(ML_Statistics.CODE_SIZE)),
-                average_code = Space.B(ml_stats.average(ML_Statistics.CODE_SIZE)),
-                maximum_stack = Space.B(ml_stats.maximum(ML_Statistics.STACK_SIZE)),
-                average_stack = Space.B(ml_stats.average(ML_Statistics.STACK_SIZE)),
-                maximum_heap = Space.B(ml_stats.maximum(ML_Statistics.HEAP_SIZE)),
-                average_heap = Space.B(ml_stats.average(ML_Statistics.HEAP_SIZE)),
-                stored_heap = Space.bytes(res.long(Build_Log.Data.heap_size)),
-                status = Build_Log.Session_Status.withName(res.string(Build_Log.Data.status)),
-                errors =
-                  Build_Log.uncompress_errors(
-                    res.bytes(Build_Log.Data.errors), cache = store.cache))
+              val entry =
+                Entry(
+                  chapter = chapter,
+                  pull_date = res.date(Build_Log.Data.pull_date(afp = false)),
+                  afp_pull_date =
+                    if (afp) res.get_date(Build_Log.Data.pull_date(afp = true)) else None,
+                  isabelle_version = isabelle_version,
+                  afp_version = afp_version,
+                  timing =
+                    res.timing(
+                      Build_Log.Data.timing_elapsed,
+                      Build_Log.Data.timing_cpu,
+                      Build_Log.Data.timing_gc),
+                  ml_timing =
+                    res.timing(
+                      Build_Log.Data.ml_timing_elapsed,
+                      Build_Log.Data.ml_timing_cpu,
+                      Build_Log.Data.ml_timing_gc),
+                  maximum_code = Space.B(ml_stats.maximum(ML_Statistics.CODE_SIZE)),
+                  average_code = Space.B(ml_stats.average(ML_Statistics.CODE_SIZE)),
+                  maximum_stack = Space.B(ml_stats.maximum(ML_Statistics.STACK_SIZE)),
+                  average_stack = Space.B(ml_stats.average(ML_Statistics.STACK_SIZE)),
+                  maximum_heap = Space.B(ml_stats.maximum(ML_Statistics.HEAP_SIZE)),
+                  average_heap = Space.B(ml_stats.average(ML_Statistics.HEAP_SIZE)),
+                  stored_heap = Space.bytes(res.long(Build_Log.Data.heap_size)),
+                  status = Build_Log.Session_Status.withName(res.string(Build_Log.Data.status)),
+                  errors =
+                    Build_Log.uncompress_errors(
+                      res.bytes(Build_Log.Data.errors), cache = store.cache))
 
-            val sessions = data_entries.getOrElse(data_name, Map.empty)
-            val session =
-              sessions.get(session_name) match {
-                case None =>
-                  Session(session_name, threads, List(entry), ml_stats, entry.date)
-                case Some(old) =>
-                  val (ml_stats1, ml_stats1_date) =
-                    if (entry.date > old.ml_statistics_date) (ml_stats, entry.date)
-                    else (old.ml_statistics, old.ml_statistics_date)
-                  Session(session_name, threads, entry :: old.entries, ml_stats1, ml_stats1_date)
+              val sessions = data_entries.getOrElse(data_name, Map.empty)
+              val session =
+                sessions.get(session_name) match {
+                  case None =>
+                    Session(session_name, threads, List(entry), ml_stats, entry.date)
+                  case Some(old) =>
+                    val (ml_stats1, ml_stats1_date) =
+                      if (entry.date > old.ml_statistics_date) (ml_stats, entry.date)
+                      else (old.ml_statistics, old.ml_statistics_date)
+                    Session(session_name, threads, entry :: old.entries, ml_stats1, ml_stats1_date)
+                }
+
+              if ((!afp || chapter == AFP.chapter) &&
+                  (!profile.bulky || groups.exists(AFP.groups_bulky.toSet))) {
+                data_entries += (data_name -> (sessions + (session_name -> session)))
               }
-
-            if ((!afp || chapter == AFP.chapter) &&
-                (!profile.bulky || groups.exists(AFP.groups_bulky.toSet))) {
-              data_entries += (data_name -> (sessions + (session_name -> session)))
             }
           }
         }
--- a/src/Pure/General/sql.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/General/sql.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -2,6 +2,8 @@
     Author:     Makarius
 
 Support for SQL databases: SQLite and PostgreSQL.
+
+See https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/Connection.html
 */
 
 package isabelle
@@ -240,10 +242,11 @@
     def iterator: Iterator[Table] = list.iterator
 
     // requires transaction
-    def lock(db: Database, create: Boolean = false): Unit = {
+    def lock(db: Database, create: Boolean = false): Boolean = {
       if (create) foreach(db.create_table(_))
       val sql = db.lock_tables(list)
-      if (sql.nonEmpty) db.execute_statement(sql)
+      if (sql.nonEmpty) { db.execute_statement(sql); true }
+      else false
     }
   }
 
@@ -253,10 +256,10 @@
     def transaction_lock[A](
       db: Database,
       create: Boolean = false,
-      synchronized: Boolean = false,
+      label: String = "",
+      log: Logger = new System_Logger
     )(body: => A): A = {
-      def run: A = db.transaction { tables.lock(db, create = create); body }
-      if (synchronized) db.synchronized { run } else run
+      db.transaction_lock(tables, create = create, label = label, log = log)(body)
     }
 
     def make_table(columns: List[Column], body: String = "", name: String = ""): Table = {
@@ -323,13 +326,13 @@
     def execute(): Boolean = rep.execute()
     def execute_query(): Result = new Result(this, rep.executeQuery())
 
-    def close(): Unit = rep.close()
+    override def close(): Unit = rep.close()
   }
 
 
   /* results */
 
-  class Result private[SQL](val stmt: Statement, val rep: ResultSet) {
+  class Result private[SQL](val stmt: Statement, val rep: ResultSet) extends AutoCloseable {
     res =>
 
     def next(): Boolean = rep.next()
@@ -368,6 +371,8 @@
     def get_string(column: Column): Option[String] = get(column, string)
     def get_bytes(column: Column): Option[Bytes] = get(column, bytes)
     def get_date(column: Column): Option[Date] = get(column, date)
+
+    override def close(): Unit = rep.close()
   }
 
 
@@ -431,25 +436,66 @@
       }
       else None
 
-    def close(): Unit = connection.close()
+    override def close(): Unit = connection.close()
 
-    def transaction[A](body: => A): A = {
-      val auto_commit = connection.getAutoCommit()
+    def transaction[A](body: => A): A = connection.synchronized {
+      require(connection.getAutoCommit(), "transaction already active")
       try {
         connection.setAutoCommit(false)
-        val savepoint = connection.setSavepoint()
         try {
           val result = body
           connection.commit()
           result
         }
-        catch { case exn: Throwable => connection.rollback(savepoint); throw exn }
+        catch { case exn: Throwable => connection.rollback(); throw exn }
       }
-      finally { connection.setAutoCommit(auto_commit) }
+      finally { connection.setAutoCommit(true) }
     }
 
-    def transaction_lock[A](tables: Tables, create: Boolean = false)(body: => A): A =
-      transaction { tables.lock(db, create = create); body }
+    def transaction_lock[A](
+      tables: Tables,
+      create: Boolean = false,
+      label: String = "",
+      log: Logger = new System_Logger
+    )(body: => A): A = {
+      val prop = "isabelle.transaction_trace"
+      val trace_min =
+        System.getProperty(prop, "") match {
+          case Value.Seconds(t) => t
+          case "true" => Time.min
+          case "false" | "" => Time.max
+          case s => error("Bad system property " + prop + ": " + quote(s))
+        }
+
+      val trace_count = - SQL.transaction_count()
+      val trace_start = Time.now()
+      var trace_nl = false
+
+      def trace(msg: String): Unit = {
+        val trace_time = Time.now() - trace_start
+        if (trace_time >= trace_min) {
+          val nl = if (trace_nl) "" else { trace_nl = true; "\n" }
+          log(nl + trace_time + " transaction " + trace_count +
+            if_proper(label, " " + label) + ": " + msg)
+        }
+      }
+
+      try {
+        val res =
+          transaction {
+            trace("begin")
+            if (tables.lock(db, create = create)) {
+              trace("locked " + commas_quote(tables.list.map(_.name)))
+            }
+            val res = Exn.capture { body }
+            trace("end")
+            res
+          }
+        trace("commit")
+        Exn.release(res)
+      }
+      catch { case exn: Throwable => trace("crash"); throw exn }
+    }
 
     def lock_tables(tables: List[Table]): Source = ""  // PostgreSQL only
 
@@ -469,13 +515,17 @@
       sql: Source,
       make_result: Iterator[A] => B,
       get: Result => A
-    ): B = using_statement(sql)(stmt => make_result(stmt.execute_query().iterator(get)))
+    ): B = {
+      using_statement(sql) { stmt =>
+        using(stmt.execute_query()) { res => make_result(res.iterator(get)) }
+      }
+    }
 
     def execute_query_statementO[A](sql: Source, get: Result => A): Option[A] =
       execute_query_statement[A, Option[A]](sql, _.nextOption, get)
 
     def execute_query_statementB(sql: Source): Boolean =
-      using_statement(sql)(stmt => stmt.execute_query().next())
+      using_statement(sql)(stmt => using(stmt.execute_query())(_.next()))
 
     def update_date(stmt: Statement, i: Int, date: Date): Unit
     def date(res: Result, column: Column): Date
@@ -485,13 +535,23 @@
 
     /* tables and views */
 
-    def tables: List[String] = {
+    def get_tables(pattern: String = "%"): List[String] = {
       val result = new mutable.ListBuffer[String]
-      val rs = connection.getMetaData.getTables(null, null, "%", null)
+      val rs = connection.getMetaData.getTables(null, null, pattern, null)
       while (rs.next) { result += rs.getString(3) }
       result.toList
     }
 
+    def exists_table(name: String): Boolean = {
+      val escape = connection.getMetaData.getSearchStringEscape
+      val pattern =
+        name.iterator.map(c =>
+          (if (c == '_' || c == '%' || c == escape(0)) escape else "") + c).mkString
+      get_tables(pattern = pattern).nonEmpty
+    }
+
+    def exists_table(table: Table): Boolean = exists_table(table.name)
+
     def create_table(table: Table, strict: Boolean = false, sql: Source = ""): Unit = {
       execute_statement(table.create(strict, sql_type) + SQL.separate(sql))
       if (is_postgresql) {
@@ -507,11 +567,14 @@
       execute_statement(table.create_index(name, columns, strict, unique))
 
     def create_view(table: Table, strict: Boolean = false): Unit = {
-      if (strict || !tables.contains(table.name)) {
+      if (strict || !exists_table(table)) {
         execute_statement("CREATE VIEW " + table + " AS " + { table.query; table.body })
       }
     }
   }
+
+
+  private val transaction_count = Counter.make()
 }
 
 
@@ -572,62 +635,89 @@
 
 /** PostgreSQL **/
 
+// see https://www.postgresql.org/docs/current/index.html
+// see https://jdbc.postgresql.org/documentation
+
 object PostgreSQL {
   type Source = SQL.Source
 
-  val default_port = 5432
+  lazy val init_jdbc: Unit = Class.forName("org.postgresql.Driver")
 
-  lazy val init_jdbc: Unit = Class.forName("org.postgresql.Driver")
+  val default_server: SSH.Server = SSH.local_server(port = 5432)
 
   def open_database(
     user: String,
     password: String,
     database: String = "",
-    host: String = "",
-    port: Int = 0,
-    ssh: Option[SSH.Session] = None,
-    ssh_close: Boolean = false,
-    // see https://www.postgresql.org/docs/current/transaction-iso.html
-    transaction_isolation: Int = Connection.TRANSACTION_SERIALIZABLE
+    server: SSH.Server = default_server,
+    server_close: Boolean = false,
   ): Database = {
     init_jdbc
 
-    if (user == "") error("Undefined database user")
+    if (user.isEmpty) error("Undefined database user")
+    if (server.host.isEmpty) error("Undefined database server host")
+    if (server.port <= 0) error("Undefined database server port")
+
+    val name = proper_string(database) getOrElse user
+    val url = "jdbc:postgresql://" + server.host + ":" + server.port + "/" + name
+    val ssh = server.ssh_system.ssh_session
+    val print = user + "@" + server + "/" + name + if_proper(ssh, " via ssh " + ssh.get)
 
-    val db_host = proper_string(host) getOrElse "localhost"
-    val db_port = if (port > 0 && port != default_port) ":" + port else ""
-    val db_name = "/" + (proper_string(database) getOrElse user)
+    val connection = DriverManager.getConnection(url, user, password)
+    new Database(connection, print, server, server_close)
+  }
+
+  def open_server(
+    options: Options,
+    host: String = "",
+    port: Int = 0,
+    ssh_host: String = "",
+    ssh_port: Int = 0,
+    ssh_user: String = ""
+  ): SSH.Server = {
+    val server_host = proper_string(host).getOrElse(default_server.host)
+    val server_port = if (port > 0) port else default_server.port
 
-    val (url, name, port_forwarding) =
-      ssh match {
-        case None =>
-          val spec = db_host + db_port + db_name
-          val url = "jdbc:postgresql://" + spec
-          val name = user + "@" + spec
-          (url, name, None)
-        case Some(ssh) =>
-          val fw =
-            ssh.port_forwarding(remote_host = db_host,
-              remote_port = if (port > 0) port else default_port,
-              ssh_close = ssh_close)
-          val url = "jdbc:postgresql://localhost:" + fw.port + db_name
-          val name = user + "@" + fw + db_name + " via ssh " + ssh
-          (url, name, Some(fw))
+    if (ssh_host.isEmpty) SSH.local_server(host = server_host, port = server_port)
+    else {
+      SSH.open_server(options, host = ssh_host, port = ssh_port, user = ssh_user,
+        remote_host = server_host, remote_port = server_port)
+    }
+  }
+
+  def open_database_server(
+    options: Options,
+    user: String = "",
+    password: String = "",
+    database: String = "",
+    server: SSH.Server = SSH.no_server,
+    host: String = "",
+    port: Int = 0,
+    ssh_host: String = "",
+    ssh_port: Int = 0,
+    ssh_user: String = ""
+  ): PostgreSQL.Database = {
+    val db_server =
+      if (server.defined) server
+      else {
+        open_server(options, host = host, port = port, ssh_host = ssh_host,
+          ssh_port = ssh_port, ssh_user = ssh_user)
       }
+    val server_close = !server.defined
     try {
-      val connection = DriverManager.getConnection(url, user, password)
-      connection.setTransactionIsolation(transaction_isolation)
-      new Database(name, connection, port_forwarding)
+      open_database(user = user, password = password, database = database,
+        server = db_server, server_close = server_close)
     }
-    catch { case exn: Throwable => port_forwarding.foreach(_.close()); throw exn }
+    catch { case exn: Throwable if server_close => db_server.close(); throw exn }
   }
 
   class Database private[PostgreSQL](
-    name: String,
     val connection: Connection,
-    port_forwarding: Option[SSH.Port_Forwarding]
+    print: String,
+    server: SSH.Server,
+    server_close: Boolean
   ) extends SQL.Database {
-    override def toString: String = name
+    override def toString: String = print
 
     override def now(): Date = {
       val now = SQL.Column.date("now")
@@ -678,6 +768,6 @@
       }
 
 
-    override def close(): Unit = { super.close(); port_forwarding.foreach(_.close()) }
+    override def close(): Unit = { super.close(); if (server_close) server.close() }
   }
 }
--- a/src/Pure/General/ssh.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/General/ssh.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -105,7 +105,7 @@
   ) extends System {
     ssh =>
 
-    override def is_local: Boolean = false
+    override def ssh_session: Option[Session] = Some(ssh)
 
     def port_suffix: String = if (port > 0) ":" + port else ""
     def user_prefix: String = if (user.nonEmpty) user + "@" else ""
@@ -314,18 +314,18 @@
     }
 
 
-    /* port forwarding */
+    /* open server on remote host */
 
-    def port_forwarding(
-      remote_port: Int,
+    def open_server(
+      remote_port: Int = 0,
       remote_host: String = "localhost",
       local_port: Int = 0,
       local_host: String = "localhost",
       ssh_close: Boolean = false
-    ): Port_Forwarding = {
-      val port = if (local_port > 0) local_port else Isabelle_System.local_port()
-
-      val forward = List(local_host, port, remote_host, remote_port).mkString(":")
+    ): Server = {
+      val forward_host = local_host
+      val forward_port = if (local_port > 0) local_port else Isabelle_System.local_port()
+      val forward = List(forward_host, forward_port, remote_host, remote_port).mkString(":")
       val forward_option = "-L " + Bash.string(forward)
 
       val cancel: () => Unit =
@@ -335,7 +335,7 @@
         }
         else {
           val result = Synchronized[Exn.Result[Boolean]](Exn.Res(false))
-          val thread = Isabelle_Thread.fork("port_forwarding") {
+          val thread = Isabelle_Thread.fork("ssh_server") {
             val opts =
               forward_option +
                 " " + Config.option("SessionType", "none") +
@@ -357,7 +357,7 @@
       val shutdown_hook =
         Isabelle_System.create_shutdown_hook { cancel() }
 
-      new Port_Forwarding(host, port, remote_host, remote_port) {
+      new Server(forward_host, forward_port, ssh) {
         override def toString: String = forward
         override def close(): Unit = {
           cancel()
@@ -368,12 +368,49 @@
     }
   }
 
-  abstract class Port_Forwarding private[SSH](
+
+  /* server port forwarding */
+
+  def open_server(
+    options: Options,
+    host: String,
+    port: Int = 0,
+    user: String = "",
+    remote_port: Int = 0,
+    remote_host: String = "localhost",
+    local_port: Int = 0,
+    local_host: String = "localhost"
+  ): Server = {
+    val ssh = open_session(options, host, port = port, user = user)
+    try {
+      ssh.open_server(remote_port = remote_port, remote_host = remote_host,
+        local_port = local_port, local_host = local_host, ssh_close = true)
+    }
+    catch { case exn: Throwable => ssh.close(); throw exn }
+  }
+
+  def local_server(port: Int = 0, host: String = "localhost"): Server =
+    new Local_Server(host, port)
+
+  val no_server: Server = new No_Server
+
+  class Server private[SSH](
     val host: String,
     val port: Int,
-    val remote_host: String,
-    val remote_port: Int
-  ) extends AutoCloseable
+    val ssh_system: System
+  ) extends AutoCloseable {
+    def defined: Boolean = host.nonEmpty && port > 0
+    override def close(): Unit = ()
+  }
+
+  class Local_Server private[SSH](host: String, port: Int)
+  extends Server(host, port, Local) {
+    override def toString: String = if_proper(host, host + ":") + port
+  }
+
+  class No_Server extends Server("", 0, Local) {
+    override def toString: String = "0"
+  }
 
 
   /* system operations */
@@ -389,7 +426,8 @@
   }
 
   trait System extends AutoCloseable {
-    def is_local: Boolean
+    def ssh_session: Option[Session]
+    def is_local: Boolean = ssh_session.isEmpty
 
     def close(): Unit = ()
 
@@ -449,6 +487,6 @@
   }
 
   object Local extends System {
-    override def is_local: Boolean = true
+    override def ssh_session: Option[Session] = None
   }
 }
--- a/src/Pure/General/time.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/General/time.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -20,6 +20,8 @@
   def now(): Time = ms(System.currentTimeMillis())
 
   val zero: Time = ms(0)
+  val min: Time = ms(Long.MinValue)
+  val max: Time = ms(Long.MaxValue)
 
   def print_seconds(s: Double): String =
     String.format(Locale.ROOT, "%.3f", s.asInstanceOf[AnyRef])
--- a/src/Pure/ML/ml_heap.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/ML/ml_heap.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -123,12 +123,12 @@
   }
 
   def clean_entry(db: SQL.Database, session_name: String): Unit =
-    Data.transaction_lock(db, create = true, synchronized = true) {
+    Data.transaction_lock(db, create = true, label = "ML_Heap.clean_entry") {
       Data.clean_entry(db, session_name)
     }
 
   def get_entry(db: SQL.Database, session_name: String): Option[SHA1.Digest] =
-    Data.transaction_lock(db, create = true, synchronized = true) {
+    Data.transaction_lock(db, create = true, label = "ML_Heap.get_entry") {
       Data.get_entry(db, session_name)
     }
 
@@ -149,7 +149,7 @@
         val step = (size.toDouble / slices.toDouble).ceil.toLong
 
         try {
-          Data.transaction_lock(db, create = true, synchronized = true) {
+          Data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
             Data.prepare_entry(db, session_name)
           }
 
@@ -160,17 +160,17 @@
             val content =
               Bytes.read_file(heap.file, offset = offset, limit = limit)
                 .compress(cache = cache)
-            Data.transaction_lock(db, synchronized = true) {
+            Data.transaction_lock(db, label = "ML_Heap.store2") {
               Data.write_entry(db, session_name, i, content)
             }
           }
 
-          Data.transaction_lock(db, synchronized = true) {
+          Data.transaction_lock(db, label = "ML_Heap.store3") {
             Data.finish_entry(db, session_name, size, digest)
           }
         }
         catch { case exn: Throwable =>
-          Data.transaction_lock(db, create = true, synchronized = true) {
+          Data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
             Data.clean_entry(db, session_name)
           }
           throw exn
@@ -188,7 +188,7 @@
     database match {
       case None =>
       case Some(db) =>
-        Data.transaction_lock(db, create = true, synchronized = true) {
+        Data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
           val db_digest = Data.get_entry(db, session_name)
           val file_digest = read_file_digest(heap)
 
--- a/src/Pure/System/host.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/System/host.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -134,7 +134,7 @@
     else {
       val available = available_nodes.zipWithIndex
       val used = used_nodes
-      Data.transaction_lock(db, create = true) {
+      Data.transaction_lock(db, create = true, label = "Host.next_numa_node") {
         val numa_next = Data.read_numa_next(db, hostname)
         val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0)
         val candidates = available.drop(numa_index) ::: available.take(numa_index)
--- a/src/Pure/System/progress.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/System/progress.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -253,13 +253,13 @@
   database_progress =>
 
   private var _agent_uuid: String = ""
-  private var _context: Long = 0
+  private var _context: Long = -1
   private var _seen: Long = 0
 
   def agent_uuid: String = synchronized { _agent_uuid }
 
   private def init(): Unit = synchronized {
-    Progress.Data.transaction_lock(db, create = true) {
+    Progress.Data.transaction_lock(db, create = true, label = "Database_Progress.init") {
       Progress.Data.read_progress_context(db, context_uuid) match {
         case Some(context) =>
           _context = context
@@ -296,7 +296,7 @@
 
   def exit(close: Boolean = false): Unit = synchronized {
     if (_context > 0) {
-      Progress.Data.transaction_lock(db) {
+      Progress.Data.transaction_lock(db, label = "Database_Progress.exit") {
         Progress.Data.update_agent(db, _agent_uuid, _seen, stop_now = true)
       }
       _context = 0
@@ -305,8 +305,10 @@
   }
 
   private def sync_database[A](body: => A): A = synchronized {
-    require(_context > 0)
-    Progress.Data.transaction_lock(db) {
+    if (_context < 0) throw new IllegalStateException("Database_Progress before init")
+    if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
+
+    Progress.Data.transaction_lock(db, label = "Database_Progress.sync") {
       val stopped_db = Progress.Data.read_progress_stopped(db, _context)
       val stopped = base_progress.stopped
 
--- a/src/Pure/Thy/browser_info.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Thy/browser_info.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -671,12 +671,13 @@
     store: Store,
     deps: Sessions.Deps,
     sessions: List[String],
-    progress: Progress = new Progress
+    progress: Progress = new Progress,
+    server: SSH.Server = SSH.no_server
   ): Unit = {
     val root_dir = browser_info.presentation_dir(store).absolute
     progress.echo("Presentation in " + root_dir)
 
-    using(Export.open_database_context(store)) { database_context =>
+    using(Export.open_database_context(store, server = server)) { database_context =>
       val context0 = context(deps.sessions_structure, root_dir = root_dir)
 
       val sessions1 =
--- a/src/Pure/Thy/document_build.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Thy/document_build.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -104,13 +104,19 @@
   }
 
   def clean_session(db: SQL.Database, session_name: String): Unit =
-    Data.transaction_lock(db, create = true) { Data.clean_session(db, session_name) }
+    Data.transaction_lock(db, create = true, label = "Document_Build.clean_session") {
+      Data.clean_session(db, session_name)
+    }
 
   def read_document(db: SQL.Database, session_name: String, name: String): Option[Document_Output] =
-    Data.transaction_lock(db) { Data.read_document(db, session_name, name) }
+    Data.transaction_lock(db, label = "Document_Build.read_document") {
+      Data.read_document(db, session_name, name)
+    }
 
   def write_document(db: SQL.Database, session_name: String, doc: Document_Output): Unit =
-    Data.transaction_lock(db) { Data.write_document(db, session_name, doc) }
+    Data.transaction_lock(db, label = "Document_Build.write_document") {
+      Data.write_document(db, session_name, doc)
+    }
 
 
   /* background context */
--- a/src/Pure/Thy/export.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Thy/export.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -204,16 +204,24 @@
   }
 
   def clean_session(db: SQL.Database, session_name: String): Unit =
-    Data.transaction_lock(db, create = true) { Data.clean_session(db, session_name) }
+    Data.transaction_lock(db, create = true, label = "Export.clean_session") {
+      Data.clean_session(db, session_name)
+    }
 
   def read_theory_names(db: SQL.Database, session_name: String): List[String] =
-    Data.transaction_lock(db) { Data.read_theory_names(db, session_name) }
+    Data.transaction_lock(db, label = "Export.read_theory_names") {
+      Data.read_theory_names(db, session_name)
+    }
 
   def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
-    Data.transaction_lock(db) { Data.read_entry_names(db, session_name) }
+    Data.transaction_lock(db, label = "Export.read_entry_names") {
+      Data.read_entry_names(db, session_name)
+    }
 
   def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] =
-    Data.transaction_lock(db) { Data.read_entry(db, entry_name, cache) }
+    Data.transaction_lock(db, label = "Export.read_entry") {
+      Data.read_entry(db, entry_name, cache)
+    }
 
 
   /* database consumer thread */
@@ -230,7 +238,7 @@
         consume =
           { (args: List[(Entry, Boolean)]) =>
             val results =
-              Data.transaction_lock(db) {
+              Data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") {
                 for ((entry, strict) <- args)
                 yield {
                   if (progress.stopped) {
@@ -266,18 +274,25 @@
 
   /* context for database access */
 
-  def open_database_context(store: Store): Database_Context =
-    new Database_Context(store, store.maybe_open_database_server())
+  def open_database_context(store: Store, server: SSH.Server = SSH.no_server): Database_Context =
+    new Database_Context(store, store.maybe_open_database_server(server = server))
 
-  def open_session_context0(store: Store, session: String): Session_Context =
-    open_database_context(store).open_session0(session, close_database_context = true)
+  def open_session_context0(
+    store: Store,
+    session: String,
+    server: SSH.Server = SSH.no_server
+  ): Session_Context = {
+    open_database_context(store, server = server)
+      .open_session0(session, close_database_context = true)
+  }
 
   def open_session_context(
     store: Store,
     session_background: Sessions.Background,
-    document_snapshot: Option[Document.Snapshot] = None
+    document_snapshot: Option[Document.Snapshot] = None,
+    server: SSH.Server = SSH.no_server
   ): Session_Context = {
-    open_database_context(store).open_session(
+    open_database_context(store, server = server).open_session(
       session_background, document_snapshot = document_snapshot, close_database_context = true)
   }
 
@@ -324,7 +339,8 @@
           case Some(db) => session_hierarchy.map(name => new Session_Database(name, db))
           case None =>
             val attempts =
-              session_hierarchy.map(name => name -> store.try_open_database(name, server = false))
+              for (name <- session_hierarchy)
+                yield name -> store.try_open_database(name, server_mode = false)
             attempts.collectFirst({ case (name, None) => name }) match {
               case Some(bad) =>
                 for ((_, Some(db)) <- attempts) db.close()
--- a/src/Pure/Thy/store.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Thy/store.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -149,25 +149,20 @@
     def read_errors(db: SQL.Database, name: String, cache: Term.Cache): List[String] =
       Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache)
 
-    def read_build(db: SQL.Database, name: String): Option[Store.Build_Info] = {
-      if (db.tables.contains(Session_Info.table.name)) {
-        db.execute_query_statementO[Store.Build_Info](
-          Session_Info.table.select(sql = Session_Info.session_name.where_equal(name)),
-          { res =>
-            val uuid =
-              try { Option(res.string(Session_Info.uuid)).getOrElse("") }
-              catch { case _: SQLException => "" }
-            Store.Build_Info(
-              SHA1.fake_shasum(res.string(Session_Info.sources)),
-              SHA1.fake_shasum(res.string(Session_Info.input_heaps)),
-              SHA1.fake_shasum(res.string(Session_Info.output_heap)),
-              res.int(Session_Info.return_code),
-              uuid)
-          }
-        )
-      }
-      else None
-    }
+    def read_build(db: SQL.Database, name: String): Option[Store.Build_Info] =
+      db.execute_query_statementO[Store.Build_Info](
+        Session_Info.table.select(sql = Session_Info.session_name.where_equal(name)),
+        { res =>
+          val uuid =
+            try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+            catch { case _: SQLException => "" }
+          Store.Build_Info(
+            SHA1.fake_shasum(res.string(Session_Info.sources)),
+            SHA1.fake_shasum(res.string(Session_Info.input_heaps)),
+            SHA1.fake_shasum(res.string(Session_Info.output_heap)),
+            res.int(Session_Info.return_code),
+            uuid)
+        })
 
     def write_session_info(
       db: SQL.Database,
@@ -294,41 +289,49 @@
   def build_database_server: Boolean = options.bool("build_database_server")
   def build_database_test: Boolean = options.bool("build_database_test")
 
-  def open_database_server(): PostgreSQL.Database =
-    PostgreSQL.open_database(
+  def open_server(): SSH.Server =
+    PostgreSQL.open_server(options,
+      host = options.string("build_database_host"),
+      port = options.int("build_database_port"),
+      ssh_host = options.string("build_database_ssh_host"),
+      ssh_port = options.int("build_database_ssh_port"),
+      ssh_user = options.string("build_database_ssh_user"))
+
+  def open_database_server(server: SSH.Server = SSH.no_server): PostgreSQL.Database =
+    PostgreSQL.open_database_server(options, server = server,
       user = options.string("build_database_user"),
       password = options.string("build_database_password"),
       database = options.string("build_database_name"),
       host = options.string("build_database_host"),
       port = options.int("build_database_port"),
-      ssh =
-        proper_string(options.string("build_database_ssh_host")).map(ssh_host =>
-          SSH.open_session(options,
-            host = ssh_host,
-            user = options.string("build_database_ssh_user"),
-            port = options.int("build_database_ssh_port"))),
-      ssh_close = true)
+      ssh_host = options.string("build_database_ssh_host"),
+      ssh_port = options.int("build_database_ssh_port"),
+      ssh_user = options.string("build_database_ssh_user"))
 
-  def maybe_open_database_server(): Option[SQL.Database] =
-    if (build_database_server) Some(open_database_server()) else None
+  def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
+    if (build_database_server) Some(open_database_server(server = server)) else None
 
-  def open_build_database(path: Path): SQL.Database =
-    if (build_database_server) open_database_server()
+  def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
+    if (build_database_server) open_database_server(server = server)
     else SQLite.open_database(path, restrict = true)
 
   def maybe_open_build_database(
-    path: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
-  ): Option[SQL.Database] = if (build_database_test) Some(open_build_database(path)) else None
+    path: Path = Path.explode("$ISABELLE_HOME_USER/build.db"),
+    server: SSH.Server = SSH.no_server
+  ): Option[SQL.Database] = {
+    if (build_database_test) Some(open_build_database(path, server = server)) else None
+  }
 
   def try_open_database(
     name: String,
     output: Boolean = false,
-    server: Boolean = build_database_server
+    server: SSH.Server = SSH.no_server,
+    server_mode: Boolean = build_database_server
   ): Option[SQL.Database] = {
     def check(db: SQL.Database): Option[SQL.Database] =
       if (output || session_info_exists(db)) Some(db) else { db.close(); None }
 
-    if (server) check(open_database_server())
+    if (server_mode) check(open_database_server(server = server))
     else if (output) Some(SQLite.open_database(output_database(name)))
     else {
       (for {
@@ -342,8 +345,13 @@
   def error_database(name: String): Nothing =
     error("Missing build database for session " + quote(name))
 
-  def open_database(name: String, output: Boolean = false): SQL.Database =
-    try_open_database(name, output = output) getOrElse error_database(name)
+  def open_database(
+    name: String,
+    output: Boolean = false,
+    server: SSH.Server = SSH.no_server
+  ): SQL.Database = {
+    try_open_database(name, output = output, server = server) getOrElse error_database(name)
+  }
 
   def clean_output(
     database_server: Option[SQL.Database],
@@ -374,6 +382,7 @@
   }
 
   def check_output(
+    database_server: Option[SQL.Database],
     name: String,
     session_options: Options,
     sources_shasum: SHA1.Shasum,
@@ -381,34 +390,34 @@
     fresh_build: Boolean,
     store_heap: Boolean
   ): (Boolean, SHA1.Shasum) = {
-    try_open_database(name) match {
-      case Some(db) =>
-        using(db) { _ =>
-          read_build(db, name) match {
-            case Some(build) =>
-              val output_shasum = heap_shasum(if (db.is_postgresql) Some(db) else None, name)
-              val current =
-                !fresh_build &&
-                  build.ok &&
-                  Sessions.eq_sources(session_options, build.sources, sources_shasum) &&
-                  build.input_heaps == input_shasum &&
-                  build.output_heap == output_shasum &&
-                  !(store_heap && output_shasum.is_empty)
-              (current, output_shasum)
-            case None => (false, SHA1.no_shasum)
-          }
-        }
-      case None => (false, SHA1.no_shasum)
+    def no_check: (Boolean, SHA1.Shasum) = (false, SHA1.no_shasum)
+
+    def check(db: SQL.Database): (Boolean, SHA1.Shasum) =
+      read_build(db, name) match {
+        case Some(build) =>
+          val output_shasum = heap_shasum(if (db.is_postgresql) Some(db) else None, name)
+          val current =
+            !fresh_build &&
+              build.ok &&
+              Sessions.eq_sources(session_options, build.sources, sources_shasum) &&
+              build.input_heaps == input_shasum &&
+              build.output_heap == output_shasum &&
+              !(store_heap && output_shasum.is_empty)
+          (current, output_shasum)
+        case None => no_check
+      }
+
+    database_server match {
+      case Some(db) => if (session_info_exists(db)) check(db) else no_check
+      case None => using_option(try_open_database(name))(check) getOrElse no_check
     }
   }
 
 
   /* session info */
 
-  def session_info_exists(db: SQL.Database): Boolean = {
-    val tables = db.tables
-    Store.Data.tables.forall(table => tables.contains(table.name))
-  }
+  def session_info_exists(db: SQL.Database): Boolean =
+    Store.Data.tables.forall(db.exists_table)
 
   def session_info_defined(db: SQL.Database, name: String): Boolean =
     db.execute_query_statementB(
@@ -419,7 +428,7 @@
     Export.clean_session(db, name)
     Document_Build.clean_session(db, name)
 
-    Store.Data.transaction_lock(db, create = true, synchronized = true) {
+    Store.Data.transaction_lock(db, create = true, label = "Store.clean_session_info") {
       val already_defined = session_info_defined(db, name)
 
       db.execute_statement(
@@ -440,36 +449,52 @@
     build_log: Build_Log.Session_Info,
     build: Store.Build_Info
   ): Unit = {
-    Store.Data.transaction_lock(db, synchronized = true) {
+    Store.Data.transaction_lock(db, label = "Store.write_session_info") {
       Store.Data.write_sources(db, session_name, sources)
       Store.Data.write_session_info(db, cache.compress, session_name, build_log, build)
     }
   }
 
   def read_session_timing(db: SQL.Database, session: String): Properties.T =
-    Store.Data.transaction_lock(db) { Store.Data.read_session_timing(db, session, cache) }
+    Store.Data.transaction_lock(db, label = "Store.read_session_timing") {
+      Store.Data.read_session_timing(db, session, cache)
+    }
 
   def read_command_timings(db: SQL.Database, session: String): Bytes =
-    Store.Data.transaction_lock(db) { Store.Data.read_command_timings(db, session) }
+    Store.Data.transaction_lock(db, label = "Store.read_command_timings") {
+      Store.Data.read_command_timings(db, session)
+    }
 
   def read_theory_timings(db: SQL.Database, session: String): List[Properties.T] =
-    Store.Data.transaction_lock(db) { Store.Data.read_theory_timings(db, session, cache) }
+    Store.Data.transaction_lock(db, label = "Store.read_theory_timings") {
+      Store.Data.read_theory_timings(db, session, cache)
+    }
 
   def read_ml_statistics(db: SQL.Database, session: String): List[Properties.T] =
-    Store.Data.transaction_lock(db) { Store.Data.read_ml_statistics(db, session, cache) }
+    Store.Data.transaction_lock(db, label = "Store.read_ml_statistics") {
+      Store.Data.read_ml_statistics(db, session, cache)
+    }
 
   def read_task_statistics(db: SQL.Database, session: String): List[Properties.T] =
-    Store.Data.transaction_lock(db) { Store.Data.read_task_statistics(db, session, cache) }
+    Store.Data.transaction_lock(db, label = "Store.read_task_statistics") {
+      Store.Data.read_task_statistics(db, session, cache)
+    }
 
   def read_theories(db: SQL.Database, session: String): List[String] =
     read_theory_timings(db, session).flatMap(Markup.Name.unapply)
 
   def read_errors(db: SQL.Database, session: String): List[String] =
-    Store.Data.transaction_lock(db) { Store.Data.read_errors(db, session, cache) }
+    Store.Data.transaction_lock(db, label = "Store.read_errors") {
+      Store.Data.read_errors(db, session, cache)
+    }
 
   def read_build(db: SQL.Database, session: String): Option[Store.Build_Info] =
-    Store.Data.transaction_lock(db) { Store.Data.read_build(db, session) }
+    Store.Data.transaction_lock(db, label = "Store.read_build") {
+      if (session_info_exists(db)) Store.Data.read_build(db, session) else None
+    }
 
   def read_sources(db: SQL.Database, session: String, name: String = ""): List[Store.Source_File] =
-    Store.Data.transaction_lock(db) { Store.Data.read_sources(db, session, name, cache.compress) }
+    Store.Data.transaction_lock(db, label = "Store.read_sources") {
+      Store.Data.read_sources(db, session, name, cache.compress)
+    }
 }
--- a/src/Pure/Tools/build.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Tools/build.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -72,8 +72,11 @@
     def build_options(options: Options): Options =
       options + "completion_limit=0" + "editor_tracing_messages=0"
 
-    def build_process(build_context: Build_Process.Context, build_progress: Progress): Build_Process =
-      new Build_Process(build_context, build_progress)
+    def build_process(
+      build_context: Build_Process.Context,
+      build_progress: Progress,
+      server: SSH.Server
+    ): Build_Process = new Build_Process(build_context, build_progress, server)
 
     final def build_store(options: Options, cache: Term.Cache = Term.Cache.make()): Store = {
       val store = Store(build_options(options), cache = cache)
@@ -85,9 +88,13 @@
       store
     }
 
-    final def run(context: Build_Process.Context, progress: Progress): Results =
+    final def run(
+      context: Build_Process.Context,
+      progress: Progress,
+      server: SSH.Server
+    ): Results =
       Isabelle_Thread.uninterruptible {
-        using(build_process(context, progress)) { build_process =>
+        using(build_process(context, progress, server)) { build_process =>
           Results(context, build_process.run())
         }
       }
@@ -126,108 +133,111 @@
     val store = build_engine.build_store(options, cache = cache)
     val build_options = store.options
 
+    using(store.open_server()) { server =>
+      using_optional(store.maybe_open_database_server(server = server)) { database_server =>
 
-    /* session selection and dependencies */
+
+        /* session selection and dependencies */
 
-    val full_sessions =
-      Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos,
-        augment_options = augment_options)
-    val full_sessions_selection = full_sessions.imports_selection(selection)
+        val full_sessions =
+          Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos,
+            augment_options = augment_options)
+        val full_sessions_selection = full_sessions.imports_selection(selection)
 
-    val build_deps = {
-      val deps0 =
-        Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true,
-          list_files = list_files, check_keywords = check_keywords).check_errors
+        val build_deps = {
+          val deps0 =
+            Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true,
+              list_files = list_files, check_keywords = check_keywords).check_errors
 
-      if (soft_build && !fresh_build) {
-        val outdated =
-          deps0.sessions_structure.build_topological_order.flatMap(name =>
-            store.try_open_database(name) match {
-              case Some(db) =>
-                using(db)(store.read_build(_, name)) match {
-                  case Some(build) if build.ok =>
-                    val session_options = deps0.sessions_structure(name).options
-                    val session_sources = deps0.sources_shasum(name)
-                    if (Sessions.eq_sources(session_options, build.sources, session_sources)) None
-                    else Some(name)
-                  case _ => Some(name)
-                }
-              case None => Some(name)
-            })
+          if (soft_build && !fresh_build) {
+            val outdated =
+              deps0.sessions_structure.build_topological_order.flatMap(name =>
+                store.try_open_database(name, server = server) match {
+                  case Some(db) =>
+                    using(db)(store.read_build(_, name)) match {
+                      case Some(build) if build.ok =>
+                        val session_options = deps0.sessions_structure(name).options
+                        val session_sources = deps0.sources_shasum(name)
+                        if (Sessions.eq_sources(session_options, build.sources, session_sources)) None
+                        else Some(name)
+                      case _ => Some(name)
+                    }
+                  case None => Some(name)
+                })
 
-        Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)),
-          progress = progress, inlined_files = true).check_errors
-      }
-      else deps0
-    }
+            Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)),
+              progress = progress, inlined_files = true).check_errors
+          }
+          else deps0
+        }
 
 
-    /* check unknown files */
+        /* check unknown files */
+
+        if (check_unknown_files) {
+          val source_files =
+            (for {
+              (_, base) <- build_deps.session_bases.iterator
+              (path, _) <- base.session_sources.iterator
+            } yield path).toList
+          Mercurial.check_files(source_files)._2 match {
+            case Nil =>
+            case unknown_files =>
+              progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" +
+                unknown_files.map(File.standard_path).sorted.mkString("\n  ", "\n  ", ""))
+          }
+        }
+
+
+        /* build process and results */
+
+        val build_context =
+          Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+            build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+            fresh_build = fresh_build, no_build = no_build, session_setup = session_setup,
+            master = true)
 
-    if (check_unknown_files) {
-      val source_files =
-        (for {
-          (_, base) <- build_deps.session_bases.iterator
-          (path, _) <- base.session_sources.iterator
-        } yield path).toList
-      Mercurial.check_files(source_files)._2 match {
-        case Nil =>
-        case unknown_files =>
-          progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" +
-            unknown_files.map(File.standard_path).sorted.mkString("\n  ", "\n  ", ""))
+        if (clean_build) {
+          for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
+            store.clean_output(database_server, name) match {
+              case None =>
+              case Some(true) => progress.echo("Cleaned " + name)
+              case Some(false) => progress.echo(name + " FAILED to clean")
+            }
+          }
+        }
+
+        val results = build_engine.run(build_context, progress, server)
+
+        if (export_files) {
+          for (name <- full_sessions_selection.iterator if results(name).ok) {
+            val info = results.info(name)
+            if (info.export_files.nonEmpty) {
+              progress.echo("Exporting " + info.name + " ...")
+              for ((dir, prune, pats) <- info.export_files) {
+                Export.export_files(store, name, info.dir + dir,
+                  progress = if (progress.verbose) progress else new Progress,
+                  export_prune = prune,
+                  export_patterns = pats)
+              }
+            }
+          }
+        }
+
+        val presentation_sessions =
+          results.sessions_ok.filter(name => browser_info.enabled(results.info(name)))
+        if (presentation_sessions.nonEmpty && !progress.stopped) {
+          Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions,
+            progress = progress, server = server)
+        }
+
+        if (!results.ok && (progress.verbose || !no_build)) {
+          progress.echo("Unfinished session(s): " + commas(results.unfinished))
+        }
+
+        results
       }
     }
-
-
-    /* build process and results */
-
-    val build_context =
-      Build_Process.Context(store, build_deps, hostname = hostname(build_options),
-        build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
-        fresh_build = fresh_build, no_build = no_build, session_setup = session_setup,
-        master = true)
-
-    if (clean_build) {
-      using_optional(store.maybe_open_database_server()) { database_server =>
-        for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
-          store.clean_output(database_server, name) match {
-            case None =>
-            case Some(true) => progress.echo("Cleaned " + name)
-            case Some(false) => progress.echo(name + " FAILED to clean")
-          }
-        }
-      }
-    }
-
-    val results = build_engine.run(build_context, progress)
-
-    if (export_files) {
-      for (name <- full_sessions_selection.iterator if results(name).ok) {
-        val info = results.info(name)
-        if (info.export_files.nonEmpty) {
-          progress.echo("Exporting " + info.name + " ...")
-          for ((dir, prune, pats) <- info.export_files) {
-            Export.export_files(store, name, info.dir + dir,
-              progress = if (progress.verbose) progress else new Progress,
-              export_prune = prune,
-              export_patterns = pats)
-          }
-        }
-      }
-    }
-
-    val presentation_sessions =
-      results.sessions_ok.filter(name => browser_info.enabled(results.info(name)))
-    if (presentation_sessions.nonEmpty && !progress.stopped) {
-      Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions,
-        progress = progress)
-    }
-
-    if (!results.ok && (progress.verbose || !no_build)) {
-      progress.echo("Unfinished session(s): " + commas(results.unfinished))
-    }
-
-    results
   }
 
 
@@ -392,40 +402,45 @@
 
   /* identified builds */
 
-  def read_builds(options: Options): List[Build_Process.Build] =
-    using_option(Store(options).maybe_open_build_database())(
-      Build_Process.read_builds).getOrElse(Nil).filter(_.active)
-
-  def print_builds(options: Options, builds: List[Build_Process.Build]): String =
-    using_optional(Store(options).maybe_open_build_database()) { build_database =>
-      val print_database =
-        build_database match {
-          case None => ""
-          case Some(db) => " (database: " + db + ")"
-        }
-      if (builds.isEmpty) "No build processes available" + print_database
-      else {
-        "Available build processes" + print_database +
-          (for ((build, i) <- builds.iterator.zipWithIndex)
-            yield {
-              "\n  " + (i + 1) + ": " + build.build_uuid +
-                " (platform: " + build.ml_platform +
-                ", start: " + Build_Log.print_date(build.start) + ")"
-            }).mkString
-      }
+  def read_builds(build_database: Option[SQL.Database]): List[Build_Process.Build] =
+    build_database match {
+      case None => Nil
+      case Some(db) => Build_Process.read_builds(db).filter(_.active)
     }
 
+  def print_builds(build_database: Option[SQL.Database], builds: List[Build_Process.Build]): String =
+  {
+    val print_database =
+      build_database match {
+        case None => ""
+        case Some(db) => " (database: " + db + ")"
+      }
+    if (builds.isEmpty) "No build processes available" + print_database
+    else {
+      "Available build processes" + print_database +
+        (for ((build, i) <- builds.iterator.zipWithIndex)
+          yield {
+            "\n  " + (i + 1) + ": " + build.build_uuid +
+              " (platform: " + build.ml_platform +
+              ", start: " + Build_Log.print_date(build.start) + ")"
+          }).mkString
+    }
+  }
+
   def id_builds(
-    options: Options,
-    id: String,
+    build_database: Option[SQL.Database],
+    build_id: String,
     builds: List[Build_Process.Build]
   ): Build_Process.Build =
-    (id, builds.length) match {
+    (build_id, builds.length) match {
       case (Value.Int(i), n) if 1 <= i && i <= n => builds(i - 1)
-      case (UUID(_), _) if builds.exists(_.build_uuid == id) => builds.find(_.build_uuid == id).get
-      case ("", 0) => error(print_builds(options, builds))
+      case (UUID(_), _) if builds.exists(_.build_uuid == build_id) =>
+        builds.find(_.build_uuid == build_id).get
+      case ("", 0) => error(print_builds(build_database, builds))
       case ("", 1) => builds.head
-      case _ => cat_error("Cannot identify build process " + quote(id), print_builds(options, builds))
+      case _ =>
+        cat_error("Cannot identify build process " + quote(build_id),
+          print_builds(build_database, builds))
     }
 
 
@@ -433,30 +448,44 @@
 
   def build_worker(
     options: Options,
-    build_master: Build_Process.Build,
+    build_id: String,
     progress: Progress = new Progress,
+    list_builds: Boolean = false,
     dirs: List[Path] = Nil,
     numa_shuffling: Boolean = false,
     max_jobs: Int = 1,
     cache: Term.Cache = Term.Cache.make()
-  ): Results = {
+  ): Option[Results] = {
     val build_engine = Engine(engine_name(options))
-
-    val store = build_engine.build_store(options, cache = cache)
+    val store = build_engine.build_store(options)
     val build_options = store.options
 
-    val sessions_structure =
-      Sessions.load_structure(build_options, dirs = dirs).
-        selection(Sessions.Selection(sessions = build_master.sessions))
+    using(store.open_server()) { server =>
+      using_optional(store.maybe_open_build_database(server = server)) { build_database =>
+        val builds = read_builds(build_database)
+
+        if (list_builds) progress.echo(print_builds(build_database, builds))
+
+        if (!list_builds || build_id.nonEmpty) {
+          val build_master = id_builds(build_database, build_id, builds)
+
+          val sessions_structure =
+            Sessions.load_structure(build_options, dirs = dirs).
+              selection(Sessions.Selection(sessions = build_master.sessions))
 
-    val build_deps =
-      Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
+          val build_deps =
+            Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
 
-    val build_context =
-      Build_Process.Context(store, build_deps, hostname = hostname(build_options),
-        numa_shuffling = numa_shuffling, max_jobs = max_jobs, build_uuid = build_master.build_uuid)
+          val build_context =
+            Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+              numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+              build_uuid = build_master.build_uuid)
 
-    build_engine.run(build_context, progress)
+          Some(build_engine.run(build_context, progress, server))
+        }
+        else None
+      }
+    }
   }
 
 
@@ -504,23 +533,19 @@
 
       val progress = new Console_Progress(verbose = verbose)
 
-      val builds = read_builds(options)
-
-      if (list_builds) progress.echo(print_builds(options, builds))
-
-      if (!list_builds || build_id.nonEmpty) {
-        val build = id_builds(options, build_id, builds)
+      val res =
+        progress.interrupt_handler {
+          build_worker(options, build_id,
+            progress = progress,
+            list_builds = list_builds,
+            dirs = dirs,
+            numa_shuffling = Host.numa_check(progress, numa_shuffling),
+            max_jobs = max_jobs)
+        }
 
-        val results =
-          progress.interrupt_handler {
-            build_worker(options, build,
-              progress = progress,
-              dirs = dirs,
-              numa_shuffling = Host.numa_check(progress, numa_shuffling),
-              max_jobs = max_jobs)
-          }
-
-        sys.exit(results.rc)
+      res match {
+        case Some(results) if !results.ok => sys.exit(results.rc)
+        case _ =>
       }
     })
 
--- a/src/Pure/Tools/build_job.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Tools/build_job.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -24,19 +24,20 @@
     session_context: Session_Context,
     progress: Progress,
     log: Logger,
-    database_server: Option[SQL.Database],
+    server: SSH.Server,
     session_background: Sessions.Background,
     sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
     node_info: Host.Node_Info,
     store_heap: Boolean
   ): Session_Job = {
-    new Session_Job(build_context, session_context, progress, log, database_server,
+    new Session_Job(build_context, session_context, progress, log, server,
       session_background, sources_shasum, input_shasum, node_info, store_heap)
   }
 
   object Session_Context {
     def load(
+      database_server: Option[SQL.Database],
       build_uuid: String,
       name: String,
       deps: List[String],
@@ -52,32 +53,34 @@
           name, deps, ancestors, session_prefs, sources_shasum, timeout,
           Time.zero, Bytes.empty, build_uuid)
 
-      store.try_open_database(name) match {
-        case None => default
-        case Some(db) =>
-          def ignore_error(msg: String) = {
-            progress.echo_warning(
-              "Ignoring bad database " + db + " for session " + quote(name) +
-              if_proper(msg, ":\n" + msg))
-            default
-          }
-          try {
-            val command_timings = store.read_command_timings(db, name)
-            val elapsed =
-              store.read_session_timing(db, name) match {
-                case Markup.Elapsed(s) => Time.seconds(s)
-                case _ => Time.zero
-              }
-            new Session_Context(
-              name, deps, ancestors, session_prefs, sources_shasum, timeout,
-              elapsed, command_timings, build_uuid)
-          }
-          catch {
-            case ERROR(msg) => ignore_error(msg)
-            case exn: java.lang.Error => ignore_error(Exn.message(exn))
-            case _: XML.Error => ignore_error("XML.Error")
-          }
-          finally { db.close() }
+      def read(db: SQL.Database): Session_Context = {
+        def ignore_error(msg: String) = {
+          progress.echo_warning(
+            "Ignoring bad database " + db + " for session " + quote(name) +
+            if_proper(msg, ":\n" + msg))
+          default
+        }
+        try {
+          val command_timings = store.read_command_timings(db, name)
+          val elapsed =
+            store.read_session_timing(db, name) match {
+              case Markup.Elapsed(s) => Time.seconds(s)
+              case _ => Time.zero
+            }
+          new Session_Context(
+            name, deps, ancestors, session_prefs, sources_shasum, timeout,
+            elapsed, command_timings, build_uuid)
+        }
+        catch {
+          case ERROR(msg) => ignore_error(msg)
+          case exn: java.lang.Error => ignore_error(Exn.message(exn))
+          case _: XML.Error => ignore_error("XML.Error")
+        }
+      }
+
+      database_server match {
+        case Some(db) => if (store.session_info_exists(db)) read(db) else default
+        case None => using_option(store.try_open_database(name))(read) getOrElse default
       }
     }
   }
@@ -99,428 +102,437 @@
     session_context: Session_Context,
     progress: Progress,
     log: Logger,
-    database_server: Option[SQL.Database],
+    server: SSH.Server,
     session_background: Sessions.Background,
     sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
     node_info: Host.Node_Info,
     store_heap: Boolean
   ) extends Build_Job {
-    private val store = build_context.store
-
     def session_name: String = session_background.session_name
 
-    private val info: Sessions.Info = session_background.sessions_structure(session_name)
-    private val options: Options = node_info.process_policy(info.options)
-
-    private val session_sources =
-      Store.Sources.load(session_background.base, cache = store.cache.compress)
-
     private val future_result: Future[(Process_Result, SHA1.Shasum)] =
       Future.thread("build", uninterruptible = true) {
-        val env =
-          Isabelle_System.settings(
-            List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
-
-        val session_heaps =
-          session_background.info.parent match {
-            case None => Nil
-            case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic)
-          }
-
-        val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil
-
-        val eval_store =
-          if (store_heap) {
-            (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
-            List("ML_Heap.save_child " +
-              ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
-          }
-          else Nil
+        val info = session_background.sessions_structure(session_name)
+        val options = node_info.process_policy(info.options)
 
-        def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
-          session_background.base.theory_load_commands.get(node_name.theory) match {
-            case None => Nil
-            case Some(spans) =>
-              val syntax = session_background.base.theory_syntax(node_name)
-              val master_dir = Path.explode(node_name.master_dir)
-              for (span <- spans; file <- span.loaded_files(syntax).files)
-                yield {
-                  val src_path = Path.explode(file)
-                  val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
-
-                  val bytes = session_sources(blob_name.node).bytes
-                  val text = bytes.text
-                  val chunk = Symbol.Text_Chunk(text)
+        val store = build_context.store
 
-                  Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
-                    Document.Blobs.Item(bytes, text, chunk, changed = false)
-                }
-          }
-
-
-        /* session */
+        using_optional(store.maybe_open_database_server(server = server)) { database_server =>
 
-        val resources =
-          new Resources(session_background, log = log,
-            command_timings =
-              Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
-
-        val session =
-          new Session(options, resources) {
-            override val cache: Term.Cache = store.cache
-
-            override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
-              Command.Blobs_Info.make(session_blobs(node_name))
+          store.clean_output(database_server, session_name, session_init = true)
 
-            override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
-              Document.Blobs.make(session_blobs(node_name))
-          }
-
-        object Build_Session_Errors {
-          private val promise: Promise[List[String]] = Future.promise
-
-          def result: Exn.Result[List[String]] = promise.join_result
-          def cancel(): Unit = promise.cancel()
-          def apply(errs: List[String]): Unit = {
-            try { promise.fulfill(errs) }
-            catch { case _: IllegalStateException => }
-          }
-        }
-
-        val export_consumer =
-          Export.consumer(store.open_database(session_name, output = true), store.cache,
-            progress = progress)
-
-        val stdout = new StringBuilder(1000)
-        val stderr = new StringBuilder(1000)
-        val command_timings = new mutable.ListBuffer[Properties.T]
-        val theory_timings = new mutable.ListBuffer[Properties.T]
-        val session_timings = new mutable.ListBuffer[Properties.T]
-        val runtime_statistics = new mutable.ListBuffer[Properties.T]
-        val task_statistics = new mutable.ListBuffer[Properties.T]
+          val session_sources =
+            Store.Sources.load(session_background.base, cache = store.cache.compress)
 
-        def fun(
-          name: String,
-          acc: mutable.ListBuffer[Properties.T],
-          unapply: Properties.T => Option[Properties.T]
-        ): (String, Session.Protocol_Function) = {
-          name -> ((msg: Prover.Protocol_Output) =>
-            unapply(msg.properties) match {
-              case Some(props) => acc += props; true
-              case _ => false
-            })
-        }
-
-        session.init_protocol_handler(new Session.Protocol_Handler {
-            override def exit(): Unit = Build_Session_Errors.cancel()
+          val env =
+            Isabelle_System.settings(
+              List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
 
-            private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
-              val (rc, errors) =
-                try {
-                  val (rc, errs) = {
-                    import XML.Decode._
-                    pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
-                  }
-                  val errors =
-                    for (err <- errs) yield {
-                      val prt = Protocol_Message.expose_no_reports(err)
-                      Pretty.string_of(prt, metric = Symbol.Metric)
-                    }
-                  (rc, errors)
-                }
-                catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
-
-              session.protocol_command("Prover.stop", rc.toString)
-              Build_Session_Errors(errors)
-              true
+          val session_heaps =
+            session_background.info.parent match {
+              case None => Nil
+              case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic)
             }
 
-            private def loading_theory(msg: Prover.Protocol_Output): Boolean =
-              msg.properties match {
-                case Markup.Loading_Theory(Markup.Name(name)) =>
-                  progress.theory(Progress.Theory(name, session = session_name))
-                  false
-                case _ => false
-              }
-
-            private def export_(msg: Prover.Protocol_Output): Boolean =
-              msg.properties match {
-                case Protocol.Export(args) =>
-                  export_consumer.make_entry(session_name, args, msg.chunk)
-                  true
-                case _ => false
-              }
+          val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil
 
-            override val functions: Session.Protocol_Functions =
-              List(
-                Markup.Build_Session_Finished.name -> build_session_finished,
-                Markup.Loading_Theory.name -> loading_theory,
-                Markup.EXPORT -> export_,
-                fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
-                fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
-                fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
-          })
-
-        session.command_timings += Session.Consumer("command_timings") {
-          case Session.Command_Timing(props) =>
-            for {
-              elapsed <- Markup.Elapsed.unapply(props)
-              elapsed_time = Time.seconds(elapsed)
-              if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
-            } command_timings += props.filter(Markup.command_timing_property)
-        }
-
-        session.runtime_statistics += Session.Consumer("ML_statistics") {
-          case Session.Runtime_Statistics(props) => runtime_statistics += props
-        }
+          val eval_store =
+            if (store_heap) {
+              (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
+              List("ML_Heap.save_child " +
+                ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
+            }
+            else Nil
 
-        session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
-          case snapshot =>
-            if (!progress.stopped) {
-              def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
-                if (!progress.stopped) {
-                  val theory_name = snapshot.node_name.theory
-                  val args =
-                    Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
-                  val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
-                  export_consumer.make_entry(session_name, args, body)
-                }
-              }
-              def export_text(name: String, text: String, compress: Boolean = true): Unit =
-                export_(name, List(XML.Text(text)), compress = compress)
-
-              for (command <- snapshot.snippet_command) {
-                export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
-              }
-
-              export_text(Export.FILES,
-                cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
-                compress = false)
+          def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
+            session_background.base.theory_load_commands.get(node_name.theory) match {
+              case None => Nil
+              case Some(spans) =>
+                val syntax = session_background.base.theory_syntax(node_name)
+                val master_dir = Path.explode(node_name.master_dir)
+                for (span <- spans; file <- span.loaded_files(syntax).files)
+                  yield {
+                    val src_path = Path.explode(file)
+                    val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
 
-              for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
-                val xml = snapshot.switch(blob_name).xml_markup()
-                export_(Export.MARKUP + (i + 1), xml)
-              }
-              export_(Export.MARKUP, snapshot.xml_markup())
-              export_(Export.MESSAGES, snapshot.messages.map(_._1))
-            }
-        }
+                    val bytes = session_sources(blob_name.node).bytes
+                    val text = bytes.text
+                    val chunk = Symbol.Text_Chunk(text)
 
-        session.all_messages += Session.Consumer[Any]("build_session_output") {
-          case msg: Prover.Output =>
-            val message = msg.message
-            if (msg.is_system) resources.log(Protocol.message_text(message))
-
-            if (msg.is_stdout) {
-              stdout ++= Symbol.encode(XML.content(message))
+                    Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
+                      Document.Blobs.Item(bytes, text, chunk, changed = false)
+                  }
             }
-            else if (msg.is_stderr) {
-              stderr ++= Symbol.encode(XML.content(message))
-            }
-            else if (msg.is_exit) {
-              val err =
-                "Prover terminated" +
-                  (msg.properties match {
-                    case Markup.Process_Result(result) => ": " + result.print_rc
-                    case _ => ""
-                  })
-              Build_Session_Errors(List(err))
-            }
-          case _ =>
-        }
-
-        build_context.session_setup(session_name, session)
-
-        val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
 
 
-        /* process */
+          /* session */
 
-        val process =
-          Isabelle_Process.start(options, session, session_background, session_heaps,
-            use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env)
+          val resources =
+            new Resources(session_background, log = log,
+              command_timings =
+                Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
 
-        val timeout_request: Option[Event_Timer.Request] =
-          if (info.timeout_ignored) None
-          else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() })
+          val session =
+            new Session(options, resources) {
+              override val cache: Term.Cache = store.cache
 
-        val build_errors =
-          Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
-            Exn.capture { process.await_startup() } match {
-              case Exn.Res(_) =>
-                val resources_yxml = resources.init_session_yxml
-                val encode_options: XML.Encode.T[Options] =
-                  options => session.prover_options(options).encode
-                val args_yxml =
-                  YXML.string_of_body(
-                    {
-                      import XML.Encode._
-                      pair(string, list(pair(encode_options, list(pair(string, properties)))))(
-                        (session_name, info.theories))
-                    })
-                session.protocol_command("build_session", resources_yxml, args_yxml)
-                Build_Session_Errors.result
-              case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+              override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
+                Command.Blobs_Info.make(session_blobs(node_name))
+
+              override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
+                Document.Blobs.make(session_blobs(node_name))
+            }
+
+          object Build_Session_Errors {
+            private val promise: Promise[List[String]] = Future.promise
+
+            def result: Exn.Result[List[String]] = promise.join_result
+            def cancel(): Unit = promise.cancel()
+            def apply(errs: List[String]): Unit = {
+              try { promise.fulfill(errs) }
+              catch { case _: IllegalStateException => }
             }
           }
 
-        val result0 =
-          Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
+          val export_consumer =
+            Export.consumer(store.open_database(session_name, output = true, server = server),
+              store.cache, progress = progress)
 
-        val was_timeout =
-          timeout_request match {
-            case None => false
-            case Some(request) => !request.cancel()
+          val stdout = new StringBuilder(1000)
+          val stderr = new StringBuilder(1000)
+          val command_timings = new mutable.ListBuffer[Properties.T]
+          val theory_timings = new mutable.ListBuffer[Properties.T]
+          val session_timings = new mutable.ListBuffer[Properties.T]
+          val runtime_statistics = new mutable.ListBuffer[Properties.T]
+          val task_statistics = new mutable.ListBuffer[Properties.T]
+
+          def fun(
+            name: String,
+            acc: mutable.ListBuffer[Properties.T],
+            unapply: Properties.T => Option[Properties.T]
+          ): (String, Session.Protocol_Function) = {
+            name -> ((msg: Prover.Protocol_Output) =>
+              unapply(msg.properties) match {
+                case Some(props) => acc += props; true
+                case _ => false
+              })
           }
 
-        session.stop()
+          session.init_protocol_handler(new Session.Protocol_Handler {
+              override def exit(): Unit = Build_Session_Errors.cancel()
+
+              private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
+                val (rc, errors) =
+                  try {
+                    val (rc, errs) = {
+                      import XML.Decode._
+                      pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
+                    }
+                    val errors =
+                      for (err <- errs) yield {
+                        val prt = Protocol_Message.expose_no_reports(err)
+                        Pretty.string_of(prt, metric = Symbol.Metric)
+                      }
+                    (rc, errors)
+                  }
+                  catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
+
+                session.protocol_command("Prover.stop", rc.toString)
+                Build_Session_Errors(errors)
+                true
+              }
 
-        val export_errors =
-          export_consumer.shutdown(close = true).map(Output.error_message_text)
+              private def loading_theory(msg: Prover.Protocol_Output): Boolean =
+                msg.properties match {
+                  case Markup.Loading_Theory(Markup.Name(name)) =>
+                    progress.theory(Progress.Theory(name, session = session_name))
+                    false
+                  case _ => false
+                }
+
+              private def export_(msg: Prover.Protocol_Output): Boolean =
+                msg.properties match {
+                  case Protocol.Export(args) =>
+                    export_consumer.make_entry(session_name, args, msg.chunk)
+                    true
+                  case _ => false
+                }
+
+              override val functions: Session.Protocol_Functions =
+                List(
+                  Markup.Build_Session_Finished.name -> build_session_finished,
+                  Markup.Loading_Theory.name -> loading_theory,
+                  Markup.EXPORT -> export_,
+                  fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
+                  fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
+                  fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
+            })
+
+          session.command_timings += Session.Consumer("command_timings") {
+            case Session.Command_Timing(props) =>
+              for {
+                elapsed <- Markup.Elapsed.unapply(props)
+                elapsed_time = Time.seconds(elapsed)
+                if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+              } command_timings += props.filter(Markup.command_timing_property)
+          }
 
-        val (document_output, document_errors) =
-          try {
-            if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) {
-              using(Export.open_database_context(store)) { database_context =>
-                val documents =
-                  using(database_context.open_session(session_background)) {
-                    session_context =>
-                      Document_Build.build_documents(
-                        Document_Build.context(session_context, progress = progress),
-                        output_sources = info.document_output,
-                        output_pdf = info.document_output)
+          session.runtime_statistics += Session.Consumer("ML_statistics") {
+            case Session.Runtime_Statistics(props) => runtime_statistics += props
+          }
+
+          session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
+            case snapshot =>
+              if (!progress.stopped) {
+                def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
+                  if (!progress.stopped) {
+                    val theory_name = snapshot.node_name.theory
+                    val args =
+                      Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+                    val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
+                    export_consumer.make_entry(session_name, args, body)
                   }
-                using(database_context.open_database(session_name, output = true))(session_database =>
-                  documents.foreach(_.write(session_database.db, session_name)))
-                (documents.flatMap(_.log_lines), Nil)
+                }
+                def export_text(name: String, text: String, compress: Boolean = true): Unit =
+                  export_(name, List(XML.Text(text)), compress = compress)
+
+                for (command <- snapshot.snippet_command) {
+                  export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
+                }
+
+                export_text(Export.FILES,
+                  cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
+                  compress = false)
+
+                for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
+                  val xml = snapshot.switch(blob_name).xml_markup()
+                  export_(Export.MARKUP + (i + 1), xml)
+                }
+                export_(Export.MARKUP, snapshot.xml_markup())
+                export_(Export.MESSAGES, snapshot.messages.map(_._1))
               }
-            }
-            else (Nil, Nil)
           }
-          catch {
-            case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
-            case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+
+          session.all_messages += Session.Consumer[Any]("build_session_output") {
+            case msg: Prover.Output =>
+              val message = msg.message
+              if (msg.is_system) resources.log(Protocol.message_text(message))
+
+              if (msg.is_stdout) {
+                stdout ++= Symbol.encode(XML.content(message))
+              }
+              else if (msg.is_stderr) {
+                stderr ++= Symbol.encode(XML.content(message))
+              }
+              else if (msg.is_exit) {
+                val err =
+                  "Prover terminated" +
+                    (msg.properties match {
+                      case Markup.Process_Result(result) => ": " + result.print_rc
+                      case _ => ""
+                    })
+                Build_Session_Errors(List(err))
+              }
+            case _ =>
           }
 
+          build_context.session_setup(session_name, session)
+
+          val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
+
 
-        /* process result */
+          /* process */
+
+          val process =
+            Isabelle_Process.start(options, session, session_background, session_heaps,
+              use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env)
+
+          val timeout_request: Option[Event_Timer.Request] =
+            if (info.timeout_ignored) None
+            else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() })
 
-        val result1 = {
-          val theory_timing =
-            theory_timings.iterator.flatMap(
-              {
-                case props @ Markup.Name(name) => Some(name -> props)
-                case _ => None
-              }).toMap
-          val used_theory_timings =
-            for { (name, _) <- session_background.base.used_theories }
-              yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
+          val build_errors =
+            Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
+              Exn.capture { process.await_startup() } match {
+                case Exn.Res(_) =>
+                  val resources_yxml = resources.init_session_yxml
+                  val encode_options: XML.Encode.T[Options] =
+                    options => session.prover_options(options).encode
+                  val args_yxml =
+                    YXML.string_of_body(
+                      {
+                        import XML.Encode._
+                        pair(string, list(pair(encode_options, list(pair(string, properties)))))(
+                          (session_name, info.theories))
+                      })
+                  session.protocol_command("build_session", resources_yxml, args_yxml)
+                  Build_Session_Errors.result
+                case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+              }
+            }
 
-          val more_output =
-            Library.trim_line(stdout.toString) ::
-              command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
-              used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
-              session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
-              runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
-              task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
-              document_output
+          val result0 =
+            Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
 
-          result0.output(more_output)
-            .error(Library.trim_line(stderr.toString))
-            .errors_rc(export_errors ::: document_errors)
-        }
+          val was_timeout =
+            timeout_request match {
+              case None => false
+              case Some(request) => !request.cancel()
+            }
+
+          session.stop()
+
+          val export_errors =
+            export_consumer.shutdown(close = true).map(Output.error_message_text)
 
-        val result2 =
-          build_errors match {
-            case Exn.Res(build_errs) =>
-              val errs = build_errs ::: document_errors
-              if (errs.nonEmpty) {
-                result1.error_rc.output(
-                  errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
-                    errs.map(Protocol.Error_Message_Marker.apply))
+          val (document_output, document_errors) =
+            try {
+              if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) {
+                using(Export.open_database_context(store, server = server)) { database_context =>
+                  val documents =
+                    using(database_context.open_session(session_background)) {
+                      session_context =>
+                        Document_Build.build_documents(
+                          Document_Build.context(session_context, progress = progress),
+                          output_sources = info.document_output,
+                          output_pdf = info.document_output)
+                    }
+                  using(database_context.open_database(session_name, output = true))(session_database =>
+                    documents.foreach(_.write(session_database.db, session_name)))
+                  (documents.flatMap(_.log_lines), Nil)
+                }
               }
-              else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-              else result1
-            case Exn.Exn(Exn.Interrupt()) =>
-              if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-              else result1
-            case Exn.Exn(exn) => throw exn
-          }
-
-        val process_result =
-          if (result2.ok) result2
-          else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc
-          else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
-          else result2
+              else (Nil, Nil)
+            }
+            catch {
+              case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
+              case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+            }
 
 
-        /* output heap */
+          /* process result */
 
-        val output_shasum = {
-          val heap = store.output_heap(session_name)
-          if (process_result.ok && store_heap && heap.is_file) {
-            val slice = Space.MiB(options.real("build_database_slice")).bytes
-            val digest = ML_Heap.store(database_server, session_name, heap, slice)
-            SHA1.shasum(digest, session_name)
-          }
-          else SHA1.no_shasum
-        }
+          val result1 = {
+            val theory_timing =
+              theory_timings.iterator.flatMap(
+                {
+                  case props @ Markup.Name(name) => Some(name -> props)
+                  case _ => None
+                }).toMap
+            val used_theory_timings =
+              for { (name, _) <- session_background.base.used_theories }
+                yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
 
-        val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
+            val more_output =
+              Library.trim_line(stdout.toString) ::
+                command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
+                used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
+                session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
+                runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
+                task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
+                document_output
 
-        val build_log =
-          Build_Log.Log_File(session_name, process_result.out_lines).
-            parse_session_info(
-              command_timings = true,
-              theory_timings = true,
-              ml_statistics = true,
-              task_statistics = true)
+            result0.output(more_output)
+              .error(Library.trim_line(stderr.toString))
+              .errors_rc(export_errors ::: document_errors)
+          }
 
-        // write log file
-        if (process_result.ok) {
-          File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
-        }
-        else File.write(store.output_log(session_name), terminate_lines(log_lines))
+          val result2 =
+            build_errors match {
+              case Exn.Res(build_errs) =>
+                val errs = build_errs ::: document_errors
+                if (errs.nonEmpty) {
+                  result1.error_rc.output(
+                    errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
+                      errs.map(Protocol.Error_Message_Marker.apply))
+                }
+                else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+                else result1
+              case Exn.Exn(Exn.Interrupt()) =>
+                if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+                else result1
+              case Exn.Exn(exn) => throw exn
+            }
+
+          val process_result =
+            if (result2.ok) result2
+            else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc
+            else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
+            else result2
+
+
+          /* output heap */
 
-        // write database
-        using(store.open_database(session_name, output = true))(db =>
-          store.write_session_info(db, session_name, session_sources,
-            build_log =
-              if (process_result.timeout) build_log.error("Timeout") else build_log,
-            build =
-              Store.Build_Info(
-                sources = sources_shasum,
-                input_heaps = input_shasum,
-                output_heap = output_shasum,
-                process_result.rc,
-                build_context.build_uuid)))
+          val output_shasum = {
+            val heap = store.output_heap(session_name)
+            if (process_result.ok && store_heap && heap.is_file) {
+              val slice = Space.MiB(options.real("build_database_slice")).bytes
+              val digest = ML_Heap.store(database_server, session_name, heap, slice)
+              SHA1.shasum(digest, session_name)
+            }
+            else SHA1.no_shasum
+          }
+
+          val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
 
-        // messages
-        process_result.err_lines.foreach(progress.echo(_))
+          val build_log =
+            Build_Log.Log_File(session_name, process_result.out_lines).
+              parse_session_info(
+                command_timings = true,
+                theory_timings = true,
+                ml_statistics = true,
+                task_statistics = true)
+
+          // write log file
+          if (process_result.ok) {
+            File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
+          }
+          else File.write(store.output_log(session_name), terminate_lines(log_lines))
 
-        if (process_result.ok) {
-          val props = build_log.session_timing
-          val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-          val timing = Markup.Timing_Properties.get(props)
-          progress.echo(
-            "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")",
-            verbose = true)
-          progress.echo(
-            "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
+          // write database
+          def write_info(db: SQL.Database): Unit =
+            store.write_session_info(db, session_name, session_sources,
+              build_log =
+                if (process_result.timeout) build_log.error("Timeout") else build_log,
+              build =
+                Store.Build_Info(
+                  sources = sources_shasum,
+                  input_heaps = input_shasum,
+                  output_heap = output_shasum,
+                  process_result.rc,
+                  build_context.build_uuid))
+          database_server match {
+            case Some(db) => write_info(db)
+            case None => using(store.open_database(session_name, output = true))(write_info)
+          }
+
+          // messages
+          process_result.err_lines.foreach(progress.echo(_))
+
+          if (process_result.ok) {
+            val props = build_log.session_timing
+            val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+            val timing = Markup.Timing_Properties.get(props)
+            progress.echo(
+              "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")",
+              verbose = true)
+            progress.echo(
+              "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
+          }
+          else {
+            progress.echo(
+              session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
+            if (!process_result.interrupted) {
+              val tail = info.options.int("process_output_tail")
+              val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
+              val prefix = if (log_lines.length == suffix.length) Nil else List("...")
+              progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
+            }
+          }
+
+          (process_result.copy(out_lines = log_lines), output_shasum)
         }
-        else {
-          progress.echo(
-            session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
-          if (!process_result.interrupted) {
-            val tail = info.options.int("process_output_tail")
-            val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
-            val prefix = if (log_lines.length == suffix.length) Nil else List("...")
-            progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
-          }
-        }
-
-        (process_result.copy(out_lines = log_lines), output_shasum)
       }
 
     override def cancel(): Unit = future_result.cancel()
--- a/src/Pure/Tools/build_process.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -128,7 +128,11 @@
       make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) })
     }
 
-    def init(build_context: Context, progress: Progress = new Progress): Sessions = {
+    def init(
+      build_context: Context,
+      database_server: Option[SQL.Database],
+      progress: Progress = new Progress
+    ): Sessions = {
       val sessions_structure = build_context.sessions_structure
       make(
         sessions_structure.build_graph.iterator.foldLeft(graph) {
@@ -165,7 +169,7 @@
             }
             else {
               val session =
-                Build_Job.Session_Context.load(
+                Build_Job.Session_Context.load(database_server,
                   build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum,
                   info.timeout, build_context.store, progress = progress)
               graph0.new_node(name, session)
@@ -824,7 +828,9 @@
   }
 
   def read_builds(db: SQL.Database): List[Build] =
-    Data.transaction_lock(db, create = true) { Data.read_builds(db) }
+    Data.transaction_lock(db, create = true, label = "Build_Process.read_builds") {
+      Data.read_builds(db)
+    }
 }
 
 
@@ -833,7 +839,8 @@
 
 class Build_Process(
   protected final val build_context: Build_Process.Context,
-  protected final val build_progress: Progress
+  protected final val build_progress: Progress,
+  protected final val server: SSH.Server
 )
 extends AutoCloseable {
   /* context */
@@ -848,25 +855,28 @@
   /* progress backed by database */
 
   private val _database_server: Option[SQL.Database] =
-    try { store.maybe_open_database_server() }
+    try { store.maybe_open_database_server(server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
   private val _build_database: Option[SQL.Database] =
     try {
-      for (db <- store.maybe_open_build_database()) yield {
+      for (db <- store.maybe_open_build_database(server = server)) yield {
         val store_tables = if (db.is_postgresql) Store.Data.tables else SQL.Tables.empty
-        Build_Process.Data.transaction_lock(db, create = true) {
+        Build_Process.Data.transaction_lock(db,
+          create = true,
+          label = "Build_Process.build_database"
+        ) {
           Build_Process.Data.clean_build(db)
           store_tables.lock(db, create = true)
         }
-        db.vacuum(Build_Process.Data.tables ::: store_tables)
+        if (build_context.master) db.vacuum(Build_Process.Data.tables ::: store_tables)
         db
       }
     }
     catch { case exn: Throwable => close(); throw exn }
 
   private val _host_database: Option[SQL.Database] =
-    try { store.maybe_open_build_database(path = Host.Data.database) }
+    try { store.maybe_open_build_database(path = Host.Data.database, server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
   protected val (progress, worker_uuid) = synchronized {
@@ -874,7 +884,7 @@
       case None => (build_progress, UUID.random().toString)
       case Some(db) =>
         try {
-          val progress_db = store.open_build_database(Progress.Data.database)
+          val progress_db = store.open_build_database(Progress.Data.database, server = server)
           val progress =
             new Database_Progress(progress_db, build_progress,
               hostname = hostname,
@@ -903,26 +913,27 @@
 
   private var _state: Build_Process.State = Build_Process.State()
 
-  protected def synchronized_database[A](body: => A): A = synchronized {
-    _build_database match {
-      case None => body
-      case Some(db) =>
-        Build_Process.Data.transaction_lock(db) {
+  protected def synchronized_database[A](label: String)(body: => A): A =
+    synchronized {
+      _build_database match {
+        case None => body
+        case Some(db) =>
           progress.asInstanceOf[Database_Progress].sync()
-          _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
-          val res = body
-          _state =
-            Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
-          res
-        }
+          Build_Process.Data.transaction_lock(db, label = label) {
+            _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
+            val res = body
+            _state =
+              Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
+            res
+          }
+      }
     }
-  }
 
 
   /* policy operations */
 
   protected def init_state(state: Build_Process.State): Build_Process.State = {
-    val sessions1 = state.sessions.init(build_context, progress = build_progress)
+    val sessions1 = state.sessions.init(build_context, _database_server, progress = build_progress)
 
     val old_pending = state.pending.iterator.map(_.name).toSet
     val new_pending =
@@ -960,7 +971,7 @@
       state.sessions.iterator.exists(_.ancestors.contains(session_name))
 
     val (current, output_shasum) =
-      store.check_output(session_name,
+      store.check_output(_database_server, session_name,
         session_options = build_context.sessions_structure(session_name).options,
         sources_shasum = sources_shasum,
         input_shasum = input_shasum,
@@ -1009,12 +1020,10 @@
         (if (store_heap) "Building " else "Running ") + session_name +
           if_proper(node_info.numa_node, " on " + node_info) + " ...")
 
-      store.clean_output(_database_server, session_name, session_init = true)
-
       val session = state.sessions(session_name)
 
       val build =
-        Build_Job.start_session(build_context, session, progress, log, _database_server,
+        Build_Job.start_session(build_context, session, progress, log, server,
           build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap)
 
       val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
@@ -1029,27 +1038,27 @@
   final def is_session_name(job_name: String): Boolean =
     !Long_Name.is_qualified(job_name)
 
-  protected final def start_build(): Unit = synchronized_database {
+  protected final def start_build(): Unit = synchronized_database("Build_Process.start_build") {
     for (db <- _build_database) {
       Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
         build_context.sessions_structure.session_prefs)
     }
   }
 
-  protected final def stop_build(): Unit = synchronized_database {
+  protected final def stop_build(): Unit = synchronized_database("Build_Process.stop_build") {
     for (db <- _build_database) {
       Build_Process.Data.stop_build(db, build_uuid)
     }
   }
 
-  protected final def start_worker(): Unit = synchronized_database {
+  protected final def start_worker(): Unit = synchronized_database("Build_Process.start_worker") {
     for (db <- _build_database) {
       _state = _state.inc_serial
       Build_Process.Data.start_worker(db, worker_uuid, build_uuid, _state.serial)
     }
   }
 
-  protected final def stop_worker(): Unit = synchronized_database {
+  protected final def stop_worker(): Unit = synchronized_database("Build_Process.stop_worker") {
     for (db <- _build_database) {
       Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop_now = true)
     }
@@ -1059,16 +1068,18 @@
   /* run */
 
   def run(): Map[String, Process_Result] = {
-    if (build_context.master) synchronized_database { _state = init_state(_state) }
+    if (build_context.master) {
+      synchronized_database("Build_Process.init") { _state = init_state(_state) }
+    }
 
-    def finished(): Boolean = synchronized_database { _state.finished }
+    def finished(): Boolean = synchronized_database("Build_Process.test") { _state.finished }
 
     def sleep(): Unit =
       Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
-        build_options.seconds("editor_input_delay").sleep()
+        build_options.seconds("build_delay").sleep()
       }
 
-    def start_job(): Boolean = synchronized_database {
+    def check_job(): Boolean = synchronized_database("Build_Process.check_job") {
       next_job(_state) match {
         case Some(name) =>
           if (is_session_name(name)) {
@@ -1094,7 +1105,7 @@
 
       try {
         while (!finished()) {
-          synchronized_database {
+          synchronized_database("Build_Process.main") {
             if (progress.stopped) _state.stop_running()
 
             for (job <- _state.finished_running()) {
@@ -1107,7 +1118,7 @@
             }
           }
 
-          if (!start_job()) sleep()
+          if (!check_job()) sleep()
         }
       }
       finally {
@@ -1115,7 +1126,7 @@
         if (build_context.master) stop_build()
       }
 
-      synchronized_database {
+      synchronized_database("Build_Process.result") {
         for ((name, result) <- _state.results) yield name -> result.process_result
       }
     }
@@ -1124,7 +1135,7 @@
 
   /* snapshot */
 
-  def snapshot(): Build_Process.Snapshot = synchronized_database {
+  def snapshot(): Build_Process.Snapshot = synchronized_database("Build_Process.snapshot") {
     val (builds, workers) =
       _build_database match {
         case None => (Nil, Nil)
--- a/src/Pure/Tools/server.scala	Sun Jul 16 11:04:59 2023 +0100
+++ b/src/Pure/Tools/server.scala	Mon Jul 17 21:41:14 2023 +0200
@@ -375,7 +375,7 @@
   }
 
   def list(db: SQLite.Database): List[Info] =
-    if (db.tables.contains(Data.table.name)) {
+    if (db.exists_table(Data.table)) {
       db.execute_query_statement(Data.table.select(),
         List.from[Info],
         { res =>