clarified static Build_Process.Context vs. dynamic Build_Process.State;
author wenzelm
Sat, 01 Jul 2023 16:42:57 +0200
changeset 78237 c2c59de57df9
parent 78236 f3a6140fa3b1
child 78238 8c0d3c879f7c
clarified static Build_Process.Context vs. dynamic Build_Process.State; more dynamic Build_Process.Sessions, to accommodate multiple workers (and multiple builds);
src/Pure/Tools/build.scala
src/Pure/Tools/build_job.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build.scala	Sat Jul 01 16:32:46 2023 +0200
+++ b/src/Pure/Tools/build.scala	Sat Jul 01 16:42:57 2023 +0200
@@ -182,10 +182,10 @@
     /* build process and results */
 
     val build_context =
-      Build_Process.init_context(store, build_deps, progress = progress,
-        hostname = hostname(build_options), build_heap = build_heap,
-        numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
-        no_build = no_build, session_setup = session_setup, master = true)
+      Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+        build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+        fresh_build = fresh_build, no_build = no_build, session_setup = session_setup,
+        master = true)
 
     if (clean_build) {
       using_optional(store.maybe_open_database_server()) { database_server =>
@@ -453,9 +453,8 @@
       Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
 
     val build_context =
-      Build_Process.init_context(store, build_deps, progress = progress,
-        hostname = hostname(build_options), numa_shuffling = numa_shuffling, max_jobs = max_jobs,
-        build_uuid = build_master.build_uuid)
+      Build_Process.Context(store, build_deps, hostname = hostname(build_options),
+        numa_shuffling = numa_shuffling, max_jobs = max_jobs, build_uuid = build_master.build_uuid)
 
     build_engine.run(build_context, progress)
   }
--- a/src/Pure/Tools/build_job.scala	Sat Jul 01 16:32:46 2023 +0200
+++ b/src/Pure/Tools/build_job.scala	Sat Jul 01 16:42:57 2023 +0200
@@ -21,15 +21,18 @@
 
   def start_session(
     build_context: Build_Process.Context,
+    session_context: Session_Context,
     progress: Progress,
     log: Logger,
     database_server: Option[SQL.Database],
     session_background: Sessions.Background,
+    sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
-    node_info: Host.Node_Info
+    node_info: Host.Node_Info,
+    store_heap: Boolean
   ): Session_Job = {
-    new Session_Job(build_context, progress, log, database_server,
-      session_background, input_shasum, node_info)
+    new Session_Job(build_context, session_context, progress, log, database_server,
+      session_background, sources_shasum, input_shasum, node_info, store_heap)
   }
 
   object Session_Context {
@@ -93,12 +96,15 @@
 
   class Session_Job private[Build_Job](
     build_context: Build_Process.Context,
+    session_context: Session_Context,
     progress: Progress,
     log: Logger,
     database_server: Option[SQL.Database],
     session_background: Sessions.Background,
+    sources_shasum: SHA1.Shasum,
     input_shasum: SHA1.Shasum,
-    node_info: Host.Node_Info
+    node_info: Host.Node_Info,
+    store_heap: Boolean
   ) extends Build_Job {
     private val store = build_context.store
 
@@ -110,8 +116,6 @@
     private val session_sources =
       Store.Sources.load(session_background.base, cache = store.cache.compress)
 
-    private val store_heap = build_context.store_heap(session_name)
-
     private val future_result: Future[(Process_Result, SHA1.Shasum)] =
       Future.thread("build", uninterruptible = true) {
         val env =
@@ -159,7 +163,8 @@
 
         val resources =
           new Resources(session_background, log = log,
-            command_timings = build_context.old_command_timings(session_name))
+            command_timings =
+              Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
 
         val session =
           new Session(options, resources) {
@@ -485,7 +490,7 @@
               if (process_result.timeout) build_log.error("Timeout") else build_log,
             build =
               Store.Build_Info(
-                sources = build_context.sources_shasum(session_name),
+                sources = sources_shasum,
                 input_heaps = input_shasum,
                 output_heap = output_shasum,
                 process_result.rc,
--- a/src/Pure/Tools/build_process.scala	Sat Jul 01 16:32:46 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Sat Jul 01 16:42:57 2023 +0200
@@ -16,10 +16,9 @@
 object Build_Process {
   /** static context **/
 
-  def init_context(
+  sealed case class Context(
     store: Store,
-    build_deps: Sessions.Deps,
-    progress: Progress = new Progress,
+    build_deps: isabelle.Sessions.Deps,
     ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
     hostname: String = Isabelle_System.hostname(),
     numa_shuffling: Boolean = false,
@@ -29,79 +28,7 @@
     no_build: Boolean = false,
     session_setup: (String, Session) => Unit = (_, _) => (),
     build_uuid: String = UUID.random().toString,
-    master: Boolean = false,
-  ): Context = {
-    val sessions_structure = build_deps.sessions_structure
-    val build_graph = sessions_structure.build_graph
-
-    val sessions =
-      Map.from(
-        for ((name, (info, _)) <- build_graph.iterator)
-        yield {
-          val deps = info.parent.toList
-          val ancestors = sessions_structure.build_requirements(deps)
-          val sources_shasum = build_deps.sources_shasum(name)
-          val session_context =
-            Build_Job.Session_Context.load(
-              build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum,
-              info.timeout, store, progress = progress)
-          name -> session_context
-        })
-
-    val sessions_time = {
-      val maximals = build_graph.maximals.toSet
-      def descendants_time(name: String): Double = {
-        if (maximals.contains(name)) sessions(name).old_time.seconds
-        else {
-          val descendants = build_graph.all_succs(List(name)).toSet
-          val g = build_graph.restrict(descendants)
-          (0.0 :: g.maximals.flatMap { desc =>
-            val ps = g.all_preds(List(desc))
-            if (ps.exists(p => !sessions.isDefinedAt(p))) None
-            else Some(ps.map(p => sessions(p).old_time.seconds).sum)
-          }).max
-        }
-      }
-      Map.from(
-        for (name <- sessions.keysIterator)
-        yield name -> descendants_time(name)).withDefaultValue(0.0)
-    }
-
-    val ordering =
-      new Ordering[String] {
-        def compare(name1: String, name2: String): Int =
-          sessions_time(name2) compare sessions_time(name1) match {
-            case 0 =>
-              sessions(name2).timeout compare sessions(name1).timeout match {
-                case 0 => name1 compare name2
-                case ord => ord
-              }
-            case ord => ord
-          }
-      }
-
-    val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
-
-    new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes,
-      build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
-      no_build = no_build, session_setup, build_uuid = build_uuid, master = master)
-  }
-
-  final class Context private[Build_Process](
-    val store: Store,
-    val build_deps: Sessions.Deps,
-    val sessions: State.Sessions,
-    val ordering: Ordering[String],
-    val ml_platform: String,
-    val hostname: String,
-    val numa_nodes: List[Int],
-    val build_heap: Boolean,
-    val max_jobs: Int,
-    val fresh_build: Boolean,
-    val no_build: Boolean,
-    val session_setup: (String, Session) => Unit,
-    val build_uuid: String,
-    val master: Boolean
+    master: Boolean = false
   ) {
     override def toString: String =
       "Build_Process.Context(build_uuid = " + quote(build_uuid) +
@@ -109,20 +36,7 @@
 
     def build_options: Options = store.options
 
-    def sessions_structure: Sessions.Structure = build_deps.sessions_structure
-
-    def sources_shasum(name: String): SHA1.Shasum = sessions(name).sources_shasum
-
-    def old_command_timings(name: String): List[Properties.T] =
-      sessions.get(name) match {
-        case Some(session_context) =>
-          Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)
-        case None => Nil
-      }
-
-    def store_heap(name: String): Boolean =
-      build_heap || Sessions.is_pure(name) ||
-      sessions.valuesIterator.exists(_.ancestors.contains(name))
+    def sessions_structure: isabelle.Sessions.Structure = build_deps.sessions_structure
 
     def worker_active: Boolean = max_jobs > 0
   }
@@ -184,16 +98,122 @@
     def ok: Boolean = process_result.ok
   }
 
+  object Sessions {
+    type Graph = isabelle.Graph[String, Build_Job.Session_Context]
+    val empty: Sessions = new Sessions(Graph.string)
+  }
+
+  final class Sessions private(val graph: Sessions.Graph) {
+    override def toString: String = graph.toString
+
+    def apply(name: String): Build_Job.Session_Context = graph.get_node(name)
+
+    def iterator: Iterator[Build_Job.Session_Context] =
+      for (name <- graph.topological_order.iterator) yield apply(name)
+
+    def make(new_graph: Sessions.Graph): Sessions =
+      if (graph == new_graph) this
+      else {
+        new Sessions(
+          new_graph.iterator.foldLeft(new_graph) {
+            case (g, (name, (session, _))) => g.add_deps_acyclic(name, session.deps)
+          })
+      }
+
+    def pull(
+      data_domain: Set[String],
+      data: Set[String] => List[Build_Job.Session_Context]
+    ): Sessions = {
+      val dom = data_domain -- iterator.map(_.name)
+      make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) })
+    }
+
+    def init(build_context: Context, progress: Progress = new Progress): Sessions = {
+      val sessions_structure = build_context.sessions_structure
+      make(
+        sessions_structure.build_graph.iterator.foldLeft(graph) {
+          case (graph0, (name, (info, _))) =>
+            val deps = info.parent.toList
+            val prefs = info.session_prefs
+            val ancestors = sessions_structure.build_requirements(deps)
+            val sources_shasum = build_context.build_deps.sources_shasum(name)
+
+            if (graph0.defined(name)) {
+              val session0 = graph0.get_node(name)
+              val prefs0 = session0.session_prefs
+              val ancestors0 = session0.ancestors
+              val sources_shasum0 = session0.sources_shasum
+
+              def err(msg: String, a: String, b: String): Nothing =
+                error("Conflicting dependencies for session " + quote(name) + ": " +
+                  msg + "\n" + a + "\nvs.\n" + b)
+
+              if (prefs0 != prefs) {
+                err("preferences disagree",
+                  Symbol.cartouche_decoded(prefs0), Symbol.cartouche_decoded(prefs))
+              }
+              if (ancestors0 != ancestors) {
+                err("ancestors disagree", commas_quote(ancestors0), commas_quote(ancestors))
+              }
+              if (sources_shasum0 != sources_shasum) {
+                val a = sources_shasum0 - sources_shasum
+                val b = sources_shasum - sources_shasum0
+                err("sources disagree", a.toString, b.toString)
+              }
+
+              graph0
+            }
+            else {
+              val session =
+                Build_Job.Session_Context.load(
+                  build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum,
+                  info.timeout, build_context.store, progress = progress)
+              graph0.new_node(name, session)
+            }
+        }
+      )
+    }
+
+    lazy val max_time: Map[String, Double] = {
+      val maximals = graph.maximals.toSet
+      def descendants_time(name: String): Double = {
+        if (maximals.contains(name)) apply(name).old_time.seconds
+        else {
+          val descendants = graph.all_succs(List(name)).toSet
+          val g = graph.restrict(descendants)
+          (0.0 :: g.maximals.flatMap { desc =>
+            val ps = g.all_preds(List(desc))
+            if (ps.exists(p => !graph.defined(p))) None
+            else Some(ps.map(p => apply(p).old_time.seconds).sum)
+          }).max
+        }
+      }
+      Map.from(
+        for (name <- graph.keys_iterator)
+        yield name -> descendants_time(name)).withDefaultValue(0.0)
+    }
+
+    lazy val ordering: Ordering[String] =
+      (a: String, b: String) =>
+        max_time(b) compare max_time(a) match {
+          case 0 =>
+            apply(b).timeout compare apply(a).timeout match {
+              case 0 => a compare b
+              case ord => ord
+            }
+          case ord => ord
+        }
+  }
+
   sealed case class Snapshot(
     builds: List[Build],        // available build configurations
     workers: List[Worker],      // available worker processes
-    sessions: State.Sessions,   // static build targets
+    sessions: Sessions,         // static build targets
     pending: State.Pending,     // dynamic build "queue"
     running: State.Running,     // presently running jobs
     results: State.Results)     // finished results
 
   object State {
-    type Sessions = Map[String, Build_Job.Session_Context]
     type Pending = List[Task]
     type Running = Map[String, Job]
     type Results = Map[String, Result]
@@ -206,7 +226,8 @@
 
   sealed case class State(
     serial: Long = 0,
-    sessions: State.Sessions = Map.empty,
+    numa_nodes: List[Int] = Nil,
+    sessions: Sessions = Sessions.empty,
     pending: State.Pending = Nil,
     running: State.Running = Map.empty,
     results: State.Results = Map.empty
@@ -406,7 +427,7 @@
     def read_sessions(db: SQL.Database,
       names: Iterable[String] = Nil,
       build_uuid: String = ""
-    ): State.Sessions =
+    ): List[Build_Job.Session_Context] = {
       db.execute_query_statement(
         Sessions.table.select(
           sql =
@@ -414,7 +435,7 @@
               if_proper(names, Sessions.name.member(names)),
               if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid)))
         ),
-        Map.from[String, Build_Job.Session_Context],
+        List.from[Build_Job.Session_Context],
         { res =>
           val name = res.string(Sessions.name)
           val deps = split_lines(res.string(Sessions.deps))
@@ -425,19 +446,20 @@
           val old_time = Time.ms(res.long(Sessions.old_time))
           val old_command_timings_blob = res.bytes(Sessions.old_command_timings)
           val build_uuid = res.string(Sessions.build_uuid)
-          name -> Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum,
+          Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum,
             timeout, old_time, old_command_timings_blob, build_uuid)
         }
       )
