database performance tuning: prefer light-weight IPC over heavy-duty transactions;
--- a/src/Pure/System/progress.scala Sun Feb 25 20:13:08 2024 +0100
+++ b/src/Pure/System/progress.scala Sun Feb 25 20:40:21 2024 +0100
@@ -97,6 +97,10 @@
val table = make_table(List(context, serial, kind, text, verbose), name = "messages")
}
+ val channel: String = Base.table.name
+ val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping")
+ val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output")
+
def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
db.execute_query_statementO(
Base.table.select(List(Base.context),
@@ -275,7 +279,8 @@
kind: String = "progress",
hostname: String = Isabelle_System.hostname(),
context_uuid: String = UUID.random_string(),
- timeout: Option[Time] = None)
+ timeout: Option[Time] = None,
+ tick_expire: Int = 50)
extends Progress {
database_progress =>
@@ -286,6 +291,7 @@
error("Bad Database_Progress.context_uuid: " + quote(context_uuid))
}
+ private var _tick: Long = 0
private var _agent_uuid: String = ""
private var _context: Long = -1
private var _serial: Long = 0
@@ -295,6 +301,7 @@
def agent_uuid: String = synchronized { _agent_uuid }
private def init(): Unit = synchronized {
+ db.listen(Progress.private_data.channel)
Progress.private_data.transaction_lock(db, create = true, label = "Database_Progress.init") {
Progress.private_data.read_progress_context(db, context_uuid) match {
case Some(context) =>
@@ -328,24 +335,38 @@
}
if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
- def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = sync_database {
- if (bulk_output.nonEmpty) {
- for (out <- bulk_output) {
- out match {
- case message: Progress.Message =>
- if (do_output(message)) base_progress.output(message)
- case theory: Progress.Theory => base_progress.theory(theory)
+ def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = {
+ val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
+ val received = db.receive(n => n.channel == Progress.private_data.channel)
+ val ok =
+ bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+ received.isEmpty ||
+ received.get.contains(Progress.private_data.channel_ping) ||
+ input_messages && received.get.contains(Progress.private_data.channel_output)
+ if (ok) {
+ sync_database {
+ if (bulk_output.nonEmpty) {
+ for (out <- bulk_output) {
+ out match {
+ case message: Progress.Message =>
+ if (do_output(message)) base_progress.output(message)
+ case theory: Progress.Theory => base_progress.theory(theory)
+ }
+ }
+
+ val messages =
+ for ((out, i) <- bulk_output.zipWithIndex)
+ yield (_serial + i + 1) -> out.message
+
+ Progress.private_data.write_messages(db, _context, messages)
+ _serial = messages.last._1
+
+ db.send(Progress.private_data.channel_output)
}
+ bulk_output.map(_ => Exn.Res(()))
}
-
- val messages =
- for ((out, i) <- bulk_output.zipWithIndex)
- yield (_serial + i + 1) -> out.message
-
- Progress.private_data.write_messages(db, _context, messages)
- _serial = messages.last._1
}
- bulk_output.map(_ => Exn.Res(()))
+ else Nil
}
_consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
@@ -384,6 +405,7 @@
if (_stopped_db && !base_progress.stopped) base_progress.stop()
if (!_stopped_db && base_progress.stopped && output_stopped) {
Progress.private_data.write_progress_stopped(db, _context, true)
+ db.send(Progress.private_data.channel_ping)
}
val serial0 = _serial