support for Database_Progress;
authorwenzelm
Wed, 14 Jun 2023 15:37:30 +0200
changeset 78155 54d6b2f75806
parent 78154 8a7df40375ae
child 78156 da5cc332ded3
support for Database_Progress;
src/Pure/System/progress.scala
--- a/src/Pure/System/progress.scala	Wed Jun 14 12:10:40 2023 +0200
+++ b/src/Pure/System/progress.scala	Wed Jun 14 15:37:30 2023 +0200
@@ -10,8 +10,12 @@
 import java.util.{Map => JMap}
 import java.io.{File => JFile}
 
+import scala.collection.immutable.SortedMap
+
 
 object Progress {
+  /* messages */
+
   object Kind extends Enumeration { val writeln, warning, error_message = Value }
   sealed case class Message(kind: Kind.Value, text: String, verbose: Boolean = false) {
     def output_text: String =
@@ -33,6 +37,129 @@
     def print_percentage: String =
       percentage match { case None => "" case Some(p) => " " + p + "%" }
   }
+
+
+  /* SQL data model */
+
+  object Data {
+    def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table =
+      SQL.Table("isabelle_progress" + if_proper(name, "_" + name), columns, body = body)
+
+    object Base {
+      val context_uuid = SQL.Column.string("context_uuid").make_primary_key
+      val context = SQL.Column.long("context").make_primary_key
+      val stopped = SQL.Column.bool("stopped")
+
+      val table = make_table("", List(context_uuid, context, stopped))
+    }
+
+    object Agents {
+      val agent_uuid = SQL.Column.string("agent_uuid").make_primary_key
+      val context_uuid = SQL.Column.string("context_uuid").make_primary_key
+      val hostname = SQL.Column.string("hostname")
+      val java_pid = SQL.Column.long("java_pid")
+      val java_start = SQL.Column.date("java_start")
+      val start = SQL.Column.date("start")
+      val stamp = SQL.Column.date("stamp")
+      val stop = SQL.Column.date("stop")
+      val seen = SQL.Column.long("seen")
+
+      val table = make_table("agents",
+        List(agent_uuid, context_uuid, hostname, java_pid, java_start, start, stamp, stop, seen))
+    }
+
+    object Messages {
+      type T = SortedMap[Long, Message]
+      val empty: T = SortedMap.empty
+
+      val context = SQL.Column.long("context").make_primary_key
+      val serial = SQL.Column.long("serial").make_primary_key
+      val kind = SQL.Column.int("kind")
+      val text = SQL.Column.string("text")
+      val verbose = SQL.Column.bool("verbose")
+
+      val table = make_table("messages", List(context, serial, kind, text, verbose))
+    }
+
+    val all_tables: SQL.Tables = SQL.Tables(Base.table, Agents.table, Messages.table)
+
+    def read_progress_context(db: SQL.Database, context_uuid: String): Option[Long] =
+      db.execute_query_statementO(
+        Base.table.select(List(Base.context),
+          sql = Base.context_uuid.where_equal(context_uuid)), _.long(Base.context))
+
+    def next_progress_context(db: SQL.Database): Long =
+      db.execute_query_statementO(
+        Base.table.select(List(Base.context.max)), _.long(Base.context)).getOrElse(0L) + 1L
+
+    def read_progress_stopped(db: SQL.Database, context: Long): Boolean =
+      db.execute_query_statementO(
+        Base.table.select(List(Base.stopped), sql = Base.context.where_equal(context)),
+        _.bool(Base.stopped)
+      ).getOrElse(false)
+
+    def write_progress_stopped(db: SQL.Database, context: Long, stopped: Boolean): Unit =
+      db.execute_statement(
+        Base.table.update(List(Base.stopped), sql = Base.context.where_equal(context)),
+          body = { stmt => stmt.bool(1) = stopped })
+
+    def update_agent(
+      db: SQL.Database,
+      agent_uuid: String,
+      seen: Long,
+      stop: Boolean = false
+    ): Unit = {
+      val sql =
+        Agents.table.update(List(Agents.stamp, Agents.stop, Agents.seen),
+          sql = Agents.agent_uuid.where_equal(agent_uuid))
+      db.execute_statement(sql, body = { stmt =>
+        val now = db.now()
+        stmt.date(1) = now
+        stmt.date(2) = if (stop) Some(now) else None
+        stmt.long(3) = seen
+      })
+    }
+
+    def next_messages_serial(db: SQL.Database, context: Long): Long =
+      db.execute_query_statementO(
+        Messages.table.select(
+          List(Messages.serial.max), sql = Base.context.where_equal(context)),
+        _.long(Messages.serial)
+      ).getOrElse(0L) + 1L
+
+    def read_messages(db: SQL.Database, context: Long, seen: Long = 0): Messages.T =
+      db.execute_query_statement(
+        Messages.table.select(
+          List(Messages.serial, Messages.kind, Messages.text, Messages.verbose),
+          sql =
+            SQL.where_and(
+              Messages.context.ident + " = " + context,
+              if (seen <= 0) "" else Messages.serial.ident + " > " + seen)),
+        SortedMap.from[Long, Message],
+        { res =>
+          val serial = res.long(Messages.serial)
+          val kind = Kind(res.int(Messages.kind))
+          val text = res.string(Messages.text)
+          val verbose = res.bool(Messages.verbose)
+          serial -> Message(kind, text, verbose = verbose)
+        }
+      )
+
+    def write_messages(
+      db: SQL.Database,
+      context: Long,
+      serial: Long,
+      message: Message
+    ): Unit = {
+      db.execute_statement(Messages.table.insert(), body = { stmt =>
+        stmt.long(1) = context
+        stmt.long(2) = serial
+        stmt.int(3) = message.kind.id
+        stmt.string(4) = message.text
+        stmt.bool(5) = message.verbose
+      })
+    }
+  }
 }
 
 class Progress {
@@ -107,6 +234,123 @@
 }
 
 
