--- a/src/Pure/System/progress.scala Thu Aug 17 16:15:25 2023 +0200
+++ b/src/Pure/System/progress.scala Thu Aug 17 19:01:40 2023 +0200
@@ -139,12 +139,12 @@
})
}
- def next_messages_serial(db: SQL.Database, context: Long): Long =
+ def read_messages_serial(db: SQL.Database, context: Long): Long =
db.execute_query_statementO(
Messages.table.select(
List(Messages.serial.max), sql = Base.context.where_equal(context)),
_.long(Messages.serial)
- ).getOrElse(0L) + 1L
+ ).getOrElse(0L)
def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
db.execute_query_statement(
@@ -268,7 +268,8 @@
output_stopped: Boolean = false,
kind: String = "progress",
hostname: String = Isabelle_System.hostname(),
- context_uuid: String = UUID.random().toString)
+ context_uuid: String = UUID.random().toString,
+ timeout: Option[Time] = None)
extends Progress {
database_progress =>
@@ -276,6 +277,7 @@
private var _context: Long = -1
private var _serial: Long = 0
private var _stopped_db: Boolean = false
+ private var _consumer: Consumer_Thread[Progress.Output] = null
def agent_uuid: String = synchronized { _agent_uuid }
@@ -313,10 +315,37 @@
})
}
if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
+
+ _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
+ bulk = _ => true, timeout = timeout,
+ consume = { bulk_output =>
+ sync_database {
+ _serial = _serial max Progress.private_data.read_messages_serial(db, _context)
+ val serial0 = _serial
+
+ for (out <- bulk_output) {
+ _serial += 1L
+ Progress.private_data.write_messages(db, _context, _serial, out.message)
+
+ out match {
+ case message: Progress.Message =>
+ if (do_output(message)) base_progress.output(message)
+ case theory: Progress.Theory => base_progress.theory(theory)
+ }
+ }
+
+ if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial)
+
+ (bulk_output.map(_ => Exn.Res(())), true)
+ }
+ })
}
def exit(close: Boolean = false): Unit = synchronized {
if (_context > 0) {
+ _consumer.shutdown()
+ _consumer = null
+
Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") {
Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true)
}
@@ -325,10 +354,14 @@
if (close) db.close()
}
- private def sync_database[A](body: => A): A = synchronized {
+ private def sync_context[A](body: => A): A = synchronized {
if (_context < 0) throw new IllegalStateException("Database_Progress before init")
if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
+ body
+ }
+
+ private def sync_database[A](body: => A): A = {
Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") {
_stopped_db = Progress.private_data.read_progress_stopped(db, _context)
@@ -348,32 +381,17 @@
}
}
- def sync(): Unit = sync_database {}
-
- private def output_database(out: Progress.Output): Unit =
- sync_database {
- _serial = _serial max Progress.private_data.next_messages_serial(db, _context)
-
- Progress.private_data.write_messages(db, _context, _serial, out.message)
+ private def sync(): Unit = sync_database {}
- out match {
- case message: Progress.Message =>
- if (do_output(message)) base_progress.output(message)
- case theory: Progress.Theory => base_progress.theory(theory)
- }
-
- Progress.private_data.update_agent(db, _agent_uuid, _serial)
- }
-
- override def output(message: Progress.Message): Unit = output_database(message)
- override def theory(theory: Progress.Theory): Unit = output_database(theory)
+ override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) }
+ override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) }
override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
base_progress.nodes_status(nodes_status)
override def verbose: Boolean = base_progress.verbose
- override def stop(): Unit = synchronized { base_progress.stop(); sync() }
+ override def stop(): Unit = sync_context { base_progress.stop(); sync() }
override def stopped: Boolean = sync_database { base_progress.stopped }
override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db }
--- a/src/Pure/Tools/build_process.scala Thu Aug 17 16:15:25 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Thu Aug 17 19:01:40 2023 +0200
@@ -885,7 +885,8 @@
output_stopped = build_context.master,
hostname = hostname,
context_uuid = build_uuid,
- kind = "build_process")
+ kind = "build_process",
+ timeout = Some(build_delay))
(progress, progress.agent_uuid)
}
catch { case exn: Throwable => close(); throw exn }
@@ -929,7 +930,6 @@
_build_database match {
case None => body
case Some(db) =>
- progress.asInstanceOf[Database_Progress].sync()
Build_Process.private_data.transaction_lock(db, label = label) {
_state = Build_Process.private_data.pull_database(db, worker_uuid, hostname, _state)
val res = body