clarified state: more explicit type as plain value, which is also easier to sync with external db;
authorwenzelm
Tue, 21 Feb 2023 12:42:08 +0100
changeset 77334 0231e62956a6
parent 77333 0bec014c0f9f
child 77335 05b97b54cb3b
clarified state: more explicit type as plain value, which is also easier to sync with external db;
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_process.scala	Tue Feb 21 12:13:35 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Tue Feb 21 12:42:08 2023 +0100
@@ -13,7 +13,7 @@
 
 
 object Build_Process {
-  /* static information */
+  /* static context */
 
   object Session_Context {
     def empty(session: String, timeout: Time): Session_Context =
@@ -150,7 +150,7 @@
   }
 
 
-  /* main */
+  /* dynamic state */
 
   case class Entry(name: String, deps: List[String]) {
     def is_ready: Boolean = deps.isEmpty
@@ -166,6 +166,55 @@
     def ok: Boolean = process_result.ok
   }
 
+  sealed case class State(
+    numa_index: Int = 0,
+    pending: List[Entry] = Nil,
+    running: Map[String, Build_Job] = Map.empty,
+    results: Map[String, Build_Process.Result] = Map.empty
+  ) {
+    def is_pending: Boolean = pending.nonEmpty
+
+    def remove_pending(name: String): State =
+      copy(pending = pending.flatMap(
+        entry => if (entry.name == name) None else Some(entry.resolve(name))))
+
+    def is_running(name: String): Boolean = running.isDefinedAt(name)
+
+    def numa_running: Set[Int] =
+      Set.from(for (job <- running.valuesIterator; i <- job.numa_node) yield i)
+
+    def stop_running(): Unit = running.valuesIterator.foreach(_.terminate())
+
+    def finished_running(): List[Build_Job.Build_Session] =
+      List.from(
+        running.valuesIterator.flatMap {
+          case job: Build_Job.Build_Session if job.is_finished => Some(job)
+          case _ => None
+        })
+
+    def add_running(name: String, job: Build_Job): State =
+      copy(running = running + (name -> job))
+
+    def remove_running(name: String): State =
+      copy(running = running - name)
+
+    def add_result(
+      name: String,
+      current: Boolean,
+      output_heap: SHA1.Shasum,
+      process_result: Process_Result
+    ): State = {
+      val result = Build_Process.Result(current, output_heap, process_result)
+      copy(results = results + (name -> result))
+    }
+
+    def get_results(names: List[String]): List[Build_Process.Result] =
+      names.map(results.apply)
+  }
+
+
+  /* main process */
+
   def session_finished(session_name: String, process_result: Process_Result): String =
     "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
 
@@ -192,24 +241,18 @@
     }
 
   // global state
-  protected var _numa_index = 0
-  protected var _pending: List[Build_Process.Entry] = init_pending()
-  protected var _running = Map.empty[String, Build_Job]
-  protected var _results = Map.empty[String, Build_Process.Result]
+  protected var _state = init_state()
 
-  protected def init_pending(): List[Build_Process.Entry] =
-    (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
-      yield Build_Process.Entry(name, preds.toList)).toList
+  protected def init_state(): Build_Process.State =
+    Build_Process.State(pending =
+      (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
+        yield Build_Process.Entry(name, preds.toList)).toList)
 
-  protected def is_pending(): Boolean = synchronized { _pending.nonEmpty }
-
-  protected def remove_pending(name: String): Unit = synchronized {
-    _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name)))
-  }
+  protected def is_pending(): Boolean = synchronized { _state.is_pending }
 
   protected def next_pending(): Option[String] = synchronized {
-    if (_running.size < (build_context.max_jobs max 1)) {
-      _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name))
+    if (_state.running.size < (build_context.max_jobs max 1)) {
+      _state.pending.filter(entry => entry.is_ready && !_state.is_running(entry.name))
         .sortBy(_.name)(build_context.ordering)
         .headOption.map(_.name)
     }
