database performance tuning: prefer light-weight IPC over heavy-duty transactions;
authorwenzelm
Sun, 25 Feb 2024 20:40:21 +0100
changeset 79729 bf377e10ff3b
parent 79728 df4eb4b05ecd
child 79730 4031aafc2dda
database performance tuning: prefer light-weight IPC over heavy-duty transactions;
src/Pure/System/progress.scala
--- a/src/Pure/System/progress.scala	Sun Feb 25 20:13:08 2024 +0100
+++ b/src/Pure/System/progress.scala	Sun Feb 25 20:40:21 2024 +0100
@@ -97,6 +97,10 @@
       val table = make_table(List(context, serial, kind, text, verbose), name = "messages")
     }
 
+    val channel: String = Base.table.name
+    val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping")
+    val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output")
+
     def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
       db.execute_query_statementO(
         Base.table.select(List(Base.context),
@@ -275,7 +279,8 @@
   kind: String = "progress",
   hostname: String = Isabelle_System.hostname(),
   context_uuid: String = UUID.random_string(),
-  timeout: Option[Time] = None)
+  timeout: Option[Time] = None,
+  tick_expire: Int = 50)
 extends Progress {
   database_progress =>
 
@@ -286,6 +291,7 @@
     error("Bad Database_Progress.context_uuid: " + quote(context_uuid))
   }
 
+  private var _tick: Long = 0
   private var _agent_uuid: String = ""
   private var _context: Long = -1
   private var _serial: Long = 0
@@ -295,6 +301,7 @@
   def agent_uuid: String = synchronized { _agent_uuid }
 
   private def init(): Unit = synchronized {
+    db.listen(Progress.private_data.channel)
     Progress.private_data.transaction_lock(db, create = true, label = "Database_Progress.init") {
       Progress.private_data.read_progress_context(db, context_uuid) match {
         case Some(context) =>
@@ -328,24 +335,38 @@
     }
     if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
 
-    def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = sync_database {
-      if (bulk_output.nonEmpty) {
-        for (out <- bulk_output) {
-          out match {
-            case message: Progress.Message =>
-              if (do_output(message)) base_progress.output(message)
-            case theory: Progress.Theory => base_progress.theory(theory)
+    def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = {
+      val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
+      val received = db.receive(n => n.channel == Progress.private_data.channel)
+      val ok =
+        bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+        received.isEmpty ||
+        received.get.contains(Progress.private_data.channel_ping) ||
+        input_messages && received.get.contains(Progress.private_data.channel_output)
+      if (ok) {
+        sync_database {
+          if (bulk_output.nonEmpty) {
+            for (out <- bulk_output) {
+              out match {
+                case message: Progress.Message =>
+                  if (do_output(message)) base_progress.output(message)
+                case theory: Progress.Theory => base_progress.theory(theory)
+              }
+            }
+
+            val messages =
+              for ((out, i) <- bulk_output.zipWithIndex)
+                yield (_serial + i + 1) -> out.message
+
+            Progress.private_data.write_messages(db, _context, messages)
+            _serial = messages.last._1
+
+            db.send(Progress.private_data.channel_output)
           }
+          bulk_output.map(_ => Exn.Res(()))
         }
-
-        val messages =
-          for ((out, i) <- bulk_output.zipWithIndex)
-            yield (_serial + i + 1) -> out.message
-
-        Progress.private_data.write_messages(db, _context, messages)
-        _serial = messages.last._1
       }
-      bulk_output.map(_ => Exn.Res(()))
+      else Nil
     }
 
     _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
@@ -384,6 +405,7 @@
       if (_stopped_db && !base_progress.stopped) base_progress.stop()
       if (!_stopped_db && base_progress.stopped && output_stopped) {
         Progress.private_data.write_progress_stopped(db, _context, true)
+        db.send(Progress.private_data.channel_ping)
       }
 
       val serial0 = _serial