# HG changeset patch # User wenzelm # Date 1686749850 -7200 # Node ID 54d6b2f75806698c63fe297dcb5fbfa51b24ba9e # Parent 8a7df40375ae77684e193c14c7b7f59396e26e92 support for Database_Progress; diff -r 8a7df40375ae -r 54d6b2f75806 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Wed Jun 14 12:10:40 2023 +0200 +++ b/src/Pure/System/progress.scala Wed Jun 14 15:37:30 2023 +0200 @@ -10,8 +10,12 @@ import java.util.{Map => JMap} import java.io.{File => JFile} +import scala.collection.immutable.SortedMap + object Progress { + /* messages */ + object Kind extends Enumeration { val writeln, warning, error_message = Value } sealed case class Message(kind: Kind.Value, text: String, verbose: Boolean = false) { def output_text: String = @@ -33,6 +37,129 @@ def print_percentage: String = percentage match { case None => "" case Some(p) => " " + p + "%" } } + + + /* SQL data model */ + + object Data { + def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table = + SQL.Table("isabelle_progress" + if_proper(name, "_" + name), columns, body = body) + + object Base { + val context_uuid = SQL.Column.string("context_uuid").make_primary_key + val context = SQL.Column.long("context").make_primary_key + val stopped = SQL.Column.bool("stopped") + + val table = make_table("", List(context_uuid, context, stopped)) + } + + object Agents { + val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key + val context_uuid = SQL.Column.string("context_uuid").make_primary_key + val hostname = SQL.Column.string("hostname") + val java_pid = SQL.Column.long("java_pid") + val java_start = SQL.Column.date("java_start") + val start = SQL.Column.date("start") + val stamp = SQL.Column.date("stamp") + val stop = SQL.Column.date("stop") + val seen = SQL.Column.long("seen") + + val table = make_table("agents", + List(agent_uuid, context_uuid, hostname, java_pid, java_start, start, stamp, stop, seen)) + } + + object Messages { + type T = SortedMap[Long, Message] + val empty: T = SortedMap.empty + + val context = SQL.Column.long("context").make_primary_key + val serial = SQL.Column.long("serial").make_primary_key + val kind = SQL.Column.int("kind") + val text = SQL.Column.string("text") + val verbose = SQL.Column.bool("verbose") + + val table = make_table("messages", List(context, serial, kind, text, verbose)) + } + + val all_tables: SQL.Tables = SQL.Tables(Base.table, Agents.table, Messages.table) + + def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] = + db.execute_query_statementO( + Base.table.select(List(Base.context), + sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context)) + + def next_progress_context(db: SQL.Database): Long = + db.execute_query_statementO( + Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L + + def read_progress_stopped(db: SQL.Database, context: Long): Boolean = + db.execute_query_statementO( + Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)), + _.bool(Base.stopped) + ).getOrElse(false) + + def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit = + db.execute_statement( + Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)), + body = { stmt => stmt.bool(1) = stopped }) + + def update_agent( + db: SQL.Database, + agent_uuid: String, + seen: Long, + stop: Boolean = false + ): Unit = { + val sql = + Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen), + sql = Agents.agent_uuid.where_equal(agent_uuid)) + db.execute_statement(sql, body = { stmt => + val now = db.now() + stmt.date(1) = now + stmt.date(2) = if (stop) Some(now) else None + stmt.long(3) = seen + }) + } + + def next_messages_serial(db: SQL.Database, context: Long): Long = + db.execute_query_statementO( + Messages.table.select( + List(Messages.serial.max), sql = Base.context.where_equal(context)), + _.long(Messages.serial) + ).getOrElse(0L) + 1L + + def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T = + db.execute_query_statement( + Messages.table.select( + List(Messages.serial, Messages.kind, Messages.text, Messages.verbose), + sql = + SQL.where_and( + Messages.context.ident + " = " + context, + if (seen <= 0) "" else Messages.serial.ident + " > " + seen)), + SortedMap.from[Long, Message], + { res => + val serial = res.long(Messages.serial) + val kind = Kind(res.int(Messages.kind)) + val text = res.string(Messages.text) + val verbose = res.bool(Messages.verbose) + serial -> Message(kind, text, verbose = verbose) + } + ) + + def write_messages( + db: SQL.Database, + context: Long, + serial: Long, + message: Message + ): Unit = { + db.execute_statement(Messages.table.insert(), body = { stmt => + stmt.long(1) = context + stmt.long(2) = serial + stmt.int(3) = message.kind.id + stmt.string(4) = message.text + stmt.bool(5) = message.verbose + }) + } + } } class Progress { @@ -107,6 +234,123 @@ } +/* database progress */ + +class Database_Progress( + db: SQL.Database, + base_progress: Progress, + val hostname: String = Isabelle_System.hostname(), + val context_uuid: String = UUID.random().toString) +extends Progress { + database_progress => + + private var _agent_uuid: String = "" + private var _context: Long = 0 + private var _seen: Long = 0 + + def agent_uuid: String = synchronized { _agent_uuid } + + private def transaction_lock[A](body: => A, create: Boolean = false): A = + db.transaction_lock(Progress.Data.all_tables, create = create)(body) + + private def init(): Unit = synchronized { + transaction_lock(create = true, body = { + Progress.Data.read_progress_context(db, context_uuid) match { + case Some(context) => + _context = context + _agent_uuid = UUID.random().toString + case None => + _context = Progress.Data.next_progress_context(db) + _agent_uuid = context_uuid + db.execute_statement(Progress.Data.Base.table.insert(), { stmt => + stmt.string(1) = context_uuid + stmt.long(2) = _context + stmt.bool(3) = false + }) + } + db.execute_statement(Progress.Data.Agents.table.insert(), { stmt => + val java = ProcessHandle.current() + val java_pid = java.pid + val java_start = Date.instant(java.info.startInstant.get) + val now = db.now() + + stmt.string(1) = _agent_uuid + stmt.string(2) = context_uuid + stmt.string(3) = hostname + stmt.long(4) = java_pid + stmt.date(5) = java_start + stmt.date(6) = now + stmt.date(7) = now + stmt.date(8) = None + stmt.long(9) = 0L + }) + }) + if (context_uuid == _agent_uuid) db.vacuum(Progress.Data.all_tables) + } + + def exit(): Unit = synchronized { + if (_context > 0) { + transaction_lock { + Progress.Data.update_agent(db, _agent_uuid, _seen, stop = true) + } + _context = 0 + } + } + + private def sync_database[A](body: => A): A = synchronized { + require(_context > 0) + transaction_lock { + val stopped_db = Progress.Data.read_progress_stopped(db, _context) + val stopped = base_progress.stopped + + if (stopped_db && !stopped) base_progress.stop() + if (stopped && !stopped_db) Progress.Data.write_progress_stopped(db, _context, true) + + val messages = Progress.Data.read_messages(db, _context, seen = _seen) + for ((seen, message) <- messages) { + if (base_progress.do_output(message)) base_progress.output(message) + _seen = _seen max seen + } + if (messages.nonEmpty) Progress.Data.update_agent(db, _agent_uuid, _seen) + + body + } + } + + def sync(): Unit = sync_database {} + + private def output_database(message: Progress.Message, body: => Unit): Unit = + sync_database { + val serial = Progress.Data.next_messages_serial(db, _context) + Progress.Data.write_messages(db, _context, serial, message) + + body + + _seen = _seen max serial + Progress.Data.update_agent(db, _agent_uuid, _seen) + } + + override def output(message: Progress.Message): Unit = + output_database(message, if (do_output(message)) base_progress.output(message)) + + override def theory(theory: Progress.Theory): Unit = + output_database(theory.message, base_progress.theory(theory)) + + override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = + base_progress.nodes_status(nodes_status) + + override def verbose: Boolean = base_progress.verbose + + override def stop(): Unit = synchronized { base_progress.stop(); sync() } + override def stopped: Boolean = sync_database { base_progress.stopped } + + override def toString: String = super.toString + ": database " + db + + init() + sync() +} + + /* structured program progress */ object Program_Progress {