database performance tuning: prefer light-weight IPC over heavy-duty transactions, following bf377e10ff3b;
authorwenzelm
Wed, 13 Mar 2024 17:36:35 +0100
changeset 79886 7ae25372ab04
parent 79885 70d4dcede0dc
child 79887 17220dc05991
database performance tuning: prefer light-weight IPC over heavy-duty transactions, following bf377e10ff3b;
etc/options
src/Pure/Build/build_process.scala
--- a/etc/options	Wed Mar 13 16:14:23 2024 +0100
+++ b/etc/options	Wed Mar 13 17:36:35 2024 +0100
@@ -204,8 +204,11 @@
 option build_delay : real = 0.2
   -- "delay build process main loop (local)"
 
-option build_cluster_delay : real = 1.0
-  -- "delay build process main loop (cluster)"
+option build_delay_master : real = 1.0
+  -- "delay build process main loop (cluster master)"
+
+option build_delay_worker : real = 0.5
+  -- "delay build process main loop (cluster worker)"
 
 option build_cluster_expire : int = 50
   -- "enforce database synchronization after given number of delay loops"
--- a/src/Pure/Build/build_process.scala	Wed Mar 13 16:14:23 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Wed Mar 13 17:36:35 2024 +0100
@@ -234,6 +234,8 @@
 
     def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
     def next_ready: List[Task] = ready.filter(task => !is_running(task.name))
+    def exists_ready: Boolean =
+      pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name))
 
     def remove_pending(a: String): State =
       copy(pending =
@@ -250,6 +252,11 @@
 
     def is_running(name: String): Boolean = running.isDefinedAt(name)
 
+    def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished)
+
+    def busy_running(jobs: Int): Boolean =
+      jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length
+
     def build_running: List[Job] =
       List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
 
@@ -305,6 +312,12 @@
       tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
 
 
+    /* notifications */
+
+    lazy val channel: String = Base.table.name
+    lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready")
+
+
     /* generic columns */
 
     object Generic {
@@ -946,6 +959,7 @@
     build_start: Date
   ): Long =
     private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+      db.listen(private_data.channel)
       val build_uuid = build_context.build_uuid
       val build_id = private_data.get_build_id(db, build_uuid)
       if (build_context.master) {
@@ -1016,16 +1030,27 @@
     }
     catch { case exn: Throwable => close(); throw exn }
 
-  protected val build_delay: Time = {
-    val option =
-      _build_database match {
-        case Some(db) if db.is_postgresql => "build_cluster_delay"
-        case _ => "build_delay"
-      }
-    build_options.seconds(option)
-  }
+  protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] =
+    _build_database.flatMap(_.receive(filter)).getOrElse(Nil)
+
+  protected def build_send(notification: SQL.Notification): Unit =
+    _build_database.foreach(_.send(notification))
 
-  protected val build_expire: Int = build_options.int("build_cluster_expire")
+  protected def build_cluster: Boolean =
+    _build_database match {
+      case Some(db) => db.is_postgresql
+      case None => false
+    }
+
+  protected val build_delay: Time =
+    build_options.seconds(
+      if (!build_cluster) "build_delay"
+      else if (build_context.master) "build_delay_master"
+      else "build_delay_worker")
+
+  protected val build_expire: Int =
+    if (!build_cluster || build_context.master) 1
+    else build_options.int("build_cluster_expire").max(1)
 
   protected val _host_database: SQL.Database =
     try { store.open_build_database(path = Host.private_data.database, server = server) }
@@ -1256,8 +1281,24 @@
     else _state.pending.isEmpty
   }
 
-  protected def sleep(): Unit =
-    Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
+  private var _build_tick: Long = 0L
+
+  protected def build_action(): Boolean =
+    Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+      val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+      val ready = received.contains(Build_Process.private_data.channel_ready)
+      val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) }
+
+      val finished = synchronized { _state.finished_running() }
+
+      def sleep: Boolean = {
+        build_delay.sleep()
+        val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+        expired || reactive || progress.stopped
+      }
+
+      finished || sleep
+    }
 
   protected def init_unsynchronized(): Unit = {
     if (build_context.master) {
@@ -1325,8 +1366,11 @@
           synchronized_database("Build_Process.main") {
             if (progress.stopped) _state.build_running.foreach(_.cancel())
             main_unsynchronized()
+            if (build_context.master && _state.exists_ready) {
+              build_send(Build_Process.private_data.channel_ready)
+            }
           }
-          sleep()
+          while(!build_action()) {}
         }
       }
       finally {