merged
authorwenzelm
Fri, 03 Mar 2023 20:11:08 +0100
changeset 77497 b9e01beef1c5
parent 77492 fd9422c110ed (current diff)
parent 77496 f0d9f9196b9b (diff)
child 77498 2d12cb7ff829
child 77690 71d075d18b6e
merged
--- a/src/Pure/Admin/build_log.scala	Fri Mar 03 12:22:07 2023 +0000
+++ b/src/Pure/Admin/build_log.scala	Fri Mar 03 20:11:08 2023 +0100
@@ -1092,9 +1092,16 @@
                 chapter = res.string(Data.chapter),
                 groups = split_lines(res.string(Data.groups)),
                 threads = res.get_int(Data.threads),
-                timing = res.timing(Data.timing_elapsed, Data.timing_cpu, Data.timing_gc),
+                timing =
+                  res.timing(
+                    Data.timing_elapsed,
+                    Data.timing_cpu,
+                    Data.timing_gc),
                 ml_timing =
-                  res.timing(Data.ml_timing_elapsed, Data.ml_timing_cpu, Data.ml_timing_gc),
+                  res.timing(
+                    Data.ml_timing_elapsed,
+                    Data.ml_timing_cpu,
+                    Data.ml_timing_gc),
                 sources = res.get_string(Data.sources),
                 heap_size = res.get_long(Data.heap_size).map(Space.bytes),
                 status = res.get_string(Data.status).map(Session_Status.withName),
--- a/src/Pure/Tools/build_job.scala	Fri Mar 03 12:22:07 2023 +0000
+++ b/src/Pure/Tools/build_job.scala	Fri Mar 03 20:11:08 2023 +0100
@@ -46,6 +46,7 @@
 
   object Session_Context {
     def load(
+      uuid: String,
       name: String,
       deps: List[String],
       ancestors: List[String],
@@ -55,7 +56,8 @@
       progress: Progress = new Progress
     ): Session_Context = {
       def default: Session_Context =
-        new Session_Context(name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty)
+        Session_Context(
+          name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, uuid)
 
       store.try_open_database(name) match {
         case None => default
@@ -74,7 +76,7 @@
                 case _ => Time.zero
               }
             new Session_Context(
-              name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings)
+              name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, uuid)
           }
           catch {
             case ERROR(msg) => ignore_error(msg)
@@ -86,14 +88,15 @@
     }
   }
 
-  final class Session_Context(
-    val name: String,
-    val deps: List[String],
-    val ancestors: List[String],
-    val sources_shasum: SHA1.Shasum,
-    val timeout: Time,
-    val old_time: Time,
-    val old_command_timings_blob: Bytes
+  sealed case class Session_Context(
+    name: String,
+    deps: List[String],
+    ancestors: List[String],
+    sources_shasum: SHA1.Shasum,
+    timeout: Time,
+    old_time: Time,
+    old_command_timings_blob: Bytes,
+    uuid: String
   ) {
     override def toString: String = name
   }
@@ -111,13 +114,13 @@
     def session_name: String = session_background.session_name
     def job_name: String = session_name
 
-    val info: Sessions.Info = session_background.sessions_structure(session_name)
-    val options: Options = Host.process_policy_options(info.options, node_info.numa_node)
+    private val info: Sessions.Info = session_background.sessions_structure(session_name)
+    private val options: Options = Host.process_policy_options(info.options, node_info.numa_node)
 
-    val session_sources: Sessions.Sources =
+    private val session_sources =
       Sessions.Sources.load(session_background.base, cache = store.cache.compress)
 
-    val store_heap = build_context.store_heap(session_name)
+    private val store_heap = build_context.store_heap(session_name)
 
     private val future_result: Future[(Process_Result, SHA1.Shasum)] =
       Future.thread("build", uninterruptible = true) {
--- a/src/Pure/Tools/build_process.scala	Fri Mar 03 12:22:07 2023 +0000
+++ b/src/Pure/Tools/build_process.scala	Fri Mar 03 20:11:08 2023 +0100
@@ -42,7 +42,8 @@
             val sources_shasum = build_deps.sources_shasum(name)
             val session_context =
               Build_Job.Session_Context.load(
-                name, deps, ancestors, sources_shasum, info.timeout, store, progress = progress)
+                uuid, name, deps, ancestors, sources_shasum, info.timeout, store,
+                progress = progress)
             name -> session_context
           })
 
