more scalable Database_Progress via asynchronous Consumer_Thread.fork_bulk;
authorwenzelm
Thu, 17 Aug 2023 19:01:40 +0200
changeset 78535 af37e1b4dce0
parent 78534 879e1ba3868b
child 78536 555bdac7c279
more scalable Database_Progress via asynchronous Consumer_Thread.fork_bulk;
src/Pure/System/progress.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/System/progress.scala	Thu Aug 17 16:15:25 2023 +0200
+++ b/src/Pure/System/progress.scala	Thu Aug 17 19:01:40 2023 +0200
@@ -139,12 +139,12 @@
         })
     }
 
-    def next_messages_serial(db: SQL.Database, context: Long): Long =
+    def read_messages_serial(db: SQL.Database, context: Long): Long =
       db.execute_query_statementO(
         Messages.table.select(
           List(Messages.serial.max), sql = Base.context.where_equal(context)),
         _.long(Messages.serial)
-      ).getOrElse(0L) + 1L
+      ).getOrElse(0L)
 
     def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
       db.execute_query_statement(
@@ -268,7 +268,8 @@
   output_stopped: Boolean = false,
   kind: String = "progress",
   hostname: String = Isabelle_System.hostname(),
-  context_uuid: String = UUID.random().toString)
+  context_uuid: String = UUID.random().toString,
+  timeout: Option[Time] = None)
 extends Progress {
   database_progress =>
 
@@ -276,6 +277,7 @@
   private var _context: Long = -1
   private var _serial: Long = 0
   private var _stopped_db: Boolean = false
+  private var _consumer: Consumer_Thread[Progress.Output] = null
 
   def agent_uuid: String = synchronized { _agent_uuid }
 
@@ -313,10 +315,37 @@
       })
     }
     if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
+
+    _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
+      bulk = _ => true, timeout = timeout,
+      consume = { bulk_output =>
+        sync_database {
+          _serial = _serial max Progress.private_data.read_messages_serial(db, _context)
+          val serial0 = _serial
+
+          for (out <- bulk_output) {
+            _serial += 1L
+            Progress.private_data.write_messages(db, _context, _serial, out.message)
+
+            out match {
+              case message: Progress.Message =>
+                if (do_output(message)) base_progress.output(message)
+              case theory: Progress.Theory => base_progress.theory(theory)
+            }
+          }
+
+          if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial)
+
+          (bulk_output.map(_ => Exn.Res(())), true)
+        }
+      })
   }
 
   def exit(close: Boolean = false): Unit = synchronized {
     if (_context > 0) {
+      _consumer.shutdown()
+      _consumer = null
+
       Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") {
         Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true)
       }
@@ -325,10 +354,14 @@
     if (close) db.close()
   }
 
-  private def sync_database[A](body: => A): A = synchronized {
+  private def sync_context[A](body: => A): A = synchronized {
     if (_context < 0) throw new IllegalStateException("Database_Progress before init")
     if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
 
+    body
+  }
+
+  private def sync_database[A](body: => A): A = {
     Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") {
       _stopped_db = Progress.private_data.read_progress_stopped(db, _context)
 
@@ -348,32 +381,17 @@
     }
   }
 
-  def sync(): Unit = sync_database {}
-
-  private def output_database(out: Progress.Output): Unit =
-    sync_database {
-      _serial = _serial max Progress.private_data.next_messages_serial(db, _context)
-
-      Progress.private_data.write_messages(db, _context, _serial, out.message)
+  private def sync(): Unit = sync_database {}
 
-      out match {
-        case message: Progress.Message =>
-          if (do_output(message)) base_progress.output(message)
-        case theory: Progress.Theory => base_progress.theory(theory)
-      }
-
-      Progress.private_data.update_agent(db, _agent_uuid, _serial)
-    }
-
-  override def output(message: Progress.Message): Unit = output_database(message)
-  override def theory(theory: Progress.Theory): Unit = output_database(theory)
+  override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) }
+  override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) }
 
   override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
     base_progress.nodes_status(nodes_status)
 
   override def verbose: Boolean = base_progress.verbose
 
-  override def stop(): Unit = synchronized { base_progress.stop(); sync() }
+  override def stop(): Unit = sync_context { base_progress.stop(); sync() }
   override def stopped: Boolean = sync_database { base_progress.stopped }
   override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db }
 
--- a/src/Pure/Tools/build_process.scala	Thu Aug 17 16:15:25 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Thu Aug 17 19:01:40 2023 +0200
@@ -885,7 +885,8 @@
               output_stopped = build_context.master,
               hostname = hostname,
               context_uuid = build_uuid,
-              kind = "build_process")
+              kind = "build_process",
+              timeout = Some(build_delay))
           (progress, progress.agent_uuid)
         }
         catch { case exn: Throwable => close(); throw exn }
@@ -929,7 +930,6 @@
       _build_database match {
         case None => body
         case Some(db) =>
-          progress.asInstanceOf[Database_Progress].sync()
           Build_Process.private_data.transaction_lock(db, label = label) {
             _state = Build_Process.private_data.pull_database(db, worker_uuid, hostname, _state)
             val res = body