separate static build_uuid from dynamic worker_uuid, to allow multiple worker processes participate in one build process;
authorwenzelm
Mon, 06 Mar 2023 09:32:18 +0100
changeset 77529 40ccee0fe19a
parent 77528 26ec258e5cf8
child 77530 3362f84a300a
separate static build_uuid from dynamic worker_uuid, to allow multiple worker processes participate in one build process;
src/Pure/Tools/build_job.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_job.scala	Sun Mar 05 20:41:45 2023 +0100
+++ b/src/Pure/Tools/build_job.scala	Mon Mar 06 09:32:18 2023 +0100
@@ -50,7 +50,7 @@
 
   object Session_Context {
     def load(
-      uuid: String,
+      build_uuid: String,
       name: String,
       deps: List[String],
       ancestors: List[String],
@@ -61,7 +61,7 @@
     ): Session_Context = {
       def default: Session_Context =
         Session_Context(
-          name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, uuid)
+          name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid)
 
       store.try_open_database(name) match {
         case None => default
@@ -80,7 +80,7 @@
                 case _ => Time.zero
               }
             new Session_Context(
-              name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, uuid)
+              name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, build_uuid)
           }
           catch {
             case ERROR(msg) => ignore_error(msg)
@@ -100,7 +100,7 @@
     timeout: Time,
     old_time: Time,
     old_command_timings_blob: Bytes,
-    uuid: String
+    build_uuid: String
   ) {
     override def toString: String = name
   }
@@ -498,7 +498,8 @@
                 sources = build_context.sources_shasum(session_name),
                 input_heaps = input_shasum,
                 output_heap = output_shasum,
-                process_result.rc, build_context.uuid)))
+                process_result.rc,
+                build_context.build_uuid)))
 
         // messages
         process_result.err_lines.foreach(progress.echo(_))
--- a/src/Pure/Tools/build_process.scala	Sun Mar 05 20:41:45 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Mon Mar 06 09:32:18 2023 +0100
@@ -28,7 +28,7 @@
       fresh_build: Boolean = false,
       no_build: Boolean = false,
       session_setup: (String, Session) => Unit = (_, _) => (),
-      uuid: String = UUID.random().toString
+      build_uuid: String = UUID.random().toString
     ): Context = {
       val sessions_structure = build_deps.sessions_structure
       val build_graph = sessions_structure.build_graph
@@ -42,7 +42,7 @@
             val sources_shasum = build_deps.sources_shasum(name)
             val session_context =
               Build_Job.Session_Context.load(
-                uuid, name, deps, ancestors, sources_shasum, info.timeout, store,
+                build_uuid, name, deps, ancestors, sources_shasum, info.timeout, store,
                 progress = progress)
             name -> session_context
           })
@@ -82,11 +82,10 @@
       val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
       new Context(store, build_deps, sessions, ordering, hostname, numa_nodes,
         build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
-        no_build = no_build, session_setup, uuid = uuid)
+        no_build = no_build, session_setup, build_uuid = build_uuid)
     }
   }
 
-  // static context of one particular instance, identified by uuid
   final class Context private(
     val store: Sessions.Store,
     val build_deps: Sessions.Deps,
@@ -99,7 +98,7 @@
     val fresh_build: Boolean,
     val no_build: Boolean,
     val session_setup: (String, Session) => Unit,
-    val uuid: String
+    val build_uuid: String
   ) {
     def build_options: Options = store.options
 
@@ -162,7 +161,6 @@
     }
   }
 