+    }
 
-    def update_sessions(db: SQL.Database, sessions: State.Sessions): Boolean = {
+    def update_sessions(db: SQL.Database, sessions: Build_Process.Sessions): Boolean = {
       val old_sessions = read_sessions_domain(db)
-      val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList
+      val insert = sessions.iterator.filterNot(s => old_sessions.contains(s.name)).toList
 
-      for ((name, session) <- insert) {
+      for (session <- insert) {
         db.execute_statement(Sessions.table.insert(), body =
           { stmt =>
-            stmt.string(1) = name
+            stmt.string(1) = session.name
             stmt.string(2) = cat_lines(session.deps)
             stmt.string(3) = cat_lines(session.ancestors)
             stmt.string(4) = session.session_prefs
@@ -756,7 +778,7 @@
         val serial = serial_db max state.serial
         stamp_worker(db, worker_uuid, serial)
 
-        val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
+        val sessions = state.sessions.pull(read_sessions_domain(db), read_sessions(db, _))
         val pending = read_pending(db)
         val running = pull0(read_running(db), state.running)
         val results = pull1(read_results_domain(db), read_results(db, _), state.results)
@@ -805,7 +827,7 @@
 
   protected final val store: Store = build_context.store
   protected final val build_options: Options = store.options
-  protected final val build_deps: Sessions.Deps = build_context.build_deps
+  protected final val build_deps: isabelle.Sessions.Deps = build_context.build_deps
   protected final val hostname: String = build_context.hostname
   protected final val build_uuid: String = build_context.build_uuid
 
@@ -888,46 +910,47 @@
   /* policy operations */
 
   protected def init_state(state: Build_Process.State): Build_Process.State = {
-    val sessions1 =
-      build_context.sessions.foldLeft(state.sessions) { case (map, (name, session)) =>
-        if (state.sessions.isDefinedAt(name)) map
-        else map + (name -> session)
-      }
+    val sessions1 = state.sessions.init(build_context, progress = build_progress)
 
     val old_pending = state.pending.iterator.map(_.name).toSet
     val new_pending =
       List.from(
-        for {
-          (name, session_context) <- build_context.sessions.iterator
-          if !old_pending(name)
-        } yield Build_Process.Task(name, session_context.deps, JSON.Object.empty, build_uuid))
+        for (session <- sessions1.iterator if !old_pending(session.name))
+          yield Build_Process.Task(session.name, session.deps, JSON.Object.empty, build_uuid))
     val pending1 = new_pending ::: state.pending
 
-    state.copy(sessions = sessions1, pending = pending1)
+    state.copy(
+      numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling),
+      sessions = sessions1,
+      pending = pending1)
   }
 
   protected def next_job(state: Build_Process.State): Option[String] =
     if (progress.stopped || state.running.size < build_context.max_jobs) {
       state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name))
-        .sortBy(_.name)(build_context.ordering)
+        .sortBy(_.name)(state.sessions.ordering)
         .headOption.map(_.name)
     }
     else None
 
   protected def start_session(state: Build_Process.State, session_name: String): Build_Process.State = {
     val ancestor_results =
-      for (a <- build_context.sessions(session_name).ancestors) yield state.results(a)
+      for (a <- state.sessions(session_name).ancestors) yield state.results(a)
+
+    val sources_shasum = state.sessions(session_name).sources_shasum
 
     val input_shasum =
       if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum()
       else SHA1.flat_shasum(ancestor_results.map(_.output_shasum))
 
-    val store_heap = build_context.store_heap(session_name)
+    val store_heap =
+      build_context.build_heap || Sessions.is_pure(session_name) ||
+      state.sessions.iterator.exists(_.ancestors.contains(session_name))
 
     val (current, output_shasum) =
       store.check_output(session_name,
         session_options = build_context.sessions_structure(session_name).options,
-        sources_shasum = build_context.sources_shasum(session_name),
+        sources_shasum = sources_shasum,
         input_shasum = input_shasum,
         fresh_build = build_context.fresh_build,
         store_heap = store_heap)
@@ -966,7 +989,7 @@
       val numa_node =
         for {
           db <- _host_database
-          n <- Host.next_numa_node(db, hostname, build_context.numa_nodes, used_nodes)
+          n <- Host.next_numa_node(db, hostname, state.numa_nodes, used_nodes)
         } yield n
       val node_info = Host.Node_Info(hostname, numa_node)
 
@@ -976,9 +999,11 @@
 
       store.clean_output(_database_server, session_name, session_init = true)
 
+      val session = state.sessions(session_name)
+
       val build =
-        Build_Job.start_session(build_context, progress, log, _database_server,
-          build_deps.background(session_name), input_shasum, node_info)
+        Build_Job.start_session(build_context, session, progress, log, _database_server,
+          build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap)
 
       val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))