--- a/etc/build.props Tue Mar 12 15:31:44 2024 +0100
+++ b/etc/build.props Tue Mar 12 15:57:25 2024 +0100
@@ -56,6 +56,7 @@
src/Pure/Build/build_job.scala \
src/Pure/Build/build_process.scala \
src/Pure/Build/build_schedule.scala \
+ src/Pure/Build/database_progress.scala \
src/Pure/Build/export.scala \
src/Pure/Build/export_theory.scala \
src/Pure/Build/file_format.scala \
@@ -190,6 +191,7 @@
src/Pure/System/platform.scala \
src/Pure/System/posix_interrupt.scala \
src/Pure/System/process_result.scala \
+ src/Pure/System/program_progress.scala \
src/Pure/System/progress.scala \
src/Pure/System/registry.scala \
src/Pure/System/scala.scala \
--- a/src/Pure/Build/build.scala Tue Mar 12 15:31:44 2024 +0100
+++ b/src/Pure/Build/build.scala Tue Mar 12 15:57:25 2024 +0100
@@ -522,7 +522,7 @@
val tables0 =
ML_Heap.private_data.tables.list :::
Store.private_data.tables.list :::
- Progress.private_data.tables.list :::
+ Database_Progress.private_data.tables.list :::
Build_Process.private_data.tables.list
val tables = tables0.filter(t => db.exists_table(t.name)).sortBy(_.name)
if (tables.nonEmpty) {
--- a/src/Pure/Build/build_process.scala Tue Mar 12 15:31:44 2024 +0100
+++ b/src/Pure/Build/build_process.scala Tue Mar 12 15:57:25 2024 +0100
@@ -1027,7 +1027,7 @@
if (_build_database.isEmpty) (build_progress, UUID.random_string())
else {
try {
- val db = store.open_build_database(Progress.private_data.database, server = server)
+ val db = store.open_build_database(Database_Progress.private_data.database, server = server)
val progress =
new Database_Progress(db, build_progress,
input_messages = build_context.master,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Build/database_progress.scala Tue Mar 12 15:57:25 2024 +0100
@@ -0,0 +1,328 @@
+/* Title: Pure/Build/database_progress.scala
+ Author: Makarius
+
+System progress backed by shared database: local SQLite or remote PostgreSQL.
+*/
+
+package isabelle
+
+
+import scala.collection.immutable.SortedMap
+
+
+object Database_Progress {
+ /* SQL data model */
+
+ object private_data extends SQL.Data("isabelle_progress") {
+ val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db")
+
+ override lazy val tables: SQL.Tables =
+ SQL.Tables(Base.table, Agents.table, Messages.table)
+
+ object Base {
+ val context_uuid = SQL.Column.string("context_uuid").make_primary_key
+ val context = SQL.Column.long("context").make_primary_key
+ val stopped = SQL.Column.bool("stopped")
+
+ val table = make_table(List(context_uuid, context, stopped))
+ }
+
+ object Agents {
+ val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key
+ val context_uuid = SQL.Column.string("context_uuid").make_primary_key
+ val kind = SQL.Column.string("kind")
+ val hostname = SQL.Column.string("hostname")
+ val java_pid = SQL.Column.long("java_pid")
+ val java_start = SQL.Column.date("java_start")
+ val start = SQL.Column.date("start")
+ val stamp = SQL.Column.date("stamp")
+ val stop = SQL.Column.date("stop")
+ val seen = SQL.Column.long("seen")
+
+ val table =
+ make_table(
+ List(agent_uuid, context_uuid, kind, hostname, java_pid, java_start,
+ start, stamp, stop, seen),
+ name = "agents")
+ }
+
+ object Messages {
+ type T = SortedMap[Long, Progress.Message]
+ val empty: T = SortedMap.empty
+
+ val context = SQL.Column.long("context").make_primary_key
+ val serial = SQL.Column.long("serial").make_primary_key
+ val kind = SQL.Column.int("kind")
+ val text = SQL.Column.string("text")
+ val verbose = SQL.Column.bool("verbose")
+
+ val table = make_table(List(context, serial, kind, text, verbose), name = "messages")
+ }
+
+ val channel: String = Base.table.name
+ val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping")
+ val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output")
+
+ def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
+ db.execute_query_statementO(
+ Base.table.select(List(Base.context),
+ sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context))
+
+ def next_progress_context(db: SQL.Database): Long =
+ db.execute_query_statementO(
+ Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L
+
+ def read_progress_stopped(db: SQL.Database, context: Long): Boolean =
+ db.execute_query_statementO(
+ Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)),
+ _.bool(Base.stopped)
+ ).getOrElse(false)
+
+ def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit =
+ db.execute_statement(
+ Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)),
+ body = { stmt => stmt.bool(1) = stopped })
+
+ def update_agent(
+ db: SQL.Database,
+ agent_uuid: String,
+ seen: Long,
+ stop_now: Boolean = false
+ ): Unit = {
+ val sql = Agents.agent_uuid.where_equal(agent_uuid)
+
+ val stop =
+ db.execute_query_statementO(
+ Agents.table.select(List(Agents.stop), sql = sql), _.get_date(Agents.stop)).flatten
+
+ db.execute_statement(
+ Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), sql = sql),
+ body = { stmt =>
+ val now = db.now()
+ stmt.date(1) = now
+ stmt.date(2) = if (stop_now) Some(now) else stop
+ stmt.long(3) = seen
+ })
+ }
+
+ def read_messages_serial(db: SQL.Database, context: Long): Long =
+ db.execute_query_statementO(
+ Messages.table.select(
+ List(Messages.serial.max), sql = Base.context.where_equal(context)),
+ _.long(Messages.serial)
+ ).getOrElse(0L)
+
+ def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
+ db.execute_query_statement(
+ Messages.table.select(
+ List(Messages.serial, Messages.kind, Messages.text, Messages.verbose),
+ sql =
+ SQL.where_and(
+ Messages.context.ident + " = " + context,
+ if (seen <= 0) "" else Messages.serial.ident + " > " + seen)),
+ SortedMap.from[Long, Progress.Message],
+ { res =>
+ val serial = res.long(Messages.serial)
+ val kind = Progress.Kind.fromOrdinal(res.int(Messages.kind))
+ val text = res.string(Messages.text)
+ val verbose = res.bool(Messages.verbose)
+ serial -> Progress.Message(kind, text, verbose = verbose)
+ }
+ )
+
+ def write_messages(
+ db: SQL.Database,
+ context: Long,
+ messages: List[(Long, Progress.Message)]
+ ): Unit = {
+ db.execute_batch_statement(Messages.table.insert(), batch =
+ for ((serial, message) <- messages) yield { (stmt: SQL.Statement) =>
+ stmt.long(1) = context
+ stmt.long(2) = serial
+ stmt.int(3) = message.kind.ordinal
+ stmt.string(4) = message.text
+ stmt.bool(5) = message.verbose
+ })
+ }
+ }
+}
+
+class Database_Progress(
+ db: SQL.Database,
+ base_progress: Progress,
+ input_messages: Boolean = false,
+ output_stopped: Boolean = false,
+ kind: String = "progress",
+ hostname: String = Isabelle_System.hostname(),
+ context_uuid: String = UUID.random_string(),
+ timeout: Option[Time] = None,
+ tick_expire: Int = 50)
+extends Progress {
+ database_progress =>
+
+ override def now(): Date = db.now()
+ override val start: Date = now()
+
+ if (UUID.unapply(context_uuid).isEmpty) {
+ error("Bad Database_Progress.context_uuid: " + quote(context_uuid))
+ }
+
+ private var _tick: Long = 0
+ private var _agent_uuid: String = ""
+ private var _context: Long = -1
+ private var _serial: Long = 0
+ private var _stopped_db: Boolean = false
+ private var _consumer: Consumer_Thread[Progress.Output] = null
+
+ def agent_uuid: String = synchronized { _agent_uuid }
+
+ private def init(): Unit = synchronized {
+ db.listen(Database_Progress.private_data.channel)
+ Database_Progress.private_data.transaction_lock(db,
+ create = true,
+ label = "Database_Progress.init"
+ ) {
+ Database_Progress.private_data.read_progress_context(db, context_uuid) match {
+ case Some(context) =>
+ _context = context
+ _agent_uuid = UUID.random_string()
+ case None =>
+ _context = Database_Progress.private_data.next_progress_context(db)
+ _agent_uuid = context_uuid
+ db.execute_statement(Database_Progress.private_data.Base.table.insert(), { stmt =>
+ stmt.string(1) = context_uuid
+ stmt.long(2) = _context
+ stmt.bool(3) = false
+ })
+ }
+ db.execute_statement(Database_Progress.private_data.Agents.table.insert(), { stmt =>
+ val java = ProcessHandle.current()
+ val java_pid = java.pid
+ val java_start = Date.instant(java.info.startInstant.get)
+
+ stmt.string(1) = _agent_uuid
+ stmt.string(2) = context_uuid
+ stmt.string(3) = kind
+ stmt.string(4) = hostname
+ stmt.long(5) = java_pid
+ stmt.date(6) = java_start
+ stmt.date(7) = start
+ stmt.date(8) = start
+ stmt.date(9) = None
+ stmt.long(10) = 0L
+ })
+ }
+ if (context_uuid == _agent_uuid) db.vacuum(Database_Progress.private_data.tables.list)
+
+ def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = {
+ val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
+ val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
+ val ok =
+ bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+ received.isEmpty ||
+ received.get.contains(Database_Progress.private_data.channel_ping) ||
+ input_messages && received.get.contains(Database_Progress.private_data.channel_output)
+ if (ok) {
+ sync_database {
+ if (bulk_output.nonEmpty) {
+ for (out <- bulk_output) {
+ out match {
+ case message: Progress.Message =>
+ if (do_output(message)) base_progress.output(message)
+ case theory: Progress.Theory => base_progress.theory(theory)
+ }
+ }
+
+ val messages =
+ for ((out, i) <- bulk_output.zipWithIndex)
+ yield (_serial + i + 1) -> out.message
+
+ Database_Progress.private_data.write_messages(db, _context, messages)
+ _serial = messages.last._1
+
+ db.send(Database_Progress.private_data.channel_output)
+ }
+ bulk_output.map(_ => Exn.Res(()))
+ }
+ }
+ else Nil
+ }
+
+ _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
+ bulk = _ => true, timeout = timeout,
+ consume = { bulk_output =>
+ val results =
+ if (bulk_output.isEmpty) consume(Nil)
+ else bulk_output.grouped(200).toList.flatMap(consume)
+ (results, true) })
+ }
+
+ def close(): Unit = synchronized {
+ if (_context > 0) {
+ _consumer.shutdown()
+ _consumer = null
+
+ Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") {
+ Database_Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true)
+ }
+ _context = 0
+ }
+ db.close()
+ }
+
+ private def sync_context[A](body: => A): A = synchronized {
+ if (_context < 0) throw new IllegalStateException("Database_Progress before init")
+ if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
+
+ body
+ }
+
+ private def sync_database[A](body: => A): A = synchronized {
+ Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
+ _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
+
+ if (_stopped_db && !base_progress.stopped) base_progress.stop()
+ if (!_stopped_db && base_progress.stopped && output_stopped) {
+ Database_Progress.private_data.write_progress_stopped(db, _context, true)
+ db.send(Database_Progress.private_data.channel_ping)
+ }
+
+ val serial0 = _serial
+ if (input_messages) {
+ val messages = Database_Progress.private_data.read_messages(db, _context, seen = _serial)
+ for ((message_serial, message) <- messages) {
+ if (base_progress.do_output(message)) base_progress.output(message)
+ _serial = _serial max message_serial
+ }
+ }
+ else {
+ _serial = _serial max Database_Progress.private_data.read_messages_serial(db, _context)
+ }
+
+ val res = body
+
+ if (_serial != serial0) Database_Progress.private_data.update_agent(db, _agent_uuid, _serial)
+
+ res
+ }
+ }
+
+ private def sync(): Unit = sync_database {}
+
+ override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) }
+ override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) }
+
+ override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
+ base_progress.nodes_status(nodes_status)
+
+ override def verbose: Boolean = base_progress.verbose
+
+ override def stop(): Unit = sync_context { base_progress.stop(); sync() }
+ override def stopped: Boolean = sync_context { base_progress.stopped }
+ override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
+
+ override def toString: String = super.toString + ": database " + db
+
+ init()
+ sync()
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/System/program_progress.scala Tue Mar 12 15:57:25 2024 +0100
@@ -0,0 +1,96 @@
+/* Title: Pure/System/program_progress.scala
+ Author: Makarius
+
+Structured program progress.
+*/
+
+package isabelle
+
+
+object Program_Progress {
+ class Program private[Program_Progress](heading: String, title: String) {
+ private val output_buffer = new StringBuffer(256) // synchronized
+
+ def output(message: Progress.Message): Unit = synchronized {
+ if (output_buffer.length() > 0) output_buffer.append('\n')
+ output_buffer.append(message.output_text)
+ }
+
+ val start_time: Time = Time.now()
+ private var stop_time: Option[Time] = None
+ def stop_now(): Unit = synchronized { stop_time = Some(Time.now()) }
+
+ def output(): (Command.Results, XML.Body) = synchronized {
+ val output_text = output_buffer.toString
+ val elapsed_time = stop_time.map(t => t - start_time)
+
+ val message_prefix = heading + " "
+ val message_suffix =
+ elapsed_time match {
+ case None => " ..."
+ case Some(t) => " ... (" + t.message + " elapsed time)"
+ }
+
+ val (results, message) =
+ if (output_text.isEmpty) {
+ (Command.Results.empty, XML.string(message_prefix + title + message_suffix))
+ }
+ else {
+ val i = Document_ID.make()
+ val results = Command.Results.make(List(i -> Protocol.writeln_message(output_text)))
+ val message =
+ XML.string(message_prefix) :::
+ List(XML.Elem(Markup(Markup.WRITELN, Markup.Serial(i)), XML.string(title))) :::
+ XML.string(message_suffix)
+ (results, message)
+ }
+
+ (results, List(XML.elem(Markup.TRACING_MESSAGE, message)))
+ }
+ }
+}
+
+abstract class Program_Progress(
+ default_heading: String = "Running",
+ default_title: String = "program",
+ override val verbose: Boolean = false
+) extends Progress {
+ private var _finished_programs: List[Program_Progress.Program] = Nil
+ private var _running_program: Option[Program_Progress.Program] = None
+
+ def output(): (Command.Results, XML.Body) = synchronized {
+ val programs = (_running_program.toList ::: _finished_programs).reverse
+ val programs_output = programs.map(_.output())
+ val results = Command.Results.merge(programs_output.map(_._1))
+ val body = Library.separate(Pretty.Separator, programs_output.map(_._2)).flatten
+ (results, body)
+ }
+
+ private def start_program(heading: String, title: String): Unit = synchronized {
+ _running_program = Some(new Program_Progress.Program(heading, title))
+ }
+
+ def stop_program(): Unit = synchronized {
+ _running_program match {
+ case Some(program) =>
+ program.stop_now()
+ _finished_programs ::= program
+ _running_program = None
+ case None =>
+ }
+ }
+
+ def detect_program(s: String): Option[String]
+
+ override def output(message: Progress.Message): Unit = synchronized {
+ val writeln_msg = if (message.kind == Progress.Kind.writeln) message.text else ""
+ detect_program(writeln_msg).map(Word.explode) match {
+ case Some(a :: bs) =>
+ stop_program()
+ start_program(a, Word.implode(bs))
+ case _ =>
+ if (_running_program.isEmpty) start_program(default_heading, default_title)
+ if (do_output(message)) _running_program.get.output(message)
+ }
+ }
+}
--- a/src/Pure/System/progress.scala Tue Mar 12 15:31:44 2024 +0100
+++ b/src/Pure/System/progress.scala Tue Mar 12 15:57:25 2024 +0100
@@ -10,8 +10,6 @@
import java.util.{Map => JMap}
import java.io.{File => JFile}
-import scala.collection.immutable.SortedMap
-
object Progress {
/* output */
@@ -49,141 +47,6 @@
def print_percentage: String =
percentage match { case None => "" case Some(p) => " " + p + "%" }
}
-
-
- /* SQL data model */
-
- object private_data extends SQL.Data("isabelle_progress") {
- val database: Path = Path.explode("$ISABELLE_HOME_USER/progress.db")
-
- override lazy val tables: SQL.Tables =
- SQL.Tables(Base.table, Agents.table, Messages.table)
-
- object Base {
- val context_uuid = SQL.Column.string("context_uuid").make_primary_key
- val context = SQL.Column.long("context").make_primary_key
- val stopped = SQL.Column.bool("stopped")
-
- val table = make_table(List(context_uuid, context, stopped))
- }
-
- object Agents {
- val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key
- val context_uuid = SQL.Column.string("context_uuid").make_primary_key
- val kind = SQL.Column.string("kind")
- val hostname = SQL.Column.string("hostname")
- val java_pid = SQL.Column.long("java_pid")
- val java_start = SQL.Column.date("java_start")
- val start = SQL.Column.date("start")
- val stamp = SQL.Column.date("stamp")
- val stop = SQL.Column.date("stop")
- val seen = SQL.Column.long("seen")
-
- val table =
- make_table(
- List(agent_uuid, context_uuid, kind, hostname, java_pid, java_start, start, stamp, stop, seen),
- name = "agents")
- }
-
- object Messages {
- type T = SortedMap[Long, Message]
- val empty: T = SortedMap.empty
-
- val context = SQL.Column.long("context").make_primary_key
- val serial = SQL.Column.long("serial").make_primary_key
- val kind = SQL.Column.int("kind")
- val text = SQL.Column.string("text")
- val verbose = SQL.Column.bool("verbose")
-
- val table = make_table(List(context, serial, kind, text, verbose), name = "messages")
- }
-
- val channel: String = Base.table.name
- val channel_ping: SQL.Notification = SQL.Notification(channel, payload = "ping")
- val channel_output: SQL.Notification = SQL.Notification(channel, payload = "output")
-
- def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
- db.execute_query_statementO(
- Base.table.select(List(Base.context),
- sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context))
-
- def next_progress_context(db: SQL.Database): Long =
- db.execute_query_statementO(
- Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L
-
- def read_progress_stopped(db: SQL.Database, context: Long): Boolean =
- db.execute_query_statementO(
- Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)),
- _.bool(Base.stopped)
- ).getOrElse(false)
-
- def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit =
- db.execute_statement(
- Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)),
- body = { stmt => stmt.bool(1) = stopped })
-
- def update_agent(
- db: SQL.Database,
- agent_uuid: String,
- seen: Long,
- stop_now: Boolean = false
- ): Unit = {
- val sql = Agents.agent_uuid.where_equal(agent_uuid)
-
- val stop =
- db.execute_query_statementO(
- Agents.table.select(List(Agents.stop), sql = sql), _.get_date(Agents.stop)).flatten
-
- db.execute_statement(
- Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), sql = sql),
- body = { stmt =>
- val now = db.now()
- stmt.date(1) = now
- stmt.date(2) = if (stop_now) Some(now) else stop
- stmt.long(3) = seen
- })
- }
-
- def read_messages_serial(db: SQL.Database, context: Long): Long =
- db.execute_query_statementO(
- Messages.table.select(
- List(Messages.serial.max), sql = Base.context.where_equal(context)),
- _.long(Messages.serial)
- ).getOrElse(0L)
-
- def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
- db.execute_query_statement(
- Messages.table.select(
- List(Messages.serial, Messages.kind, Messages.text, Messages.verbose),
- sql =
- SQL.where_and(
- Messages.context.ident + " = " + context,
- if (seen <= 0) "" else Messages.serial.ident + " > " + seen)),
- SortedMap.from[Long, Message],
- { res =>
- val serial = res.long(Messages.serial)
- val kind = Kind.fromOrdinal(res.int(Messages.kind))
- val text = res.string(Messages.text)
- val verbose = res.bool(Messages.verbose)
- serial -> Message(kind, text, verbose = verbose)
- }
- )
-
- def write_messages(
- db: SQL.Database,
- context: Long,
- messages: List[(Long, Message)]
- ): Unit = {
- db.execute_batch_statement(Messages.table.insert(), batch =
- for ((serial, message) <- messages) yield { (stmt: SQL.Statement) =>
- stmt.long(1) = context
- stmt.long(2) = serial
- stmt.int(3) = message.kind.ordinal
- stmt.string(4) = message.text
- stmt.bool(5) = message.verbose
- })
- }
- }
}
class Progress {
@@ -268,274 +131,3 @@
override def toString: String = super.toString + ": " + path.toString
}
-
-
-/* database progress */
-
-class Database_Progress(
- db: SQL.Database,
- base_progress: Progress,
- input_messages: Boolean = false,
- output_stopped: Boolean = false,
- kind: String = "progress",
- hostname: String = Isabelle_System.hostname(),
- context_uuid: String = UUID.random_string(),
- timeout: Option[Time] = None,
- tick_expire: Int = 50)
-extends Progress {
- database_progress =>
-
- override def now(): Date = db.now()
- override val start: Date = now()
-
- if (UUID.unapply(context_uuid).isEmpty) {
- error("Bad Database_Progress.context_uuid: " + quote(context_uuid))
- }
-
- private var _tick: Long = 0
- private var _agent_uuid: String = ""
- private var _context: Long = -1
- private var _serial: Long = 0
- private var _stopped_db: Boolean = false
- private var _consumer: Consumer_Thread[Progress.Output] = null
-
- def agent_uuid: String = synchronized { _agent_uuid }
-
- private def init(): Unit = synchronized {
- db.listen(Progress.private_data.channel)
- Progress.private_data.transaction_lock(db, create = true, label = "Database_Progress.init") {
- Progress.private_data.read_progress_context(db, context_uuid) match {
- case Some(context) =>
- _context = context
- _agent_uuid = UUID.random_string()
- case None =>
- _context = Progress.private_data.next_progress_context(db)
- _agent_uuid = context_uuid
- db.execute_statement(Progress.private_data.Base.table.insert(), { stmt =>
- stmt.string(1) = context_uuid
- stmt.long(2) = _context
- stmt.bool(3) = false
- })
- }
- db.execute_statement(Progress.private_data.Agents.table.insert(), { stmt =>
- val java = ProcessHandle.current()
- val java_pid = java.pid
- val java_start = Date.instant(java.info.startInstant.get)
-
- stmt.string(1) = _agent_uuid
- stmt.string(2) = context_uuid
- stmt.string(3) = kind
- stmt.string(4) = hostname
- stmt.long(5) = java_pid
- stmt.date(6) = java_start
- stmt.date(7) = start
- stmt.date(8) = start
- stmt.date(9) = None
- stmt.long(10) = 0L
- })
- }
- if (context_uuid == _agent_uuid) db.vacuum(Progress.private_data.tables.list)
-
- def consume(bulk_output: List[Progress.Output]): List[Exn.Result[Unit]] = {
- val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
- val received = db.receive(n => n.channel == Progress.private_data.channel)
- val ok =
- bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
- received.isEmpty ||
- received.get.contains(Progress.private_data.channel_ping) ||
- input_messages && received.get.contains(Progress.private_data.channel_output)
- if (ok) {
- sync_database {
- if (bulk_output.nonEmpty) {
- for (out <- bulk_output) {
- out match {
- case message: Progress.Message =>
- if (do_output(message)) base_progress.output(message)
- case theory: Progress.Theory => base_progress.theory(theory)
- }
- }
-
- val messages =
- for ((out, i) <- bulk_output.zipWithIndex)
- yield (_serial + i + 1) -> out.message
-
- Progress.private_data.write_messages(db, _context, messages)
- _serial = messages.last._1
-
- db.send(Progress.private_data.channel_output)
- }
- bulk_output.map(_ => Exn.Res(()))
- }
- }
- else Nil
- }
-
- _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
- bulk = _ => true, timeout = timeout,
- consume = { bulk_output =>
- val results =
- if (bulk_output.isEmpty) consume(Nil)
- else bulk_output.grouped(200).toList.flatMap(consume)
- (results, true) })
- }
-
- def close(): Unit = synchronized {
- if (_context > 0) {
- _consumer.shutdown()
- _consumer = null
-
- Progress.private_data.transaction_lock(db, label = "Database_Progress.exit") {
- Progress.private_data.update_agent(db, _agent_uuid, _serial, stop_now = true)
- }
- _context = 0
- }
- db.close()
- }
-
- private def sync_context[A](body: => A): A = synchronized {
- if (_context < 0) throw new IllegalStateException("Database_Progress before init")
- if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
-
- body
- }
-
- private def sync_database[A](body: => A): A = synchronized {
- Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
- _stopped_db = Progress.private_data.read_progress_stopped(db, _context)
-
- if (_stopped_db && !base_progress.stopped) base_progress.stop()
- if (!_stopped_db && base_progress.stopped && output_stopped) {
- Progress.private_data.write_progress_stopped(db, _context, true)
- db.send(Progress.private_data.channel_ping)
- }
-
- val serial0 = _serial
- if (input_messages) {
- val messages = Progress.private_data.read_messages(db, _context, seen = _serial)
- for ((message_serial, message) <- messages) {
- if (base_progress.do_output(message)) base_progress.output(message)
- _serial = _serial max message_serial
- }
- }
- else {
- _serial = _serial max Progress.private_data.read_messages_serial(db, _context)
- }
-
- val res = body
-
- if (_serial != serial0) Progress.private_data.update_agent(db, _agent_uuid, _serial)
-
- res
- }
- }
-
- private def sync(): Unit = sync_database {}
-
- override def output(message: Progress.Message): Unit = sync_context { _consumer.send(message) }
- override def theory(theory: Progress.Theory): Unit = sync_context { _consumer.send(theory) }
-
- override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
- base_progress.nodes_status(nodes_status)
-
- override def verbose: Boolean = base_progress.verbose
-
- override def stop(): Unit = sync_context { base_progress.stop(); sync() }
- override def stopped: Boolean = sync_context { base_progress.stopped }
- override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
-
- override def toString: String = super.toString + ": database " + db
-
- init()
- sync()
-}
-
-
-/* structured program progress */
-
-object Program_Progress {
- class Program private[Program_Progress](heading: String, title: String) {
- private val output_buffer = new StringBuffer(256) // synchronized
-
- def output(message: Progress.Message): Unit = synchronized {
- if (output_buffer.length() > 0) output_buffer.append('\n')
- output_buffer.append(message.output_text)
- }
-
- val start_time: Time = Time.now()
- private var stop_time: Option[Time] = None
- def stop_now(): Unit = synchronized { stop_time = Some(Time.now()) }
-
- def output(): (Command.Results, XML.Body) = synchronized {
- val output_text = output_buffer.toString
- val elapsed_time = stop_time.map(t => t - start_time)
-
- val message_prefix = heading + " "
- val message_suffix =
- elapsed_time match {
- case None => " ..."
- case Some(t) => " ... (" + t.message + " elapsed time)"
- }
-
- val (results, message) =
- if (output_text.isEmpty) {
- (Command.Results.empty, XML.string(message_prefix + title + message_suffix))
- }
- else {
- val i = Document_ID.make()
- val results = Command.Results.make(List(i -> Protocol.writeln_message(output_text)))
- val message =
- XML.string(message_prefix) :::
- List(XML.Elem(Markup(Markup.WRITELN, Markup.Serial(i)), XML.string(title))) :::
- XML.string(message_suffix)
- (results, message)
- }
-
- (results, List(XML.elem(Markup.TRACING_MESSAGE, message)))
- }
- }
-}
-
-abstract class Program_Progress(
- default_heading: String = "Running",
- default_title: String = "program",
- override val verbose: Boolean = false
-) extends Progress {
- private var _finished_programs: List[Program_Progress.Program] = Nil
- private var _running_program: Option[Program_Progress.Program] = None
-
- def output(): (Command.Results, XML.Body) = synchronized {
- val programs = (_running_program.toList ::: _finished_programs).reverse
- val programs_output = programs.map(_.output())
- val results = Command.Results.merge(programs_output.map(_._1))
- val body = Library.separate(Pretty.Separator, programs_output.map(_._2)).flatten
- (results, body)
- }
-
- private def start_program(heading: String, title: String): Unit = synchronized {
- _running_program = Some(new Program_Progress.Program(heading, title))
- }
-
- def stop_program(): Unit = synchronized {
- _running_program match {
- case Some(program) =>
- program.stop_now()
- _finished_programs ::= program
- _running_program = None
- case None =>
- }
- }
-
- def detect_program(s: String): Option[String]
-
- override def output(message: Progress.Message): Unit = synchronized {
- val writeln_msg = if (message.kind == Progress.Kind.writeln) message.text else ""
- detect_program(writeln_msg).map(Word.explode) match {
- case Some(a :: bs) =>
- stop_program()
- start_program(a, Word.implode(bs))
- case _ =>
- if (_running_program.isEmpty) start_program(default_heading, default_title)
- if (do_output(message)) _running_program.get.output(message)
- }
- }
-}