-  // dynamic state of various instances, distinguished by uuid
   sealed case class State(
     serial: Long = 0,
     progress_seen: Long = 0,
@@ -241,34 +239,38 @@
       SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body)
 
     object Generic {
-      val uuid = SQL.Column.string("uuid")
+      val build_uuid = SQL.Column.string("build_uuid")
+      val worker_uuid = SQL.Column.string("worker_uuid")
       val name = SQL.Column.string("name")
 
-      def sql_equal(uuid: String = "", name: String = ""): SQL.Source =
+      def sql(
+        build_uuid: String = "",
+        worker_uuid: String = "",
+        name: String = "",
+        names: Iterable[String] = Nil
+      ): SQL.Source =
         SQL.and(
-          if_proper(uuid, Generic.uuid.equal(uuid)),
-          if_proper(name, Generic.name.equal(name)))
-
-      def sql_member(uuid: String = "", names: Iterable[String] = Nil): SQL.Source =
-        SQL.and(
-          if_proper(uuid, Generic.uuid.equal(uuid)),
+          if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
+          if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
+          if_proper(name, Generic.name.equal(name)),
           if_proper(names, Generic.name.member(names)))
     }
 
     object Base {
-      val uuid = Generic.uuid.make_primary_key
+      val build_uuid = Generic.build_uuid.make_primary_key
       val ml_platform = SQL.Column.string("ml_platform")
       val options = SQL.Column.string("options")
 
-      val table = make_table("", List(uuid, ml_platform, options))
+      val table = make_table("", List(build_uuid, ml_platform, options))
     }
 
-    object Serial {
-      val uuid = Generic.uuid.make_primary_key
+    object Workers {
+      val worker_uuid = Generic.worker_uuid.make_primary_key
+      val build_uuid = Generic.build_uuid
       val stamp = SQL.Column.date("stamp")
       val serial = SQL.Column.long("serial")
 
-      val table = make_table("serial", List(uuid, stamp, serial))
+      val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial))
 
       val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")")
     }
@@ -278,9 +280,9 @@
       val kind = SQL.Column.int("kind")
       val text = SQL.Column.string("text")
       val verbose = SQL.Column.bool("verbose")
-      val uuid = Generic.uuid
+      val build_uuid = Generic.build_uuid
 
-      val table = make_table("progress", List(serial, kind, text, verbose, uuid))
+      val table = make_table("progress", List(serial, kind, text, verbose, build_uuid))
     }
 
     object Sessions {
@@ -291,10 +293,10 @@
       val timeout = SQL.Column.long("timeout")
       val old_time = SQL.Column.long("old_time")
       val old_command_timings = SQL.Column.bytes("old_command_timings")
-      val uuid = Generic.uuid
+      val build_uuid = Generic.build_uuid
 
       val table = make_table("sessions",
-        List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, uuid))
+        List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, build_uuid))
     }
 
     object Pending {
@@ -329,33 +331,35 @@
           List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc))
     }
 
-    def get_serial(db: SQL.Database, uuid: String = ""): Long =
+    def get_serial(db: SQL.Database, worker_uuid: String = ""): Long =
       db.using_statement(
-        Serial.table.select(List(Serial.serial_max),
-          sql = SQL.where(Generic.sql_equal(uuid = uuid)))
-      )(stmt => stmt.execute_query().iterator(_.long(Serial.serial)).nextOption.getOrElse(0L))
+        Workers.table.select(List(Workers.serial_max),
+          sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+      )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L))
 
-    def set_serial(db: SQL.Database, uuid: String, stamp: Date, serial: Long): Unit =
-      if (get_serial(db, uuid = uuid) != serial) {
+    def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit = {
+      if (get_serial(db, worker_uuid = worker_uuid) != serial) {
         db.using_statement(
-          Serial.table.delete(sql = SQL.where(Generic.sql_equal(uuid = uuid)))
+          Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
         )(_.execute())
-        db.using_statement(Serial.table.insert()) { stmt =>
-          stmt.string(1) = uuid
-          stmt.date(2) = stamp
-          stmt.long(3) = serial
+        db.using_statement(Workers.table.insert()) { stmt =>
+          stmt.string(1) = worker_uuid
+          stmt.string(2) = build_uuid
+          stmt.date(3) = db.now()
+          stmt.long(4) = serial
           stmt.execute()
         }
       }
+    }
 
-    def read_progress(db: SQL.Database, seen: Long = 0, uuid: String = ""): Progress_Messages =
+    def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages =
       db.using_statement(
         Progress.table.select(
           sql =
             SQL.where(
               SQL.and(
                 if (seen <= 0) "" else Progress.serial.ident + " > " + seen,
-                Generic.sql_equal(uuid = uuid))))
+                Generic.sql(build_uuid = build_uuid))))
       ) { stmt =>
           SortedMap.from(stmt.execute_query().iterator { res =>
             val serial = res.long(Progress.serial)
@@ -370,14 +374,14 @@
       db: SQL.Database,
       message_serial: Long,
       message: isabelle.Progress.Message,
-      uuid: String
+      build_uuid: String
     ): Unit = {
       db.using_statement(Progress.table.insert()) { stmt =>
         stmt.long(1) = message_serial
         stmt.int(2) = message.kind.id
         stmt.string(3) = message.text
         stmt.bool(4) = message.verbose
-        stmt.string(5) = uuid
+        stmt.string(5) = build_uuid
         stmt.execute()
       }
     }
@@ -398,9 +402,9 @@
             val timeout = Time.ms(res.long(Sessions.timeout))
             val old_time = Time.ms(res.long(Sessions.old_time))
             val old_command_timings_blob = res.bytes(Sessions.old_command_timings)
-            val uuid = res.string(Sessions.uuid)
+            val build_uuid = res.string(Sessions.build_uuid)
             name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum,
-              timeout, old_time, old_command_timings_blob, uuid)
+              timeout, old_time, old_command_timings_blob, build_uuid)
           })
         }
 
