# HG changeset patch # User wenzelm # Date 1710347795 -3600 # Node ID 7ae25372ab043eaca7b9675273025d4b0a389e36 # Parent 70d4dcede0dc680c69d0810a51629c3b31726693 database performance tuning: prefer light-weight IPC over heavy-duty transactions, following bf377e10ff3b; diff -r 70d4dcede0dc -r 7ae25372ab04 etc/options --- a/etc/options Wed Mar 13 16:14:23 2024 +0100 +++ b/etc/options Wed Mar 13 17:36:35 2024 +0100 @@ -204,8 +204,11 @@ option build_delay : real = 0.2 -- "delay build process main loop (local)" -option build_cluster_delay : real = 1.0 - -- "delay build process main loop (cluster)" +option build_delay_master : real = 1.0 + -- "delay build process main loop (cluster master)" + +option build_delay_worker : real = 0.5 + -- "delay build process main loop (cluster worker)" option build_cluster_expire : int = 50 -- "enforce database synchronization after given number of delay loops" diff -r 70d4dcede0dc -r 7ae25372ab04 src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala Wed Mar 13 16:14:23 2024 +0100 +++ b/src/Pure/Build/build_process.scala Wed Mar 13 17:36:35 2024 +0100 @@ -234,6 +234,8 @@ def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name) def next_ready: List[Task] = ready.filter(task => !is_running(task.name)) + def exists_ready: Boolean = + pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name)) def remove_pending(a: String): State = copy(pending = @@ -250,6 +252,11 @@ def is_running(name: String): Boolean = running.isDefinedAt(name) + def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished) + + def busy_running(jobs: Int): Boolean = + jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length + def build_running: List[Job] = List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job) @@ -305,6 +312,12 @@ tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t)) + /* notifications */ + + lazy val channel: String = Base.table.name + lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready") + + /* generic columns */ object Generic { @@ -946,6 +959,7 @@ build_start: Date ): Long = private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") { + db.listen(private_data.channel) val build_uuid = build_context.build_uuid val build_id = private_data.get_build_id(db, build_uuid) if (build_context.master) { @@ -1016,16 +1030,27 @@ } catch { case exn: Throwable => close(); throw exn } - protected val build_delay: Time = { - val option = - _build_database match { - case Some(db) if db.is_postgresql => "build_cluster_delay" - case _ => "build_delay" - } - build_options.seconds(option) - } + protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] = + _build_database.flatMap(_.receive(filter)).getOrElse(Nil) + + protected def build_send(notification: SQL.Notification): Unit = + _build_database.foreach(_.send(notification)) - protected val build_expire: Int = build_options.int("build_cluster_expire") + protected def build_cluster: Boolean = + _build_database match { + case Some(db) => db.is_postgresql + case None => false + } + + protected val build_delay: Time = + build_options.seconds( + if (!build_cluster) "build_delay" + else if (build_context.master) "build_delay_master" + else "build_delay_worker") + + protected val build_expire: Int = + if (!build_cluster || build_context.master) 1 + else build_options.int("build_cluster_expire").max(1) protected val _host_database: SQL.Database = try { store.open_build_database(path = Host.private_data.database, server = server) } @@ -1256,8 +1281,24 @@ else _state.pending.isEmpty } - protected def sleep(): Unit = - Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() } + private var _build_tick: Long = 0L + + protected def build_action(): Boolean = + Isabelle_Thread.interrupt_handler(_ => progress.stop()) { + val received = build_receive(n => n.channel == Build_Process.private_data.channel) + val ready = received.contains(Build_Process.private_data.channel_ready) + val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) } + + val finished = synchronized { _state.finished_running() } + + def sleep: Boolean = { + build_delay.sleep() + val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 } + expired || reactive || progress.stopped + } + + finished || sleep + } protected def init_unsynchronized(): Unit = { if (build_context.master) { @@ -1325,8 +1366,11 @@ synchronized_database("Build_Process.main") { if (progress.stopped) _state.build_running.foreach(_.cancel()) main_unsynchronized() + if (build_context.master && _state.exists_ready) { + build_send(Build_Process.private_data.channel_ready) + } } - sleep() + while(!build_action()) {} } } finally {