database performance tuning: prefer light-weight IPC over heavy-duty transactions, following bf377e10ff3b;
--- a/etc/options Wed Mar 13 16:14:23 2024 +0100
+++ b/etc/options Wed Mar 13 17:36:35 2024 +0100
@@ -204,8 +204,11 @@
option build_delay : real = 0.2
-- "delay build process main loop (local)"
-option build_cluster_delay : real = 1.0
- -- "delay build process main loop (cluster)"
+option build_delay_master : real = 1.0
+ -- "delay build process main loop (cluster master)"
+
+option build_delay_worker : real = 0.5
+ -- "delay build process main loop (cluster worker)"
option build_cluster_expire : int = 50
-- "enforce database synchronization after given number of delay loops"
--- a/src/Pure/Build/build_process.scala Wed Mar 13 16:14:23 2024 +0100
+++ b/src/Pure/Build/build_process.scala Wed Mar 13 17:36:35 2024 +0100
@@ -234,6 +234,8 @@
def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
def next_ready: List[Task] = ready.filter(task => !is_running(task.name))
+ def exists_ready: Boolean =
+ pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name))
def remove_pending(a: String): State =
copy(pending =
@@ -250,6 +252,11 @@
def is_running(name: String): Boolean = running.isDefinedAt(name)
+ def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished)
+
+ def busy_running(jobs: Int): Boolean =
+ jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length
+
def build_running: List[Job] =
List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
@@ -305,6 +312,12 @@
tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
+ /* notifications */
+
+ lazy val channel: String = Base.table.name
+ lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready")
+
+
/* generic columns */
object Generic {
@@ -946,6 +959,7 @@
build_start: Date
): Long =
private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+ db.listen(private_data.channel)
val build_uuid = build_context.build_uuid
val build_id = private_data.get_build_id(db, build_uuid)
if (build_context.master) {
@@ -1016,16 +1030,27 @@
}
catch { case exn: Throwable => close(); throw exn }
- protected val build_delay: Time = {
- val option =
- _build_database match {
- case Some(db) if db.is_postgresql => "build_cluster_delay"
- case _ => "build_delay"
- }
- build_options.seconds(option)
- }
+ protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] =
+ _build_database.flatMap(_.receive(filter)).getOrElse(Nil)
+
+ protected def build_send(notification: SQL.Notification): Unit =
+ _build_database.foreach(_.send(notification))
- protected val build_expire: Int = build_options.int("build_cluster_expire")
+ protected def build_cluster: Boolean =
+ _build_database match {
+ case Some(db) => db.is_postgresql
+ case None => false
+ }
+
+ protected val build_delay: Time =
+ build_options.seconds(
+ if (!build_cluster) "build_delay"
+ else if (build_context.master) "build_delay_master"
+ else "build_delay_worker")
+
+ protected val build_expire: Int =
+ if (!build_cluster || build_context.master) 1
+ else build_options.int("build_cluster_expire").max(1)
protected val _host_database: SQL.Database =
try { store.open_build_database(path = Host.private_data.database, server = server) }
@@ -1256,8 +1281,24 @@
else _state.pending.isEmpty
}
- protected def sleep(): Unit =
- Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
+ private var _build_tick: Long = 0L
+
+ protected def build_action(): Boolean =
+ Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+ val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+ val ready = received.contains(Build_Process.private_data.channel_ready)
+ val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) }
+
+ val finished = synchronized { _state.finished_running() }
+
+ def sleep: Boolean = {
+ build_delay.sleep()
+ val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+ expired || reactive || progress.stopped
+ }
+
+ finished || sleep
+ }
protected def init_unsynchronized(): Unit = {
if (build_context.master) {
@@ -1325,8 +1366,11 @@
synchronized_database("Build_Process.main") {
if (progress.stopped) _state.build_running.foreach(_.cancel())
main_unsynchronized()
+ if (build_context.master && _state.exists_ready) {
+ build_send(Build_Process.private_data.channel_ready)
+ }
}
- sleep()
+ while(!build_action()) {}
}
}
finally {