@@ -417,7 +421,7 @@
           stmt.long(5) = session.timeout.ms
           stmt.long(6) = session.old_time.ms
           stmt.bytes(7) = session.old_command_timings_blob
-          stmt.string(8) = session.uuid
+          stmt.string(8) = session.build_uuid
           stmt.execute()
         }
       }
@@ -443,7 +447,7 @@
       if (delete.nonEmpty) {
         db.using_statement(
           Pending.table.delete(
-            sql = SQL.where(Generic.sql_member(names = delete.map(_.name)))))(_.execute())
+            sql = SQL.where(Generic.sql(names = delete.map(_.name)))))(_.execute())
       }
 
       for (entry <- insert) {
@@ -478,7 +482,7 @@
       if (delete.nonEmpty) {
         db.using_statement(
           Running.table.delete(
-            sql = SQL.where(Generic.sql_member(names = delete.map(_.job_name)))))(_.execute())
+            sql = SQL.where(Generic.sql(names = delete.map(_.job_name)))))(_.execute())
       }
 
       for (job <- insert) {
@@ -551,7 +555,7 @@
       val tables =
         List(
           Base.table,
-          Serial.table,
+          Workers.table,
           Progress.table,
           Sessions.table,
           Pending.table,
@@ -570,7 +574,7 @@
       for (table <- tables) db.using_statement(table.delete())(_.execute())
 
       db.using_statement(Base.table.insert()) { stmt =>
-        stmt.string(1) = build_context.uuid
+        stmt.string(1) = build_context.build_uuid
         stmt.string(2) = Isabelle_System.getenv("ML_PLATFORM")
         stmt.string(3) = build_context.store.options.make_prefs(Options.init(prefs = ""))
         stmt.execute()
@@ -579,7 +583,8 @@
 
     def update_database(
       db: SQL.Database,
-      uuid: String,
+      worker_uuid: String,
+      build_uuid: String,
       hostname: String,
       state: State
     ): State = {
@@ -594,7 +599,7 @@
       val serial0 = get_serial(db)
       val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
 
-      set_serial(db, uuid, db.now(), serial)
+      set_serial(db, worker_uuid, build_uuid, serial)
       state.set_serial(serial)
     }
   }
@@ -614,6 +619,8 @@
   protected val store: Sessions.Store = build_context.store
   protected val build_options: Options = store.options
   protected val build_deps: Sessions.Deps = build_context.build_deps
+  protected val build_uuid: String = build_context.build_uuid
+  protected val worker_uuid: String = UUID.random().toString
 
 
   /* global state: internal var vs. external database */
@@ -645,7 +652,7 @@
       for (db <- _database) {
         _state =
           Build_Process.Data.update_database(
-            db, build_context.uuid, build_context.hostname, _state)
+            db, worker_uuid, build_uuid, build_context.hostname, _state)
       }
     }
 
@@ -656,8 +663,8 @@
     synchronized_database {
       val state1 = _state.inc_serial.progress_serial()
       for (db <- _database) {
-        Build_Process.Data.write_progress(db, state1.serial, message, build_context.uuid)
-        Build_Process.Data.set_serial(db, build_context.uuid, db.now(), state1.serial)
+        Build_Process.Data.write_progress(db, state1.serial, message, build_uuid)
+        Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial)
       }
       body
       _state = state1