@@ -220,51 +263,28 @@
     val available = build_context.numa_nodes.zipWithIndex
     if (available.isEmpty) None
     else {
-      val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i)
-      val index = _numa_index
+      val used = _state.numa_running
+      val index = _state.numa_index
       val candidates = available.drop(index) ::: available.take(index)
       val (n, i) =
         candidates.find({ case (n, i) => i == index && !used(n) }) orElse
         candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
-      _numa_index = (i + 1) % available.length
+      _state = _state.copy(numa_index = (i + 1) % available.length)
       Some(n)
     }
   }
 
-  protected def stop_running(): Unit = synchronized {
-    _running.valuesIterator.foreach(_.terminate())
-  }
+  protected def stop_running(): Unit = synchronized { _state.stop_running() }
 
   protected def finished_running(): List[Build_Job.Build_Session] = synchronized {
-    List.from(
-      _running.valuesIterator.flatMap {
-        case job: Build_Job.Build_Session if job.is_finished => Some(job)
-        case _ => None
-      })
+    _state.finished_running()
   }
 
   protected def job_running(name: String, job: Build_Job): Build_Job = synchronized {
-    _running += (name -> job)
+    _state = _state.add_running(name, job)
     job
   }
 
-  protected def remove_running(name: String): Unit = synchronized {
-    _running -= name
-  }
-
-  protected def add_result(
-    name: String,
-    current: Boolean,
-    output_heap: SHA1.Shasum,
-    process_result: Process_Result
-  ): Unit = synchronized {
-    _results += (name -> Build_Process.Result(current, output_heap, process_result))
-  }
-
-  protected def get_results(names: List[String]): List[Build_Process.Result] = synchronized {
-    names.map(_results.apply)
-  }
-
   protected def finish_job(job: Build_Job.Build_Session): Unit = {
     val session_name = job.session_name
     val process_result = job.join
@@ -315,17 +335,19 @@
     }
 
     synchronized {
-      remove_pending(session_name)
-      remove_running(session_name)
-      add_result(session_name, false, output_heap, process_result_tail)
+      _state = _state.
+        remove_pending(session_name).
+        remove_running(session_name).
+        add_result(session_name, false, output_heap, process_result_tail)
     }
   }
 
   protected def start_job(session_name: String): Unit = {
-    val ancestor_results =
-      get_results(
+    val ancestor_results = synchronized {
+      _state.get_results(
         build_deps.sessions_structure.build_requirements(List(session_name)).
           filterNot(_ == session_name))
+    }
     val input_heaps =
       if (ancestor_results.isEmpty) {
         SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
@@ -356,15 +378,17 @@
 
     if (all_current) {
       synchronized {
-        remove_pending(session_name)
-        add_result(session_name, true, output_heap, Process_Result.ok)
+        _state = _state.
+          remove_pending(session_name).
+          add_result(session_name, true, output_heap, Process_Result.ok)
       }
     }
     else if (build_context.no_build) {
       progress.echo_if(verbose, "Skipping " + session_name + " ...")
       synchronized {
-        remove_pending(session_name)
-        add_result(session_name, false, output_heap, Process_Result.error)
+        _state = _state.
+          remove_pending(session_name).
+          add_result(session_name, false, output_heap, Process_Result.error)
       }
     }
     else if (ancestor_results.forall(_.ok) && !progress.stopped) {
@@ -391,8 +415,9 @@
     else {
       progress.echo(session_name + " CANCELLED")
       synchronized {
-        remove_pending(session_name)
-        add_result(session_name, false, output_heap, Process_Result.undefined)
+        _state = _state.
+          remove_pending(session_name).
+          add_result(session_name, false, output_heap, Process_Result.undefined)
       }
     }
   }
@@ -415,7 +440,7 @@
         }
       }
       synchronized {
-        for ((name, result) <- _results) yield name -> result.process_result
+        for ((name, result) <- _state.results) yield name -> result.process_result
       }
     }
     else {