merged
authortraytel
Mon, 11 Mar 2024 08:46:20 +0100
changeset 79856 ab651e3abb40
parent 79855 3713ca49e32c (current diff)
parent 79854 ea5b1f0cb448 (diff)
child 79857 819c28a7280f
merged
--- a/src/Pure/Admin/build_log.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Admin/build_log.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -689,6 +689,13 @@
   object private_data extends SQL.Data("isabelle_build_log") {
     /* tables */
 
+    override lazy val tables: SQL.Tables =
+      SQL.Tables(
+        meta_info_table,
+        sessions_table,
+        theories_table,
+        ml_statistics_table)
+
     val meta_info_table =
       make_table(Column.log_name :: Prop.all_props ::: Settings.all_settings, name = "meta_info")
 
@@ -711,13 +718,6 @@
       make_table(List(Column.log_name, Column.session_name, Column.ml_statistics),
         name = "ml_statistics")
 
-    override val tables: SQL.Tables =
-      SQL.Tables(
-        meta_info_table,
-        sessions_table,
-        theories_table,
-        ml_statistics_table)
-
 
     /* earliest pull date for repository version (PostgreSQL queries) */
 
--- a/src/Pure/Build/build.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/build.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -517,17 +517,26 @@
 
         build_database match {
           case None => print(Nil)
+          case Some(db) if remove_builds && force =>
+            db.transaction {
+              val tables0 =
+                ML_Heap.private_data.tables.list :::
+                Store.private_data.tables.list :::
+                Progress.private_data.tables.list :::
+                Build_Process.private_data.tables.list
+              val tables = tables0.filter(t => db.exists_table(t.name)).sortBy(_.name)
+              if (tables.nonEmpty) {
+                progress.echo("Removing tables " + commas_quote(tables.map(_.name)) + " ...")
+                db.execute_statement(SQL.MULTI(tables.map(db.destroy)))
+              }
+            }
           case Some(db) =>
             Build_Process.private_data.transaction_lock(db, create = true, label = "build_process") {
               val builds = Build_Process.private_data.read_builds(db)
-              val remove =
-                if (!remove_builds) Nil
-                else if (force) builds.map(_.build_uuid)
-                else builds.flatMap(build => if (build.active) None else Some(build.build_uuid))
-
               print(builds)
-              if (remove.nonEmpty) {
-                if (remove_builds) {
+              if (remove_builds) {
+                val remove = builds.flatMap(_.active_build_uuid)
+                if (remove.nonEmpty) {
                   progress.echo("Removing " + commas(remove) + " ...")
                   Build_Process.private_data.remove_builds(db, remove)
                   print(Build_Process.private_data.read_builds(db))
--- a/src/Pure/Build/build_process.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -18,6 +18,7 @@
 
   sealed case class Build(
     build_uuid: String,   // Database_Progress.context_uuid
+    build_id: Long,
     ml_platform: String,
     options: String,
     start: Date,
@@ -25,6 +26,7 @@
     sessions: List[String]
   ) {
     def active: Boolean = stop.isEmpty
+    def active_build_uuid: Option[String] = if (active) Some(build_uuid) else None
 
     def print: String =
       build_uuid + " (platform: " + ml_platform + ", start: " + Build_Log.print_date(start) +
@@ -40,14 +42,22 @@
     serial: Long
   )
 
+  object Task {
+    type Entry = (String, Task)
+    def entry(name: String, deps: List[String], build_uuid: String): Entry =
+      name -> Task(name, deps, build_uuid)
+    def entry(session: Build_Job.Session_Context, build_context: isabelle.Build.Context): Entry =
+      entry(session.name, session.deps, build_context.build_uuid)
+  }
+
   sealed case class Task(
     name: String,
     deps: List[String],
     build_uuid: String
-  ) {
+  ) extends Library.Named {
     def is_ready: Boolean = deps.isEmpty
-    def resolve(dep: String): Task =
-      if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this
+    def resolve(dep: String): Option[Task] =
+      if (deps.contains(dep)) Some(copy(deps = deps.filterNot(_ == dep))) else None
   }
 
   sealed case class Job(
@@ -89,6 +99,9 @@
     def iterator: Iterator[Build_Job.Session_Context] =
       for (name <- graph.topological_order.iterator) yield apply(name)
 
+    def data: Library.Update.Data[Build_Job.Session_Context] =
+      Map.from(for ((_, (session, _)) <- graph.iterator) yield session.name -> session)
+
     def make(new_graph: Sessions.Graph): Sessions =
       if (graph == new_graph) this
       else {
@@ -198,31 +211,44 @@
     results: State.Results)     // finished results
 
   object State {
-    type Pending = List[Task]
-    type Running = Map[String, Job]
-    type Results = Map[String, Result]
+    def inc_serial(serial: Long) = {
+      require(serial < Long.MaxValue, "serial overflow")
+      serial + 1
+    }
+
+    type Pending = Library.Update.Data[Task]
+    type Running = Library.Update.Data[Job]
+    type Results = Library.Update.Data[Result]
   }
 
   sealed case class State(
     serial: Long = 0,
     numa_nodes: List[Int] = Nil,
     sessions: Sessions = Sessions.empty,
-    pending: State.Pending = Nil,
+    pending: State.Pending = Map.empty,
     running: State.Running = Map.empty,
     results: State.Results = Map.empty
   ) {
     require(serial >= 0, "serial underflow")
-    def inc_serial: State = {
-      require(serial < Long.MaxValue, "serial overflow")
-      copy(serial = serial + 1)
-    }
+
+    def next_serial: Long = State.inc_serial(serial)
+    def inc_serial: State = copy(serial = next_serial)
+
+    def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
+    def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name))
 
-    def ready: State.Pending = pending.filter(_.is_ready)
-    def next_ready: State.Pending = ready.filter(entry => !is_running(entry.name))
-
-    def remove_pending(name: String): State =
-      copy(pending = pending.flatMap(
-        entry => if (entry.name == name) None else Some(entry.resolve(name))))
+    def remove_pending(a: String): State =
+      copy(pending =
+        pending.foldLeft(pending) {
+          case (map, (b, task)) =>
+            if (a == b) map - a
+            else {
+              task.resolve(a) match {
+                case None => map
+                case Some(task1) => map + (b -> task1)
+              }
+            }
+        })
 
     def is_running(name: String): Boolean = running.isDefinedAt(name)
 
@@ -263,6 +289,20 @@
   object private_data extends SQL.Data("isabelle_build") {
     val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
 
+    override lazy val tables: SQL.Tables =
+      SQL.Tables(
+        Updates.table,
+        Sessions.table,
+        Pending.table,
+        Running.table,
+        Results.table,
+        Base.table,
+        Workers.table)
+
+    private lazy val build_uuid_tables = tables.filter(Generic.build_uuid_table)
+    private lazy val build_id_tables =
+      tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
+
     def pull[A <: Library.Named](
       data_domain: Set[String],
       data_iterator: Set[String] => Iterator[A],
@@ -290,40 +330,100 @@
     }
 
     object Generic {
+      val build_id = SQL.Column.long("build_id")
       val build_uuid = SQL.Column.string("build_uuid")
       val worker_uuid = SQL.Column.string("worker_uuid")
       val name = SQL.Column.string("name")
 
+      def build_id_table(table: SQL.Table): Boolean =
+        table.columns.exists(column => column.name == build_id.name)
+
+      def build_uuid_table(table: SQL.Table): Boolean =
+        table.columns.exists(column => column.name == build_uuid.name)
+
       def sql(
+        build_id: Long = 0,
         build_uuid: String = "",
         worker_uuid: String = "",
         names: Iterable[String] = Nil
       ): SQL.Source =
         SQL.and(
+          if_proper(build_id > 0, Generic.build_id.equal(build_id)),
           if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
           if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
           if_proper(names, Generic.name.member(names)))
 
       def sql_where(
+        build_id: Long = 0,
         build_uuid: String = "",
         worker_uuid: String = "",
         names: Iterable[String] = Nil
       ): SQL.Source = {
-        SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names))
+        SQL.where(sql(
+          build_id = build_id,
+          build_uuid = build_uuid,
+          worker_uuid = worker_uuid,
+          names = names))
       }
     }
 
 
+    /* recorded updates */
+
+    object Updates {
+      val build_id = Generic.build_id.make_primary_key
+      val serial = SQL.Column.long("serial").make_primary_key
+      val kind = SQL.Column.int("kind").make_primary_key
+      val name = Generic.name.make_primary_key
+
+      val table = make_table(List(build_id, serial, kind, name), name = "updates")
+    }
+
+    def write_updates(
+      db: SQL.Database,
+      build_id: Long,
+      serial: Long,
+      updates: List[Library.Update]
+    ): Unit =
+      db.execute_batch_statement(db.insert_permissive(Updates.table), batch =
+        for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator)
+        yield { (stmt: SQL.Statement) =>
+          require(build_id > 0 && serial > 0 && kind > 0 && name.nonEmpty,
+            "Bad database update: build_id = " + build_id + ", serial = " + serial +
+              ", kind = " + kind + ", name = " + quote(name))
+          stmt.long(1) = build_id
+          stmt.long(2) = serial
+          stmt.int(3) = kind
+          stmt.string(4) = name
+        })
+
+
     /* base table */
 
     object Base {
       val build_uuid = Generic.build_uuid.make_primary_key
+      val build_id = Generic.build_id.make_primary_key
       val ml_platform = SQL.Column.string("ml_platform")
       val options = SQL.Column.string("options")
       val start = SQL.Column.date("start")
       val stop = SQL.Column.date("stop")
 
-      val table = make_table(List(build_uuid, ml_platform, options, start, stop))
+      val table = make_table(List(build_uuid, build_id, ml_platform, options, start, stop))
+    }
+
+    def read_build_ids(db: SQL.Database, build_uuids: List[String]): List[Long] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.build_id),
+          sql = if_proper(build_uuids, Base.build_uuid.where_member(build_uuids))),
+        List.from[Long], res => res.long(Base.build_id))
+
+    def get_build_id(db: SQL.Database, build_uuid: String): Long = {
+      read_build_ids(db, build_uuids = List(build_uuid)) match {
+        case build_id :: _ => build_id
+        case _ =>
+          db.execute_query_statementO(
+            Base.table.select(List(Base.build_id.max)), _.long(Base.build_id)).getOrElse(0L) + 1L
+      }
     }
 
     def read_builds(db: SQL.Database): List[Build] = {
@@ -331,11 +431,12 @@
         db.execute_query_statement(Base.table.select(), List.from[Build],
           { res =>
             val build_uuid = res.string(Base.build_uuid)
+            val build_id = res.long(Base.build_id)
             val ml_platform = res.string(Base.ml_platform)
             val options = res.string(Base.options)
             val start = res.date(Base.start)
             val stop = res.get_date(Base.stop)
-            Build(build_uuid, ml_platform, options, start, stop, Nil)
+            Build(build_uuid, build_id, ml_platform, options, start, stop, Nil)
           })
 
       for (build <- builds.sortBy(_.start)(Date.Ordering)) yield {
@@ -344,14 +445,21 @@
       }
     }
 
-    def remove_builds(db: SQL.Database, remove: List[String]): Unit =
-      if (remove.nonEmpty) {
-        val sql = Generic.build_uuid.where_member(remove)
-        db.execute_statement(SQL.MULTI(build_uuid_tables.map(_.delete(sql = sql))))
+    def remove_builds(db: SQL.Database, build_uuids: List[String]): Unit =
+      if (build_uuids.nonEmpty) {
+        val build_ids = read_build_ids(db, build_uuids = build_uuids)
+
+        val sql1 = Generic.build_uuid.where_member(build_uuids)
+        val sql2 = Generic.build_id.where_member_long(build_ids)
+        db.execute_statement(
+          SQL.MULTI(
+            build_uuid_tables.map(_.delete(sql = sql1)) ++
+            build_id_tables.map(_.delete(sql = sql2))))
       }
 
     def start_build(
       db: SQL.Database,
+      build_id: Long,
       build_uuid: String,
       ml_platform: String,
       options: String,
@@ -360,10 +468,11 @@
       db.execute_statement(Base.table.insert(), body =
         { stmt =>
           stmt.string(1) = build_uuid
-          stmt.string(2) = ml_platform
-          stmt.string(3) = options
-          stmt.date(4) = start
-          stmt.date(5) = None
+          stmt.long(2) = build_id
+          stmt.string(3) = ml_platform
+          stmt.string(4) = options
+          stmt.date(5) = start
+          stmt.date(6) = None
         })
     }
 
@@ -400,6 +509,8 @@
           List(name, deps, ancestors, options, sources, timeout,
             old_time, old_command_timings, build_uuid),
           name = "sessions")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] =
@@ -440,12 +551,20 @@
       db: SQL.Database,
       sessions: Build_Process.Sessions,
       old_sessions: Build_Process.Sessions
-    ): Boolean = {
-      val insert = sessions.iterator.filterNot(s => old_sessions.defined(s.name)).toList
+    ): Library.Update = {
+      val update =
+        if (old_sessions.eq(sessions)) Library.Update.empty
+        else Library.Update.make(old_sessions.data, sessions.data, kind = Sessions.table_index)
 
-      if (insert.nonEmpty) {
+      if (update.deletes) {
+        db.execute_statement(
+          Sessions.table.delete(sql = Generic.sql_where(names = update.delete)))
+      }
+
+      if (update.inserts) {
         db.execute_batch_statement(Sessions.table.insert(), batch =
-          for (session <- insert) yield { (stmt: SQL.Statement) =>
+          for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+            val session = sessions(name)
             stmt.string(1) = session.name
             stmt.string(2) = cat_lines(session.deps)
             stmt.string(3) = cat_lines(session.ancestors)
@@ -458,7 +577,7 @@
           })
       }
 
-      insert.nonEmpty
+      update
     }
 
 
@@ -474,6 +593,8 @@
 
       val table =
         make_table(List(worker_uuid, build_uuid, start, stamp, stop, serial), name = "workers")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_serial(db: SQL.Database): Long =
@@ -563,41 +684,44 @@
       val build_uuid = Generic.build_uuid
 
       val table = make_table(List(name, deps, build_uuid), name = "pending")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
-    def read_pending(db: SQL.Database): List[Task] =
+    def read_pending(db: SQL.Database): State.Pending =
       db.execute_query_statement(
-        Pending.table.select(sql = SQL.order_by(List(Pending.name))),
-        List.from[Task],
+        Pending.table.select(),
+        Map.from[String, Task],
         { res =>
           val name = res.string(Pending.name)
           val deps = res.string(Pending.deps)
           val build_uuid = res.string(Pending.build_uuid)
-          Task(name, split_lines(deps), build_uuid)
+          Task.entry(name, split_lines(deps), build_uuid)
         })
 
     def update_pending(
       db: SQL.Database,
       pending: State.Pending,
       old_pending: State.Pending
-    ): Boolean = {
-      val (delete, insert) = Library.symmetric_difference(old_pending, pending)
+    ): Library.Update = {
+      val update = Library.Update.make(old_pending, pending, kind = Pending.table_index)
 
-      if (delete.nonEmpty) {
+      if (update.deletes) {
         db.execute_statement(
-          Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
+          Pending.table.delete(sql = Generic.sql_where(names = update.delete)))
       }
 
-      if (insert.nonEmpty) {
+      if (update.inserts) {
         db.execute_batch_statement(Pending.table.insert(), batch =
-          for (task <- insert) yield { (stmt: SQL.Statement) =>
+          for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+            val task = pending(name)
             stmt.string(1) = task.name
             stmt.string(2) = cat_lines(task.deps)
             stmt.string(3) = task.build_uuid
           })
       }
 
-      delete.nonEmpty || insert.nonEmpty
+      update
     }
 
 
@@ -616,11 +740,13 @@
         make_table(
           List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date),
           name = "running")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_running(db: SQL.Database): State.Running =
       db.execute_query_statement(
-        Running.table.select(sql = SQL.order_by(List(Running.name))),
+        Running.table.select(),
         Map.from[String, Job],
         { res =>
           val name = res.string(Running.name)
@@ -640,19 +766,18 @@
       db: SQL.Database,
       running: State.Running,
       old_running: State.Running
-    ): Boolean = {
-      val running0 = old_running.valuesIterator.toList
-      val running1 = running.valuesIterator.toList
-      val (delete, insert) = Library.symmetric_difference(running0, running1)
+    ): Library.Update = {
+      val update = Library.Update.make(old_running, running, kind = Running.table_index)
 
-      if (delete.nonEmpty) {
+      if (update.deletes) {
         db.execute_statement(
-          Running.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
+          Running.table.delete(sql = Generic.sql_where(names = update.delete)))
       }
 
-      if (insert.nonEmpty) {
+      if (update.inserts) {
         db.execute_batch_statement(Running.table.insert(), batch =
-          for (job <- insert) yield { (stmt: SQL.Statement) =>
+          for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+            val job = running(name)
             stmt.string(1) = job.name
             stmt.string(2) = job.worker_uuid
             stmt.string(3) = job.build_uuid
@@ -663,7 +788,7 @@
           })
       }
 
-      delete.nonEmpty || insert.nonEmpty
+      update
     }
 
 
@@ -690,6 +815,8 @@
           List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus,
             rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current),
           name = "results")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_results_domain(db: SQL.Database): Set[String] =
@@ -736,13 +863,18 @@
       db: SQL.Database,
       results: State.Results,
       old_results: State.Results
-    ): Boolean = {
-      val insert =
-        results.valuesIterator.filterNot(res => old_results.isDefinedAt(res.name)).toList
+    ): Library.Update = {
+      val update = Library.Update.make(old_results, results, kind = Results.table_index)
 
-      if (insert.nonEmpty) {
+      if (update.deletes) {
+        db.execute_statement(
+          Results.table.delete(sql = Generic.sql_where(names = update.delete)))
+      }
+
+      if (update.inserts) {
         db.execute_batch_statement(Results.table.insert(), batch =
-          for (result <- insert) yield { (stmt: SQL.Statement) =>
+          for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+            val result = results(name)
             val process_result = result.process_result
             stmt.string(1) = result.name
             stmt.string(2) = result.worker_uuid
@@ -761,25 +893,12 @@
           })
       }
 
-      insert.nonEmpty
+      update
     }
 
 
     /* collective operations */
 
-    override val tables: SQL.Tables =
-      SQL.Tables(
-        Base.table,
-        Workers.table,
-        Sessions.table,
-        Pending.table,
-        Running.table,
-        Results.table)
-
-    private val build_uuid_tables =
-      tables.filter(table =>
-        table.columns.exists(column => column.name == Generic.build_uuid.name))
-
     def pull_database(db: SQL.Database, worker_uuid: String, state: State): State = {
       val serial_db = read_serial(db)
       if (serial_db == state.serial) state
@@ -799,21 +918,26 @@
 
     def update_database(
       db: SQL.Database,
+      build_id: Long,
       worker_uuid: String,
       state: State,
       old_state: State
     ): State = {
-      val changed =
+      val updates =
         List(
           update_sessions(db, state.sessions, old_state.sessions),
           update_pending(db, state.pending, old_state.pending),
           update_running(db, state.running, old_state.running),
-          update_results(db, state.results, old_state.results))
+          update_results(db, state.results, old_state.results)
+        ).filter(_.defined)
 
-      val state1 = if (changed.exists(identity)) state.inc_serial else state
-      if (state1.serial != state.serial) stamp_worker(db, worker_uuid, state1.serial)
-
-      state1
+      if (updates.nonEmpty) {
+        val serial = state.next_serial
+        write_updates(db, build_id, serial, updates)
+        stamp_worker(db, worker_uuid, serial)
+        state.copy(serial = serial)
+      }
+      else state
     }
   }
 
@@ -821,6 +945,21 @@
     private_data.transaction_lock(db, create = true, label = "Build_Process.read_builds") {
       private_data.read_builds(db)
     }
+
+  def init_build(
+    db: SQL.Database,
+    build_context: isabelle.Build.Context,
+    build_start: Date
+  ): Long =
+    private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+      val build_uuid = build_context.build_uuid
+      val build_id = private_data.get_build_id(db, build_uuid)
+      if (build_context.master) {
+        private_data.start_build(db, build_id, build_uuid, build_context.ml_platform,
+          build_context.sessions_structure.session_prefs, build_start)
+      }
+      build_id
+    }
 }
 
 
