separate build processes for scheduler and scheduled;
authorFabian Huch <huch@in.tum.de>
Thu, 14 Dec 2023 13:14:25 +0100
changeset 79290 9deadc9d8872
parent 79289 7c1faa16554b
child 79291 e9a788a75775
separate build processes for scheduler and scheduled;
src/Pure/Tools/build_schedule.scala
--- a/src/Pure/Tools/build_schedule.scala	Thu Dec 14 13:10:01 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Thu Dec 14 13:14:25 2023 +0100
@@ -803,28 +803,15 @@
   }
 
 
-  /* process for scheduled build */
+  /* master and slave processes for scheduled build */
 
-  abstract class Scheduled_Build_Process(
+  class Scheduled_Build_Process(
     build_context: Build.Context,
     build_progress: Progress,
     server: SSH.Server,
   ) extends Build_Process(build_context, build_progress, server) {
-    protected val start_date = Date.now()
-
-    def init_scheduler(timing_data: Timing_Data): Scheduler
-
     /* global resources with common close() operation */
 
-    private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options)
-    private final lazy val _log_database: SQL.Database =
-      try {
-        val db = _log_store.open_database(server = this.server)
-        _log_store.init_database(db)
-        db
-      }
-      catch { case exn: Throwable => close(); throw exn }
-
     private val _build_database: Option[SQL.Database] =
       try {
         for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -842,7 +829,6 @@
       catch { case exn: Throwable => close(); throw exn }
 
     override def close(): Unit = {
-      Option(_log_database).foreach(_.close())
       Option(_build_database).flatten.foreach(_.close())
       super.close()
     }
@@ -850,7 +836,7 @@
 
     /* global state: internal var vs. external database */
 
-    private var _schedule = Schedule.init(build_uuid)
+    protected var _schedule = Schedule.init(build_uuid)
 
     override protected def synchronized_database[A](label: String)(body: => A): A =
       super.synchronized_database(label) {
@@ -868,12 +854,51 @@
       }
 
 
+    /* build process */
+
+    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
+      _schedule.graph.get_node(session_name).node_info
+
+    override def next_jobs(state: Build_Process.State): List[String] =
+      if (progress.stopped || _schedule.is_outdated(build_options, state)) Nil
+      else _schedule.next(hostname, state)
+  }
+
+  abstract class Scheduler_Build_Process(
+    build_context: Build.Context,
+    build_progress: Progress,
+    server: SSH.Server,
+  ) extends Scheduled_Build_Process(build_context, build_progress, server) {
+    require(build_context.master)
+
+    protected val start_date = Date.now()
+
+    def init_scheduler(timing_data: Timing_Data): Scheduler
+
+
+    /* global resources with common close() operation */
+
+    private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options)
+    private final lazy val _log_database: SQL.Database =
+      try {
+        val db = _log_store.open_database(server = this.server)
+        _log_store.init_database(db)
+        db
+      }
+      catch { case exn: Throwable => close(); throw exn }
+
+    override def close(): Unit = {
+      Option(_log_database).foreach(_.close())
+      super.close()
+    }
+
+
     /* previous results via build log */
 
     override def open_build_cluster(): Build_Cluster = {
       val build_cluster = super.open_build_cluster()
       build_cluster.init()
-      if (build_context.master && build_context.max_jobs > 0) {
+      if (build_context.max_jobs > 0) {
         val benchmark_options = build_options.string("build_hostname") = hostname
         Benchmark.benchmark(benchmark_options, progress)
       }
@@ -948,9 +973,6 @@
 
     /* build process */
 
-    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
-      _schedule.graph.get_node(session_name).node_info
-
     def is_current(state: Build_Process.State, session_name: String): Boolean =
       state.ancestor_results(session_name) match {
         case Some(ancestor_results) if ancestor_results.forall(_.current) =>
@@ -975,10 +997,8 @@
     }
 
     override def next_jobs(state: Build_Process.State): List[String] =
-      if (!progress.stopped && !_schedule.is_outdated(build_options, state))
-        _schedule.next(hostname, state)
-      else if (!build_context.master) Nil
-      else if (progress.stopped) state.next_ready.map(_.name)
+      if (progress.stopped) state.next_ready.map(_.name)
+      else if (!_schedule.is_outdated(build_options, state)) _schedule.next(hostname, state)
       else {
         val current = state.next_ready.filter(task => is_current(state, task.name))
         if (current.nonEmpty) current.map(_.name)
@@ -1002,7 +1022,7 @@
 
     override def run(): Build.Results = {
       val results = super.run()
-      if (build_context.master) write_build_log(results, snapshot().results)
+      write_build_log(results, snapshot().results)
       results
     }
   }
@@ -1212,7 +1232,8 @@
       progress: Progress,
       server: SSH.Server
     ): Build_Process =
-      new Scheduled_Build_Process(context, progress, server) {
+      if (!context.master) new Scheduled_Build_Process(context, progress, server)
+      else new Scheduler_Build_Process(context, progress, server) {
         def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context)
       }
   }