proper synchronized access to mutable state, to support concurrency eventually;
authorwenzelm
Mon, 13 Feb 2023 11:35:46 +0100
changeset 77289 c7d893278aec
parent 77288 e9f1fcb9b358
child 77290 12fd873af77c
proper synchronized access to mutable state, to support concurrency eventually;
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_process.scala	Mon Feb 13 11:25:01 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Mon Feb 13 11:35:46 2023 +0100
@@ -174,18 +174,20 @@
   private var _running = Map.empty[String, Build_Job]
   private var _results = Map.empty[String, Build_Process.Result]
 
-  private def remove_pending(name: String): Unit = {
+  private def remove_pending(name: String): Unit = synchronized {
     _build_graph = _build_graph.del_node(name)
     _build_order = _build_order - name
   }
 
-  private def next_pending(): Option[String] =
+  private def next_pending(): Option[String] = synchronized {
     _build_order.iterator
       .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name))
       .nextOption()
+  }
 
-  private def used_node(i: Int): Boolean =
+  private def used_node(i: Int): Boolean = synchronized {
     _running.valuesIterator.exists(job => job.numa_node.isDefined && job.numa_node.get == i)
+  }
 
   private def session_finished(session_name: String, process_result: Process_Result): String =
     "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
@@ -250,9 +252,11 @@
       if (!process_result.interrupted) progress.echo(process_result_tail.out)
     }
 
-    remove_pending(session_name)
-    _running -= session_name
-    _results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail))
+    synchronized {
+      remove_pending(session_name)
+      _running -= session_name
+      _results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail))
+    }
   }
 
   private def start_job(session_name: String): Unit = {
@@ -288,13 +292,17 @@
     val all_current = current && ancestor_results.forall(_.current)
 
     if (all_current) {
-      remove_pending(session_name)
-      _results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok))
+      synchronized {
+        remove_pending(session_name)
+        _results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok))
+      }
     }
     else if (no_build) {
       progress.echo_if(verbose, "Skipping " + session_name + " ...")
-      remove_pending(session_name)
-      _results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error))
+      synchronized {
+        remove_pending(session_name)
+        _results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error))
+      }
     }
     else if (ancestor_results.forall(_.ok) && !progress.stopped) {
       progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
@@ -308,16 +316,20 @@
         new Resources(session_background, log = log,
           command_timings = build_context(session_name).old_command_timings)
 
-      val numa_node = _numa_nodes.next(used_node)
-      val job =
-        new Build_Job(progress, session_background, store, do_store,
-          resources, session_setup, input_heaps, numa_node)
-      _running += (session_name -> job)
+      synchronized {
+        val numa_node = _numa_nodes.next(used_node)
+        val job =
+          new Build_Job(progress, session_background, store, do_store,
+            resources, session_setup, input_heaps, numa_node)
+        _running += (session_name -> job)
+      }
     }
     else {
       progress.echo(session_name + " CANCELLED")
-      remove_pending(session_name)
-      _results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined))
+      synchronized {
+        remove_pending(session_name)
+        _results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined))
+      }
     }
   }
 
@@ -327,12 +339,12 @@
     }
 
   def run(): Map[String, Build_Process.Result] = {
-    while (!_build_graph.is_empty) {
-      if (progress.stopped) _running.valuesIterator.foreach(_.terminate())
+    while (synchronized { !_build_graph.is_empty }) {
+      if (progress.stopped) synchronized { _running.valuesIterator.foreach(_.terminate()) }
 
-      _running.find({ case (_, job) => job.is_finished }) match {
+      synchronized { _running } .find({ case (_, job) => job.is_finished }) match {
         case Some((session_name, job)) => finish_job(session_name, job)
-        case None if _running.size < (max_jobs max 1) =>
+        case None if synchronized { _running.size } < (max_jobs max 1) =>
           next_pending() match {
             case Some(session_name) => start_job(session_name)
             case None => sleep()
@@ -341,6 +353,6 @@
       }
     }
 
-    _results
+    synchronized { _results }
   }
 }