@@ -915,9 +1054,15 @@
     }
   }
 
+  protected val log: Logger = Logger.make_system_log(progress, build_options)
+
   protected val build_start: Date = build_context.build_start getOrElse progress.now()
 
-  protected val log: Logger = Logger.make_system_log(progress, build_options)
+  protected val build_id: Long =
+    _build_database match {
+      case None => 0L
+      case Some(db) => Build_Process.init_build(db, build_context, build_start)
+    }
 
   protected def open_build_cluster(): Build_Cluster =
     Build_Cluster.make(build_context, progress = build_progress).open()
@@ -955,7 +1100,9 @@
             val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state)
             _state = old_state
             val res = body
-            _state = Build_Process.private_data.update_database(db, worker_uuid, _state, old_state)
+            _state =
+              Build_Process.private_data.update_database(
+                db, build_id, worker_uuid, _state, old_state)
             res
           }
       }
@@ -964,19 +1111,6 @@
 
   /* policy operations */
 
-  protected def init_state(state: Build_Process.State): Build_Process.State = {
-    val sessions1 = state.sessions.init(build_context, _database_server, progress = build_progress)
-
-    val old_pending = state.pending.iterator.map(_.name).toSet
-    val new_pending =
-      List.from(
-        for (session <- sessions1.iterator if !old_pending(session.name))
-          yield Build_Process.Task(session.name, session.deps, build_uuid))
-    val pending1 = new_pending ::: state.pending
-
-    state.copy(sessions = sessions1, pending = pending1)
-  }
-
   protected def next_jobs(state: Build_Process.State): List[String] = {
     val limit = {
       if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 }
@@ -1088,13 +1222,6 @@
   final def is_session_name(job_name: String): Boolean =
     !Long_Name.is_qualified(job_name)
 
-  protected final def start_build(): Unit = synchronized_database("Build_Process.start_build") {
-    for (db <- _build_database) {
-      Build_Process.private_data.start_build(db, build_uuid, build_context.ml_platform,
-        build_context.sessions_structure.session_prefs, build_start)
-    }
-  }
-
   protected final def stop_build(): Unit = synchronized_database("Build_Process.stop_build") {
     for (db <- _build_database) {
       Build_Process.private_data.stop_build(db, build_uuid)
@@ -1134,6 +1261,23 @@
   protected def sleep(): Unit =
     Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
 
+  protected def init_unsynchronized(): Unit = {
+    if (build_context.master) {
+      val sessions1 =
+        _state.sessions.init(build_context, _database_server, progress = build_progress)
+      val pending1 =
+        sessions1.iterator.foldLeft(_state.pending) {
+          case (map, session) =>
+            if (map.isDefinedAt(session.name)) map
+            else map + Build_Process.Task.entry(session, build_context)
+        }
+      _state = _state.copy(sessions = sessions1, pending = pending1)
+    }
+
+    val numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling)
+    _state = _state.copy(numa_nodes = numa_nodes)
+  }
+
   protected def main_unsynchronized(): Unit = {
     for (job <- _state.build_running.filter(_.is_finished)) {
       _state = _state.remove_running(job.name)
@@ -1165,19 +1309,16 @@
   def run(): Build.Results = {
     val vacuous =
       synchronized_database("Build_Process.init") {
-        if (build_context.master) {
-          _build_cluster.init()
-          _state = init_state(_state)
-        }
-        _state = _state.copy(numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling))
+        _build_cluster.init()
+        init_unsynchronized()
         build_context.master && _state.pending.isEmpty
       }
     if (vacuous) {
       progress.echo_warning("Nothing to build")
+      if (build_context.master) stop_build()
       Build.Results(build_context)
     }
     else {
-      if (build_context.master) start_build()
       start_worker()
       _build_cluster.start()
 
--- a/src/Pure/Build/build_schedule.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -469,11 +469,9 @@
     serial: Long = 0,
   ) {
     require(serial >= 0, "serial underflow")
-    def inc_serial: Schedule = {
-      require(serial < Long.MaxValue, "serial overflow")
-      copy(serial = serial + 1)
-    }
-    
+
+    def next_serial: Long = Build_Process.State.inc_serial(serial)
+
     def end: Date =
       if (graph.is_empty) start
       else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch)
@@ -503,7 +501,6 @@
 
     def update(state: Build_Process.State): Schedule = {
       val start1 = Date.now()
-      val pending = state.pending.map(_.name).toSet
 
       def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph =
         graph.map_node(name, { node =>
@@ -517,7 +514,8 @@
           node.copy(start = starts.max(Date.Ordering))
         })
 
-      val graph0 = state.running.keys.foldLeft(graph.restrict(pending.contains))(shift_elapsed)
+      val graph0 =
+        state.running.keys.foldLeft(graph.restrict(state.pending.isDefinedAt))(shift_elapsed)
       val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts)
 
       copy(start = start1, graph = graph1)
@@ -859,7 +857,8 @@
               _schedule = old_schedule
               val res = body
               _state =
-                Build_Process.private_data.update_database(db, worker_uuid, _state, old_state)
+                Build_Process.private_data.update_database(
+                  db, build_id, worker_uuid, _state, old_state)
               _schedule = Build_Schedule.private_data.update_schedule(db, _schedule, old_schedule)
               res
             }
@@ -1063,6 +1062,12 @@
   object private_data extends SQL.Data("isabelle_build") {
     import Build_Process.private_data.{Base, Generic}
 
+    override lazy val tables: SQL.Tables =
+      SQL.Tables(Schedules.table, Nodes.table)
+
+    lazy val all_tables: SQL.Tables =
+      SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
+
 
     /* schedule */
 
@@ -1081,10 +1086,10 @@
           SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
           _.long(Schedules.serial)).getOrElse(0L)
 
-    def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+    def read_scheduled_builds_domain(db: SQL.Database): Map[String, Unit] =
       db.execute_query_statement(
         Schedules.table.select(List(Schedules.build_uuid)),
-        List.from[String], res => res.string(Schedules.build_uuid))
+        Map.from[String, Unit], res => res.string(Schedules.build_uuid) -> ())
 
     def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
       val schedules =
@@ -1191,7 +1196,7 @@
         schedule.graph != old_schedule.graph
       
       val schedule1 =
-        if (changed) schedule.copy(serial = old_schedule.serial).inc_serial else schedule
+        if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule
       if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
       
       schedule1
@@ -1207,18 +1212,12 @@
       val running_builds_domain =
         db.execute_query_statement(
           Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
-          List.from[String], res => res.string(Base.build_uuid))
-
-      val (remove, _) =
-        Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+          Map.from[String, Unit], res => res.string(Base.build_uuid) -> ())
 
-      remove_schedules(db, remove)
-    }
+      val update = Library.Update.make(read_scheduled_builds_domain(db), running_builds_domain)
 
