--- a/etc/options Wed Mar 13 11:54:06 2024 +0100
+++ b/etc/options Wed Mar 13 23:27:44 2024 +0100
@@ -204,8 +204,14 @@
option build_delay : real = 0.2
-- "delay build process main loop (local)"
-option build_cluster_delay : real = 1.0
- -- "delay build process main loop (cluster)"
+option build_delay_master : real = 1.0
+ -- "delay build process main loop (cluster master)"
+
+option build_delay_worker : real = 0.5
+ -- "delay build process main loop (cluster worker)"
+
+option build_cluster_expire : int = 50
+ -- "enforce database synchronization after given number of delay loops"
option build_cluster_root : string = "$USER_HOME/.isabelle/build_cluster"
-- "root directory for remote build cluster sessions"
--- a/src/Pure/Build/build_benchmark.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala Wed Mar 13 23:27:44 2024 +0100
@@ -82,15 +82,13 @@
val local_build_context = build_context.copy(store = Store(local_options))
- val build =
+ val result =
Build_Job.start_session(local_build_context, session, progress, new Logger, server,
- background, session.sources_shasum, input_shasum, node_info, false)
+ background, session.sources_shasum, input_shasum, node_info, false).join
val timing =
- build.join match {
- case Some(result) if result.process_result.ok => result.process_result.timing
- case _ => error("Failed to build benchmark session")
- }
+ if (result.process_result.ok) result.process_result.timing
+ else error("Failed to build benchmark session")
val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms)
progress.echo(
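
With join now yielding a Build_Job.Result directly, the benchmark no longer pattern-matches on Option. A self-contained sketch of the rewritten control flow; Process_Result, Timing and error are simplified stand-ins for the Isabelle/Scala originals:

  object Benchmark_Result_Sketch {
    final case class Timing(elapsed_ms: Long)
    final case class Process_Result(ok: Boolean, timing: Timing)
    final case class Result(process_result: Process_Result)

    def error(msg: String): Nothing = sys.error(msg)

    // mirrors the rewritten logic in build_benchmark.scala above
    def benchmark_timing(result: Result): Timing =
      if (result.process_result.ok) result.process_result.timing
      else error("Failed to build benchmark session")

    def main(args: Array[String]): Unit = {
      val timing = benchmark_timing(Result(Process_Result(ok = true, Timing(2500))))
      // same formula as the score above: Time.seconds(1000).ms is 1000000 ms
      val score = 1000000.0 / (1 + timing.elapsed_ms)
      println(f"score = $score%.2f")
    }
  }
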
--- a/src/Pure/Build/build_job.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_job.scala Wed Mar 13 23:27:44 2024 +0100
@@ -13,11 +13,12 @@
trait Build_Job {
def cancel(): Unit = ()
def is_finished: Boolean = false
- def join: Option[Build_Job.Result] = None
+ def join: Build_Job.Result = Build_Job.no_result
}
object Build_Job {
sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum)
+ val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum)
/* build session */
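
A note on the trait change above: join no longer returns Option[Result]; the sentinel no_result (undefined process result, empty shasum) stands for "nothing was built". A compilable sketch of the pattern, with Process_Result and SHA1.Shasum reduced to stand-ins and an arbitrary nonzero return code playing the role of Process_Result.undefined:

  object No_Result_Sketch {
    final case class Shasum(digest: String)
    final case class Process_Result(rc: Int) { def ok: Boolean = rc == 0 }
    final case class Result(process_result: Process_Result, output_shasum: Shasum)

    // sentinel: callers always receive a Result, never an Option
    val no_result: Result = Result(Process_Result(rc = 127), Shasum(""))

    trait Build_Job {
      def cancel(): Unit = ()
      def is_finished: Boolean = false
      def join: Result = no_result
    }

    def main(args: Array[String]): Unit = {
      val dummy = new Build_Job {}
      println(dummy.join == no_result)        // true: the sentinel marks an absent build
      println(dummy.join.process_result.ok)   // false: an undefined result never counts as ok
    }
  }
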
@@ -114,7 +115,7 @@
) extends Build_Job {
def session_name: String = session_background.session_name
- private val future_result: Future[Option[Result]] =
+ private val future_result: Future[Result] =
Future.thread("build", uninterruptible = true) {
val info = session_background.sessions_structure(session_name)
val options = Host.node_options(info.options, node_info)
@@ -508,15 +509,10 @@
process_result.rc,
build_context.build_uuid))
- val valid =
- if (progress.stopped_local) false
- else {
- database_server match {
- case Some(db) => write_info(db)
- case None => using(store.open_database(session_name, output = true))(write_info)
- }
- true
- }
+ database_server match {
+ case Some(db) => write_info(db)
+ case None => using(store.open_database(session_name, output = true))(write_info)
+ }
using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
heaps_database =>
@@ -554,13 +550,12 @@
}
}
- if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum))
- else None
+ Result(process_result.copy(out_lines = log_lines), output_shasum)
}
}
override def cancel(): Unit = future_result.cancel()
override def is_finished: Boolean = future_result.is_finished
- override def join: Option[Result] = future_result.join
+ override def join: Result = future_result.join
}
}
--- a/src/Pure/Build/build_process.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_process.scala Wed Mar 13 23:27:44 2024 +0100
@@ -65,11 +65,7 @@
node_info: Host.Node_Info,
start_date: Date,
build: Option[Build_Job]
- ) extends Library.Named {
- def cancel(): Unit = build.foreach(_.cancel())
- def is_finished: Boolean = build.isDefined && build.get.is_finished
- def join_build: Option[Build_Job.Result] = build.flatMap(_.join)
- }
+ ) extends Library.Named
sealed case class Result(
name: String,
@@ -233,7 +229,9 @@
def next_serial: Long = State.inc_serial(serial)
def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
- def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name))
+ def next_ready: List[Task] = ready.filter(task => !is_running(task.name))
+ def exists_ready: Boolean =
+ pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name))
def remove_pending(a: String): State =
copy(pending =
@@ -250,8 +248,14 @@
def is_running(name: String): Boolean = running.isDefinedAt(name)
- def build_running: List[Job] =
- List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
+ def build_running: List[Build_Job] =
+ running.valuesIterator.flatMap(_.build).toList
+
+ def finished_running(): Boolean =
+ build_running.exists(_.is_finished)
+
+ def busy_running(jobs: Int): Boolean =
+ jobs <= 0 || jobs <= build_running.length
def add_running(job: Job): State =
copy(running = running + (job.name -> job))
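
The new State helpers answer the two questions the reworked main loop asks: has some running build finished, and is the local job capacity exhausted. A minimal runnable sketch of their semantics, with Job and Build_Job reduced to the fields used here:

  object Running_State_Sketch {
    final case class Build_Job(is_finished: Boolean)
    final case class Job(name: String, build: Option[Build_Job])

    final case class State(running: Map[String, Job]) {
      def build_running: List[Build_Job] = running.valuesIterator.flatMap(_.build).toList
      def finished_running(): Boolean = build_running.exists(_.is_finished)
      // jobs <= 0 means "no local job slots", which counts as busy
      def busy_running(jobs: Int): Boolean = jobs <= 0 || jobs <= build_running.length
    }

    def main(args: Array[String]): Unit = {
      val state = State(Map(
        "HOL" -> Job("HOL", Some(Build_Job(is_finished = false))),
        "ZF" -> Job("ZF", Some(Build_Job(is_finished = true)))))
      println(state.finished_running())   // true: one build is done
      println(state.busy_running(2))      // true: 2 slots, 2 builds running
      println(state.busy_running(4))      // false: capacity left for a reactive start
    }
  }
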
@@ -287,6 +291,9 @@
object private_data extends SQL.Data("isabelle_build") {
val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
+
+ /* tables */
+
override lazy val tables: SQL.Tables =
SQL.Tables(
Updates.table,
@@ -301,6 +308,15 @@
private lazy val build_id_tables =
tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
+
+ /* notifications */
+
+ lazy val channel: String = Base.table.name
+ lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready")
+
+
+ /* generic columns */
+
object Generic {
val build_id = SQL.Column.long("build_id")
val build_uuid = SQL.Column.string("build_uuid")
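
The channel/channel_ready values above carry the new master-to-worker wake-up: the master sends a "ready" notification when tasks become ready, and workers listening on the channel may cut their sleep short. The sketch below keeps the listen/send/receive shape used in this changeset, but replaces the database LISTEN/NOTIFY machinery with an in-memory queue to stay self-contained, and the channel name is a placeholder for Base.table.name:

  object Channel_Ready_Sketch {
    import scala.collection.mutable

    final case class Notification(channel: String, payload: String = "")

    val channel: String = "isabelle_build_base"   // placeholder for Base.table.name
    val channel_ready: Notification = Notification(channel, payload = "ready")

    // in-memory stand-in for the database notification queue
    private val queue = mutable.Queue[Notification]()
    def send(n: Notification): Unit = queue.enqueue(n)
    def receive(filter: Notification => Boolean): List[Notification] =
      queue.dequeueAll(filter).toList

    def main(args: Array[String]): Unit = {
      send(channel_ready)                             // master: new tasks became ready
      val received = receive(_.channel == channel)    // worker: poll its channel
      println(received.contains(channel_ready))       // true: worker may react before its delay expires
    }
  }
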
@@ -940,6 +956,7 @@
build_start: Date
): Long =
private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+ db.listen(private_data.channel)
val build_uuid = build_context.build_uuid
val build_id = private_data.get_build_id(db, build_uuid)
if (build_context.master) {
@@ -1010,14 +1027,27 @@
}
catch { case exn: Throwable => close(); throw exn }
- protected val build_delay: Time = {
- val option =
- _build_database match {
- case Some(db) if db.is_postgresql => "build_cluster_delay"
- case _ => "build_delay"
- }
- build_options.seconds(option)
- }
+ protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] =
+ _build_database.flatMap(_.receive(filter)).getOrElse(Nil)
+
+ protected def build_send(notification: SQL.Notification): Unit =
+ _build_database.foreach(_.send(notification))
+
+ protected def build_cluster: Boolean =
+ _build_database match {
+ case Some(db) => db.is_postgresql
+ case None => false
+ }
+
+ protected val build_delay: Time =
+ build_options.seconds(
+ if (!build_cluster) "build_delay"
+ else if (build_context.master) "build_delay_master"
+ else "build_delay_worker")
+
+ protected val build_expire: Int =
+ if (!build_cluster || build_context.master) 1
+ else build_options.int("build_cluster_expire").max(1)
protected val _host_database: SQL.Database =
try { store.open_build_database(path = Host.private_data.database, server = server) }
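
The delay and expiry selection above depends on the process role: plain local builds and the cluster master keep a one-loop budget (expire = 1); only cluster workers use build_cluster_expire. A compilable sketch of that selection, with Options reduced to plain maps holding the defaults from etc/options:

  object Delay_Selection_Sketch {
    final case class Options(real: Map[String, Double], int: Map[String, Int])
    val defaults: Options = Options(
      real = Map("build_delay" -> 0.2, "build_delay_master" -> 1.0, "build_delay_worker" -> 0.5),
      int = Map("build_cluster_expire" -> 50))

    def build_delay(build_cluster: Boolean, master: Boolean): Double =
      defaults.real(
        if (!build_cluster) "build_delay"
        else if (master) "build_delay_master"
        else "build_delay_worker")

    def build_expire(build_cluster: Boolean, master: Boolean): Int =
      if (!build_cluster || master) 1
      else defaults.int("build_cluster_expire").max(1)

    def main(args: Array[String]): Unit = {
      println(build_delay(build_cluster = true, master = false))    // 0.5
      println(build_expire(build_cluster = true, master = false))   // 50
      println(build_expire(build_cluster = false, master = true))   // 1
    }
  }
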
@@ -1031,11 +1061,11 @@
val progress =
new Database_Progress(db, build_progress,
input_messages = build_context.master,
- output_stopped = build_context.master,
hostname = hostname,
context_uuid = build_uuid,
kind = "build_process",
- timeout = Some(build_delay))
+ timeout = Some(build_delay),
+ tick_expire = build_expire)
(progress, progress.agent_uuid)
}
catch { case exn: Throwable => close(); throw exn }
@@ -1166,13 +1196,10 @@
make_result(result_name, Process_Result.error, output_shasum)
}
else if (cancelled) {
- if (build_context.master) {
- progress.echo(session_name + " CANCELLED")
- state
- .remove_pending(session_name)
- .make_result(result_name, Process_Result.undefined, output_shasum)
- }
- else state
+ progress.echo(session_name + " CANCELLED")
+ state
+ .remove_pending(session_name)
+ .make_result(result_name, Process_Result.undefined, output_shasum)
}
else {
val build_log_verbose = build_options.bool("build_log_verbose")
@@ -1247,8 +1274,24 @@
else _state.pending.isEmpty
}
- protected def sleep(): Unit =
- Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
+ private var _build_tick: Long = 0L
+
+ protected def build_action(): Boolean =
+ Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+ val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+ val ready = received.contains(Build_Process.private_data.channel_ready)
+ val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) }
+
+ val finished = synchronized { _state.finished_running() }
+
+ def sleep: Boolean = {
+ build_delay.sleep()
+ val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+ expired || reactive || progress.stopped
+ }
+
+ finished || sleep
+ }
protected def init_unsynchronized(): Unit = {
if (build_context.master) {
@@ -1268,17 +1311,16 @@
}
protected def main_unsynchronized(): Unit = {
- for (job <- _state.build_running.filter(_.is_finished)) {
- _state = _state.remove_running(job.name)
- for (result <- job.join_build) {
- val result_name = (job.name, worker_uuid, build_uuid)
- _state = _state.
- remove_pending(job.name).
- make_result(result_name,
- result.process_result,
- result.output_shasum,
- node_info = job.node_info)
- }
+ for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) {
+ val result = build.join
+ val result_name = (job.name, worker_uuid, build_uuid)
+ _state = _state.
+ remove_pending(job.name).
+ remove_running(job.name).
+ make_result(result_name,
+ result.process_result,
+ result.output_shasum,
+ node_info = job.node_info)
}
for (name <- next_jobs(_state)) {
@@ -1316,8 +1358,11 @@
synchronized_database("Build_Process.main") {
if (progress.stopped) _state.build_running.foreach(_.cancel())
main_unsynchronized()
+ if (build_context.master && _state.exists_ready) {
+ build_send(Build_Process.private_data.channel_ready)
+ }
}
- sleep()
+    while (!build_action()) {}
}
}
finally {
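
A note on the loop change above: the fixed sleep() is replaced by build_action(), which returns true as soon as the loop should run again, i.e. when some running build has finished, or after a sleep in which the tick counter expired, a "ready" notification arrived while capacity was free, or a stop was requested. A reduced, runnable sketch of that structure; Thread.sleep and plain booleans stand in for build_delay, the notification check and progress.stopped:

  object Build_Action_Sketch {
    @volatile var finished_running = false   // some running build has finished
    @volatile var reactive = false           // "ready" notification arrived and capacity is free
    private var tick = 0L
    val expire = 3                           // small value for demonstration (build_cluster_expire in the changeset)

    def build_action(): Boolean = {
      val finished = finished_running
      def sleep: Boolean = {
        Thread.sleep(10)                     // stands in for build_delay.sleep()
        tick += 1
        val expired = tick % expire == 0
        expired || reactive                  // the real code also checks progress.stopped
      }
      finished || sleep
    }

    def main(args: Array[String]): Unit = {
      var sleeps = 0
      while (!build_action()) { sleeps += 1 }
      println(s"woke up after ${sleeps + 1} sleeps (forced by the expire counter)")
    }
  }
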
--- a/src/Pure/Build/database_progress.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/database_progress.scala Wed Mar 13 23:27:44 2024 +0100
@@ -151,7 +151,6 @@
db: SQL.Database,
base_progress: Progress,
input_messages: Boolean = false,
- output_stopped: Boolean = false,
kind: String = "progress",
hostname: String = Isabelle_System.hostname(),
context_uuid: String = UUID.random_string(),
@@ -171,7 +170,6 @@
private var _agent_uuid: String = ""
private var _context: Long = -1
private var _serial: Long = 0
- private var _stopped_db: Boolean = false
private var _consumer: Consumer_Thread[Progress.Output] = null
def agent_uuid: String = synchronized { _agent_uuid }
@@ -218,7 +216,7 @@
val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
val ok =
- bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+ bulk_output.nonEmpty || expired || base_progress.stopped ||
received.isEmpty ||
received.get.contains(Database_Progress.private_data.channel_ping) ||
input_messages && received.get.contains(Database_Progress.private_data.channel_output)
@@ -249,7 +247,8 @@
}
_consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
- bulk = _ => true, timeout = timeout,
+ bulk = _ => true,
+ timeout = timeout,
consume = { bulk_output =>
val results =
if (bulk_output.isEmpty) consume(Nil)
@@ -279,10 +278,10 @@
private def sync_database[A](body: => A): A = synchronized {
Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
- _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
+ val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
- if (_stopped_db && !base_progress.stopped) base_progress.stop()
- if (!_stopped_db && base_progress.stopped && output_stopped) {
+ if (stopped_db && !base_progress.stopped) base_progress.stop()
+ if (!stopped_db && base_progress.stopped) {
Database_Progress.private_data.write_progress_stopped(db, _context, true)
db.send(Database_Progress.private_data.channel_ping)
}
@@ -319,7 +318,6 @@
override def stop(): Unit = sync_context { base_progress.stop(); sync() }
override def stopped: Boolean = sync_context { base_progress.stopped }
- override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
override def toString: String = super.toString + ": database " + db
--- a/src/Pure/General/sql.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/General/sql.scala Wed Mar 13 23:27:44 2024 +0100
@@ -406,7 +406,7 @@
/* notifications: IPC via database server */
sealed case class Notification(channel: String, payload: String = "") {
- override def toString =
+ override def toString: String =
"Notification(" + channel + if_proper(payload, "," + payload) + ")"
}
--- a/src/Pure/System/progress.scala Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/System/progress.scala Wed Mar 13 23:27:44 2024 +0100
@@ -88,7 +88,6 @@
if (Thread.interrupted()) is_stopped = true
is_stopped
}
- def stopped_local: Boolean = false
final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()