--- a/src/Pure/Build/build_schedule.scala Thu Mar 14 15:46:39 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala Thu Mar 14 17:19:01 2024 +0100
@@ -527,6 +527,9 @@
if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
} yield task.name
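+ // at least one task in the schedule is ready to run on the given host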
+ def exists_next(hostname: String, state: Build_Process.State): Boolean =
+ next(hostname, state).nonEmpty
+
def update(state: Build_Process.State): Schedule = {
val start1 = Date.now()
@@ -902,6 +905,24 @@
override def next_jobs(state: Build_Process.State): List[String] =
if (progress.stopped || _schedule.is_empty) Nil else _schedule.next(hostname, state)
+
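+ // counts sleep periods; forces a main-loop round trip every build_expire ticks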
+ private var _build_tick: Long = 0L
+
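+ // wake up the main loop once a job has finished, a "ready" notification arrived, the tick counter expired, or the build was stopped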
+ protected override def build_action(): Boolean =
+ Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
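+ // drain notifications on the build-process channel; "ready" means one was addressed to this host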
+ val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+ val ready = received.contains(Build_Schedule.private_data.channel_ready(hostname))
+
+ val finished = synchronized { _state.finished_running() }
+
+ def sleep: Boolean = {
+ build_delay.sleep()
+ val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+ expired || ready || progress.stopped
+ }
+
+ finished || sleep
+ }
}
abstract class Scheduler_Build_Process(
@@ -1073,14 +1094,47 @@
}
override def run(): Build.Results = {
- for (db <- _build_database)
- Build_Process.private_data.transaction_lock(db, label = "Scheduler_Build_Process.init") {
- Build_Process.private_data.clean_build(db)
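+ // clean the database, initialize the build state, and check whether anything is pending at all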
+ val vacuous =
+ synchronized_database("Scheduler_Build_Process.init") {
+ for (db <- _build_database) Build_Process.private_data.clean_build(db)
+ init_unsynchronized()
+ _state.pending.isEmpty
+ }
+ if (vacuous) {
+ progress.echo_warning("Nothing to build")
+ stop_build()
+ Build.Results(build_context)
+ }
+ else {
+ start_worker()
+ _build_cluster.start()
+
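+ // main loop: run one main step, notify hosts that have ready tasks, then block in build_action until there is new work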
+ try {
+ while (!finished()) {
+ synchronized_database("Scheduler_Build_Process.main") {
+ if (progress.stopped) _state.build_running.foreach(_.cancel())
+ main_unsynchronized()
+ for {
+ host <- build_context.build_hosts
+ if _schedule.exists_next(host.name, _state)
+ } build_send(Build_Schedule.private_data.channel_ready(host.name))
+ }
+ while (!build_action()) {}
+ }
+ }
+ finally {
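+ // always shut down the build cluster and the worker, even after interrupts or errors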
+ _build_cluster.stop()
+ stop_worker()
+ stop_build()
}
- val results = super.run()
- write_build_log(results, snapshot().results)
- results
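+ // collect per-session results under the database lock and include the cluster return code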
+ val results = synchronized_database("Scheduler_Build_Process.result") {
+ val results = for ((name, result) <- _state.results) yield name -> result.process_result
+ Build.Results(build_context, results = results, other_rc = _build_cluster.rc)
+ }
+ write_build_log(results, _state.results)
+ results
+ }
}
}
@@ -1089,6 +1143,7 @@
object private_data extends SQL.Data("isabelle_build") {
import Build_Process.private_data.{Base, Generic}
+ /* tables */
override lazy val tables: SQL.Tables =
SQL.Tables(Schedules.table, Nodes.table)
@@ -1096,6 +1151,11 @@
lazy val all_tables: SQL.Tables =
SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
+ /* notifications */
+
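+ // a "ready" notification for the given host, sent on the common build-process channel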
+ def channel_ready(hostname: String): SQL.Notification =
+ SQL.Notification(Build_Process.private_data.channel, payload = hostname)
+
/* schedule */