+/* database progress */
+
+class Database_Progress(
+  db: SQL.Database,
+  base_progress: Progress,
+  val hostname: String = Isabelle_System.hostname(),
+  val context_uuid: String = UUID.random().toString)
+extends Progress {
+  database_progress =>
+
+  private var _agent_uuid: String = ""
+  private var _context: Long = 0
+  private var _seen: Long = 0
+
+  def agent_uuid: String = synchronized { _agent_uuid }
+
+  private def transaction_lock[A](body: => A, create: Boolean = false): A =
+    db.transaction_lock(Progress.Data.all_tables, create = create)(body)
+
+  private def init(): Unit = synchronized {
+    transaction_lock(create = true, body = {
+      Progress.Data.read_progress_context(db, context_uuid) match {
+        case Some(context) =>
+          _context = context
+          _agent_uuid = UUID.random().toString
+        case None =>
+          _context = Progress.Data.next_progress_context(db)
+          _agent_uuid = context_uuid
+          db.execute_statement(Progress.Data.Base.table.insert(), { stmt =>
+            stmt.string(1) = context_uuid
+            stmt.long(2) = _context
+            stmt.bool(3) = false
+          })
+      }
+      db.execute_statement(Progress.Data.Agents.table.insert(), { stmt =>
+        val java = ProcessHandle.current()
+        val java_pid = java.pid
+        val java_start = Date.instant(java.info.startInstant.get)
+        val now = db.now()
+
+        stmt.string(1) = _agent_uuid
+        stmt.string(2) = context_uuid
+        stmt.string(3) = hostname
+        stmt.long(4) = java_pid
+        stmt.date(5) = java_start
+        stmt.date(6) = now
+        stmt.date(7) = now
+        stmt.date(8) = None
+        stmt.long(9) = 0L
+      })
+    })
+    if (context_uuid == _agent_uuid) db.vacuum(Progress.Data.all_tables)
+  }
+
+  def exit(): Unit = synchronized {
+    if (_context > 0) {
+      transaction_lock {
+        Progress.Data.update_agent(db, _agent_uuid, _seen, stop = true)
+      }
+      _context = 0
+    }
+  }
+
+  private def sync_database[A](body: => A): A = synchronized {
+    require(_context > 0)
+    transaction_lock {
+      val stopped_db = Progress.Data.read_progress_stopped(db, _context)
+      val stopped = base_progress.stopped
+
+      if (stopped_db && !stopped) base_progress.stop()
+      if (stopped && !stopped_db) Progress.Data.write_progress_stopped(db, _context, true)
+
+      val messages = Progress.Data.read_messages(db, _context, seen = _seen)
+      for ((seen, message) <- messages) {
+        if (base_progress.do_output(message)) base_progress.output(message)
+        _seen = _seen max seen
+      }
+      if (messages.nonEmpty) Progress.Data.update_agent(db, _agent_uuid, _seen)
+
+      body
+    }
+  }
+
+  def sync(): Unit = sync_database {}
+
+  private def output_database(message: Progress.Message, body: => Unit): Unit =
+    sync_database {
+      val serial = Progress.Data.next_messages_serial(db, _context)
+      Progress.Data.write_messages(db, _context, serial, message)
+
+      body
+
+      _seen = _seen max serial
+      Progress.Data.update_agent(db, _agent_uuid, _seen)
+    }
+
+  override def output(message: Progress.Message): Unit =
+    output_database(message, if (do_output(message)) base_progress.output(message))
+
+  override def theory(theory: Progress.Theory): Unit =
+    output_database(theory.message, base_progress.theory(theory))
+
+  override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
+    base_progress.nodes_status(nodes_status)
+
+  override def verbose: Boolean = base_progress.verbose
+
+  override def stop(): Unit = synchronized { base_progress.stop(); sync() }
+  override def stopped: Boolean = sync_database { base_progress.stopped }
+
+  override def toString: String = super.toString + ": database " + db
+
+  init()
+  sync()
+}
+
+
 /* structured program progress */
 
 object Program_Progress {