proper IPC for scheduled builds, following 7ae25372ab04;
authorFabian Huch <huch@in.tum.de>
Thu, 14 Mar 2024 17:19:01 +0100
changeset 79896 2c9c5ae99a09
parent 79895 4ec26ed6f481
child 79897 661fb7db57ca
proper IPC for scheduled builds, following 7ae25372ab04;
src/Pure/Build/build_schedule.scala
--- a/src/Pure/Build/build_schedule.scala	Thu Mar 14 15:46:39 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala	Thu Mar 14 17:19:01 2024 +0100
@@ -527,6 +527,9 @@
         if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
       } yield task.name
 
+    def exists_next(hostname: String, state: Build_Process.State): Boolean = 
+      next(hostname, state).nonEmpty
+    
     def update(state: Build_Process.State): Schedule = {
       val start1 = Date.now()
 
@@ -902,6 +905,24 @@
 
     override def next_jobs(state: Build_Process.State): List[String] =
       if (progress.stopped || _schedule.is_empty) Nil else _schedule.next(hostname, state)
+
+    private var _build_tick: Long = 0L
+
+    protected override def build_action(): Boolean =
+      Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+        val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+        val ready = received.contains(Build_Schedule.private_data.channel_ready(hostname))
+
+        val finished = synchronized { _state.finished_running() }
+
+        def sleep: Boolean = {
+          build_delay.sleep()
+          val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+          expired || ready || progress.stopped
+        }
+
+        finished || sleep
+      }
   }
 
   abstract class Scheduler_Build_Process(
@@ -1073,14 +1094,47 @@
       }
 
     override def run(): Build.Results = {
-      for (db <- _build_database)
-        Build_Process.private_data.transaction_lock(db, label = "Scheduler_Build_Process.init") {
-          Build_Process.private_data.clean_build(db)
+      val vacuous =
+        synchronized_database("Scheduler_Build_Process.init") {
+          for (db <- _build_database) Build_Process.private_data.clean_build(db)
+          init_unsynchronized()
+          _state.pending.isEmpty
+        }
+      if (vacuous) {
+        progress.echo_warning("Nothing to build")
+        stop_build()
+        Build.Results(build_context)
+      }
+      else {
+        start_worker()
+        _build_cluster.start()
+
+        try {
+          while (!finished()) {
+            synchronized_database("Scheduler_Build_Process.main") {
+              if (progress.stopped) _state.build_running.foreach(_.cancel())
+              main_unsynchronized()
+              for {
+                host <- build_context.build_hosts
+                if _schedule.exists_next(host.name, _state)
+              } build_send(Build_Schedule.private_data.channel_ready(host.name))
+            }
+            while(!build_action()) {}
+          }
+        }
+        finally {
+          _build_cluster.stop()
+          stop_worker()
+          stop_build()
         }
 
-      val results = super.run()
-      write_build_log(results, snapshot().results)
-      results
+        val results = synchronized_database("Scheduler_Build_Process.result") {
+          val results = for ((name, result) <- _state.results) yield name -> result.process_result
+          Build.Results(build_context, results = results, other_rc = _build_cluster.rc)
+        }
+        write_build_log(results, _state.results)
+        results
+      }
     }
   }
 
@@ -1089,6 +1143,7 @@
 
   object private_data extends SQL.Data("isabelle_build") {
     import Build_Process.private_data.{Base, Generic}
+    /* tables */
 
     override lazy val tables: SQL.Tables =
       SQL.Tables(Schedules.table, Nodes.table)
@@ -1096,6 +1151,11 @@
     lazy val all_tables: SQL.Tables =
       SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
 
+    /* notifications */
+
+    def channel_ready(hostname: String): SQL.Notification =
+      SQL.Notification(Build_Process.private_data.channel, payload = hostname)
+
 
     /* schedule */