-    override val tables: SQL.Tables = SQL.Tables(Schedules.table, Nodes.table)
-
-    val all_tables: SQL.Tables =
-      SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
+      remove_schedules(db, update.delete)
+    }
   }
 
 
@@ -1329,11 +1328,10 @@
       val timing_data = Timing_Data.load(host_infos, log_database)
 
       val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
-      def task(session: Build_Job.Session_Context): Build_Process.Task =
-        Build_Process.Task(session.name, session.deps, build_context.build_uuid)
 
       val build_state =
-        Build_Process.State(sessions = sessions, pending = sessions.iterator.map(task).toList)
+        Build_Process.State(sessions = sessions,
+          pending = Map.from(sessions.iterator.map(Build_Process.Task.entry(_, build_context))))
 
       val scheduler = Build_Engine.scheduler(timing_data, build_context)
       def schedule_msg(res: Exn.Result[Schedule]): String =
--- a/src/Pure/Build/export.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/export.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -31,7 +31,7 @@
   /* SQL data model */
 
   object private_data extends SQL.Data() {
-    override lazy val tables = SQL.Tables(Base.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Base.table)
 
     object Base {
       val session_name = SQL.Column.string("session_name").make_primary_key
--- a/src/Pure/Build/store.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/store.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -114,7 +114,7 @@
   /* SQL data model */
 
   object private_data extends SQL.Data() {
-    override lazy val tables = SQL.Tables(Session_Info.table, Sources.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Session_Info.table, Sources.table)
 
     object Session_Info {
       val session_name = SQL.Column.string("session_name").make_primary_key
--- a/src/Pure/General/sql.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/General/sql.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -57,7 +57,6 @@
 
   val join_outer: Source = " LEFT OUTER JOIN "
   val join_inner: Source = " INNER JOIN "
-  def join(outer: Boolean): Source = if (outer) join_outer else join_inner
 
   def MULTI(args: Iterable[Source]): Source =
     args.iterator.filter(_.nonEmpty).mkString(";\n")
@@ -81,9 +80,12 @@
   def equal(sql: Source, x: Long): Source = sql + " = " + x
   def equal(sql: Source, x: String): Source = sql + " = " + string(x)
 
+  def member_int(sql: Source, set: Iterable[Int]): Source =
+    if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList)
+  def member_long(sql: Source, set: Iterable[Long]): Source =
+    if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList)
   def member(sql: Source, set: Iterable[String]): Source =
-    if (set.isEmpty) FALSE
-    else OR(set.iterator.map(equal(sql, _)).toList)
+    if (set.isEmpty) FALSE else OR(set.iterator.map(equal(sql, _)).toList)
 
   def where(sql: Source): Source = if_proper(sql, " WHERE " + sql)
   def where_and(args: Source*): Source = where(and(args:_*))
@@ -163,7 +165,12 @@
     def where_equal(x: Long): Source = SQL.where(equal(x))
     def where_equal(x: String): Source = SQL.where(equal(x))
 
+    def member_int(set: Iterable[Int]): Source = SQL.member_int(ident, set)
+    def member_long(set: Iterable[Long]): Source = SQL.member_long(ident, set)
     def member(set: Iterable[String]): Source = SQL.member(ident, set)
+
+    def where_member_int(set: Iterable[Int]): Source = SQL.where(member_int(set))
+    def where_member_long(set: Iterable[Long]): Source = SQL.where(member_long(set))
     def where_member(set: Iterable[String]): Source = SQL.where(member(set))
 
     def max: Column = copy(expr = "MAX(" + ident + ")")
@@ -234,6 +241,11 @@
 
     def iterator: Iterator[Table] = list.iterator
 
+    def index(table: Table): Int =
+      iterator.zipWithIndex
+        .collectFirst({ case (t, i) if t.name == table.name => i })
+        .getOrElse(error("No table " + quote(table.name)))
+
     // requires transaction
     def lock(db: Database, create: Boolean = false): Boolean = {
       if (create) foreach(db.create_table(_))
@@ -549,6 +561,8 @@
 
     def insert_permissive(table: Table, sql: Source = ""): Source
 
+    def destroy(table: Table): Source = "DROP TABLE IF EXISTS " + table
+
 
     /* tables and views */
 
@@ -779,6 +793,9 @@
     def insert_permissive(table: SQL.Table, sql: SQL.Source = ""): SQL.Source =
       table.insert_cmd(sql = if_proper(sql, sql + " ") + "ON CONFLICT DO NOTHING")
 
+    override def destroy(table: SQL.Table): SQL.Source =
+      super.destroy(table) + " CASCADE"
+
 
     /* explicit locking: only applicable to PostgreSQL within transaction context */
     // see https://www.postgresql.org/docs/14/sql-lock.html
--- a/src/Pure/ML/ml_heap.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/ML/ml_heap.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -41,7 +41,7 @@
   sealed case class Log_DB(uuid: String, content: Bytes)
 
   object private_data extends SQL.Data("isabelle_heaps") {
-    override lazy val tables = SQL.Tables(Base.table, Slices.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Base.table, Slices.table)
 
     object Generic {
       val name = SQL.Column.string("name").make_primary_key
--- a/src/Pure/System/host.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/System/host.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -168,7 +168,7 @@
   object private_data extends SQL.Data("isabelle_host") {
     val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db")
 
-    override lazy val tables = SQL.Tables(Node_Info.table, Info.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Node_Info.table, Info.table)
 
     object Node_Info {
       val hostname = SQL.Column.string("hostname").make_primary_key
--- a/src/Pure/System/progress.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/System/progress.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -56,7 +56,8 @@
   object private_data extends SQL.Data("isabelle_progress") {
     val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db")
 
-    override lazy val tables = SQL.Tables(Base.table, Agents.table, Messages.table)
+    override lazy val tables: SQL.Tables =
+      SQL.Tables(Base.table, Agents.table, Messages.table)
 
     object Base {
       val context_uuid = SQL.Column.string("context_uuid").make_primary_key
--- a/src/Pure/Thy/document_build.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Thy/document_build.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -55,7 +55,7 @@
   /* SQL data model */
 
   object private_data extends SQL.Data("isabelle_documents") {
-    override lazy val tables = SQL.Tables(Base.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Base.table)
 
     object Base {
       val session_name = SQL.Column.string("session_name").make_primary_key
--- a/src/Pure/Tools/server.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Tools/server.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -366,7 +366,7 @@
   object private_data extends SQL.Data() {
     val database = Path.explode("$ISABELLE_HOME_USER/servers.db")
 
-    override lazy val tables = SQL.Tables(Base.table)
+    override lazy val tables: SQL.Tables = SQL.Tables(Base.table)
 
     object Base {
       val name = SQL.Column.string("name").make_primary_key
--- a/src/Pure/library.scala	Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/library.scala	Mon Mar 11 08:46:20 2024 +0100
@@ -284,8 +284,33 @@
       case _ => error(message)
     }
 
-  def symmetric_difference[A](xs: List[A], ys: List[A]): (List[A], List[A]) =
-    (xs.filterNot(ys.toSet), ys.filterNot(xs.toSet))
+
+  /* data update */
+
+  object Update {
+    type Data[A] = Map[String, A]
+
+    val empty: Update = Update()
+
+    def make[A](a: Data[A], b: Data[A], kind: Int = 0): Update =
+      if (a eq b) empty
+      else {
+        val delete = List.from(for ((x, y) <- a.iterator if !b.get(x).contains(y)) yield x)
+        val insert = List.from(for ((x, y) <- b.iterator if !a.get(x).contains(y)) yield x)
+        Update(delete = delete, insert = insert, kind = kind)
+      }
+  }
+
+  sealed case class Update(
+    delete: List[String] = Nil,
+    insert: List[String] = Nil,
+    kind: Int = 0
+  ) {
+    def deletes: Boolean = delete.nonEmpty
+    def inserts: Boolean = insert.nonEmpty
+    def defined: Boolean = deletes || inserts
+    lazy val domain: Set[String] = delete.toSet ++ insert
+  }
 
 
   /* proper values */