@@ -85,10 +86,11 @@
     }
   }
 
+  // static context of one particular instance, identified by uuid
   final class Context private(
     val store: Sessions.Store,
     val build_deps: Sessions.Deps,
-    val sessions: Map[String, Build_Job.Session_Context],
+    val sessions: State.Sessions,
     val ordering: Ordering[String],
     val progress: Progress,
     val hostname: String,
@@ -145,12 +147,21 @@
     def ok: Boolean = process_result.ok
   }
 
+  object State {
+    type Sessions = Map[String, Build_Job.Session_Context]
+    type Pending = List[Entry]
+    type Running = Map[String, Build_Job]
+    type Results = Map[String, Result]
+  }
+
+  // dynamic state of various instances, distinguished by uuid
   sealed case class State(
     serial: Long = 0,
     numa_index: Int = 0,
-    pending: List[Entry] = Nil,
-    running: Map[String, Build_Job] = Map.empty,
-    results: Map[String, Build_Process.Result] = Map.empty
+    sessions: State.Sessions = Map.empty,   // static build targets
+    pending: State.Pending = Nil,           // dynamic build "queue"
+    running: State.Running = Map.empty,     // presently running jobs
+    results: State.Results = Map.empty      // finished results
   ) {
     def numa_next(numa_nodes: List[Int]): (Option[Int], State) =
       if (numa_nodes.isEmpty) (None, this)
@@ -235,6 +246,20 @@
       val table = make_table("serial", List(serial))
     }
 
+    object Sessions {
+      val name = Generic.name.make_primary_key
+      val deps = SQL.Column.string("deps")
+      val ancestors = SQL.Column.string("ancestors")
+      val sources = SQL.Column.string("sources")
+      val timeout = SQL.Column.long("timeout")
+      val old_time = SQL.Column.long("old_time")
+      val old_command_timings = SQL.Column.bytes("old_command_timings")
+      val uuid = Generic.uuid
+
+      val table = make_table("sessions",
+        List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, uuid))
+    }
+
     object Pending {
       val name = Generic.name.make_primary_key
       val deps = SQL.Column.string("deps")
@@ -280,6 +305,49 @@
         }
       }
 
+    def read_sessions_domain(db: SQL.Database): Set[String] =
+      db.using_statement(Sessions.table.select(List(Sessions.name)))(stmt =>
+        Set.from(stmt.execute_query().iterator(_.string(Sessions.name))))
+
+    def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions =
+      db.using_statement(
+        Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names)))
+      ) { stmt =>
+          Map.from(stmt.execute_query().iterator { res =>
+            val name = res.string(Sessions.name)
+            val deps = split_lines(res.string(Sessions.deps))
+            val ancestors = split_lines(res.string(Sessions.ancestors))
+            val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources))
+            val timeout = Time.ms(res.long(Sessions.timeout))
+            val old_time = Time.ms(res.long(Sessions.old_time))
+            val old_command_timings_blob = res.bytes(Sessions.old_command_timings)
+            val uuid = res.string(Sessions.uuid)
+            name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum,
+              timeout, old_time, old_command_timings_blob, uuid)
+          })
+        }
+
+    def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = {
+      val old_sessions = read_sessions_domain(db)
+      val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList
+
+      for ((name, session) <- insert) {
+        db.using_statement(Sessions.table.insert()) { stmt =>
+          stmt.string(1) = name
+          stmt.string(2) = cat_lines(session.deps)
+          stmt.string(3) = cat_lines(session.ancestors)
+          stmt.string(4) = session.sources_shasum.toString
+          stmt.long(5) = session.timeout.ms
+          stmt.long(6) = session.old_time.ms
+          stmt.bytes(7) = session.old_command_timings_blob
+          stmt.string(8) = session.uuid
+          stmt.execute()
+        }
+      }
+
+      insert.nonEmpty
+    }
+
     def read_pending(db: SQL.Database): List[Entry] =
       db.using_statement(Pending.table.select(sql = SQL.order_by(List(Pending.name)))) { stmt =>
         List.from(
@@ -291,7 +359,7 @@
           })
       }
 
