# HG changeset patch # User Fabian Huch # Date 1710433141 -3600 # Node ID 2c9c5ae99a09aa21ffbe051c950f3af17ce8cf7f # Parent 4ec26ed6f481ce118a3c404c4cfa7c59296343ee proper IPC for scheduled builds, following 7ae25372ab04; diff -r 4ec26ed6f481 -r 2c9c5ae99a09 src/Pure/Build/build_schedule.scala --- a/src/Pure/Build/build_schedule.scala Thu Mar 14 15:46:39 2024 +0100 +++ b/src/Pure/Build/build_schedule.scala Thu Mar 14 17:19:01 2024 +0100 @@ -527,6 +527,9 @@ if graph.imm_preds(node.job_name).subsetOf(state.results.keySet) } yield task.name + def exists_next(hostname: String, state: Build_Process.State): Boolean = + next(hostname, state).nonEmpty + def update(state: Build_Process.State): Schedule = { val start1 = Date.now() @@ -902,6 +905,24 @@ override def next_jobs(state: Build_Process.State): List[String] = if (progress.stopped || _schedule.is_empty) Nil else _schedule.next(hostname, state) + + private var _build_tick: Long = 0L + + protected override def build_action(): Boolean = + Isabelle_Thread.interrupt_handler(_ => progress.stop()) { + val received = build_receive(n => n.channel == Build_Process.private_data.channel) + val ready = received.contains(Build_Schedule.private_data.channel_ready(hostname)) + + val finished = synchronized { _state.finished_running() } + + def sleep: Boolean = { + build_delay.sleep() + val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 } + expired || ready || progress.stopped + } + + finished || sleep + } } abstract class Scheduler_Build_Process( @@ -1073,14 +1094,47 @@ } override def run(): Build.Results = { - for (db <- _build_database) - Build_Process.private_data.transaction_lock(db, label = "Scheduler_Build_Process.init") { - Build_Process.private_data.clean_build(db) + val vacuous = + synchronized_database("Scheduler_Build_Process.init") { + for (db <- _build_database) Build_Process.private_data.clean_build(db) + init_unsynchronized() + _state.pending.isEmpty + } + if (vacuous) { + progress.echo_warning("Nothing to build") + stop_build() + Build.Results(build_context) + } + else { + start_worker() + _build_cluster.start() + + try { + while (!finished()) { + synchronized_database("Scheduler_Build_Process.main") { + if (progress.stopped) _state.build_running.foreach(_.cancel()) + main_unsynchronized() + for { + host <- build_context.build_hosts + if _schedule.exists_next(host.name, _state) + } build_send(Build_Schedule.private_data.channel_ready(host.name)) + } + while(!build_action()) {} + } + } + finally { + _build_cluster.stop() + stop_worker() + stop_build() } - val results = super.run() - write_build_log(results, snapshot().results) - results + val results = synchronized_database("Scheduler_Build_Process.result") { + val results = for ((name, result) <- _state.results) yield name -> result.process_result + Build.Results(build_context, results = results, other_rc = _build_cluster.rc) + } + write_build_log(results, _state.results) + results + } } } @@ -1089,6 +1143,7 @@ object private_data extends SQL.Data("isabelle_build") { import Build_Process.private_data.{Base, Generic} + /* tables */ override lazy val tables: SQL.Tables = SQL.Tables(Schedules.table, Nodes.table) @@ -1096,6 +1151,11 @@ lazy val all_tables: SQL.Tables = SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list) + /* notifications */ + + def channel_ready(hostname: String): SQL.Notification = + SQL.Notification(Build_Process.private_data.channel, payload = hostname) + /* schedule */