-    def update_pending(db: SQL.Database, pending: List[Entry]): Boolean = {
+    def update_pending(db: SQL.Database, pending: State.Pending): Boolean = {
       val old_pending = read_pending(db)
       val (delete, insert) = Library.symmetric_difference(old_pending, pending)
 
@@ -324,7 +392,7 @@
           })
       }
 
-    def update_running(db: SQL.Database, running: Map[String, Build_Job]): Boolean = {
+    def update_running(db: SQL.Database, running: State.Running): Boolean = {
       val old_running = read_running(db)
       val abs_running = running.valuesIterator.map(_.make_abstract).toList
 
@@ -348,36 +416,38 @@
       delete.nonEmpty || insert.nonEmpty
     }
 
+    def read_results_domain(db: SQL.Database): Set[String] =
+      db.using_statement(Results.table.select(List(Results.name)))(stmt =>
+        Set.from(stmt.execute_query().iterator(_.string(Results.name))))
+
     def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] =
       db.using_statement(
-        Results.table.select(sql = if_proper(names, Results.name.where_member(names)))) { stmt =>
-        Map.from(
-          stmt.execute_query().iterator { res =>
+        Results.table.select(sql = if_proper(names, Results.name.where_member(names)))
+      ) { stmt =>
+          Map.from(stmt.execute_query().iterator { res =>
             val name = res.string(Results.name)
             val hostname = res.string(Results.hostname)
             val numa_node = res.get_int(Results.numa_node)
             val rc = res.int(Results.rc)
             val out = res.string(Results.out)
             val err = res.string(Results.err)
-            val timing_elapsed = res.long(Results.timing_elapsed)
-            val timing_cpu = res.long(Results.timing_cpu)
-            val timing_gc = res.long(Results.timing_gc)
+            val timing =
+              res.timing(
+                Results.timing_elapsed,
+                Results.timing_cpu,
+                Results.timing_gc)
             val node_info = Host.Node_Info(hostname, numa_node)
             val process_result =
               Process_Result(rc,
                 out_lines = split_lines(out),
                 err_lines = split_lines(err),
-                timing = Timing(Time.ms(timing_elapsed), Time.ms(timing_cpu), Time.ms(timing_gc)))
+                timing = timing)
             name -> Build_Job.Result(node_info, process_result)
           })
-      }
+        }
 
-    def read_results_name(db: SQL.Database): Set[String] =
-      db.using_statement(Results.table.select(List(Results.name)))(stmt =>
-        Set.from(stmt.execute_query().iterator(_.string(Results.name))))
-
-    def update_results(db: SQL.Database, results: Map[String, Build_Process.Result]): Boolean = {
-      val old_results = read_results_name(db)
+    def update_results(db: SQL.Database, results: State.Results): Boolean = {
+      val old_results = read_results_domain(db)
       val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList
 
       for ((name, result) <- insert) {
@@ -405,6 +475,7 @@
         List(
           Base.table,
           Serial.table,
+          Sessions.table,
           Pending.table,
           Running.table,
           Results.table,
@@ -436,6 +507,7 @@
     ): State = {
       val changed =
         List(
+          update_sessions(db, state.sessions),
           update_pending(db, state.pending),
           update_running(db, state.running),
           update_results(db, state.results),
@@ -510,6 +582,12 @@
   /* policy operations */
 
   protected def init_state(state: Build_Process.State): Build_Process.State = {
+    val sessions1 =
+      build_context.sessions.foldLeft(state.sessions) { case (map, (name, session)) =>
+        if (state.sessions.isDefinedAt(name)) map
+        else map + (name -> session)
+      }
+
     val old_pending = state.pending.iterator.map(_.name).toSet
     val new_pending =
       List.from(
@@ -517,7 +595,9 @@
           (name, session_context) <- build_context.sessions.iterator
           if !old_pending(name)
         } yield Build_Process.Entry(name, session_context.deps))
-    state.copy(pending = new_pending ::: state.pending)
+    val pending1 = new_pending ::: state.pending
+
+    state.copy(sessions = sessions1, pending = pending1)
   }
 
   protected def next_job(state: Build_Process.State): Option[String] =