prefer Database_Progress, which is more robust (amending afb1a19307c4);
authorwenzelm
Wed, 14 Jun 2023 16:27:44 +0200
changeset 78156 da5cc332ded3
parent 78155 54d6b2f75806
child 78157 403e4d9a3768
prefer Database_Progress, which is more robust (amending afb1a19307c4);
src/Pure/System/progress.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/System/progress.scala	Wed Jun 14 15:37:30 2023 +0200
+++ b/src/Pure/System/progress.scala	Wed Jun 14 16:27:44 2023 +0200
@@ -237,8 +237,8 @@
 /* database progress */
 
 class Database_Progress(
-  db: SQL.Database,
-  base_progress: Progress,
+  val db: SQL.Database,
+  val base_progress: Progress,
   val hostname: String = Isabelle_System.hostname(),
   val context_uuid: String = UUID.random().toString)
 extends Progress {
--- a/src/Pure/Tools/build_process.scala	Wed Jun 14 15:37:30 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Wed Jun 14 16:27:44 2023 +0200
@@ -143,16 +143,12 @@
 
   /** dynamic state **/
 
-  type Progress_Messages = SortedMap[Long, Progress.Message]
-  val progress_messages_empty: Progress_Messages = SortedMap.empty
-
   case class Build(
     build_uuid: String,
     ml_platform: String,
     options: String,
     start: Date,
-    stop: Option[Date],
-    progress_stopped: Boolean
+    stop: Option[Date]
   )
 
   case class Worker(
@@ -201,7 +197,6 @@
   }
 
   sealed case class Snapshot(
-    progress_messages: Progress_Messages,
     builds: List[Build],        // available build configurations
     workers: List[Worker],      // available worker processes
     sessions: State.Sessions,   // static build targets
@@ -223,7 +218,6 @@
 
   sealed case class State(
     serial: Long = 0,
-    progress_seen: Long = 0,
     numa_next: Int = 0,
     sessions: State.Sessions = Map.empty,
     pending: State.Pending = Nil,
@@ -237,10 +231,6 @@
       copy(serial = i)
     }
 
-    def progress_serial(message_serial: Long = serial): State =
-      if (message_serial > progress_seen) copy(progress_seen = message_serial)
-      else error("Bad serial " + message_serial + " for progress output (already seen)")
-
     def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
       if (numa_nodes.isEmpty) (None, this)
       else {
@@ -360,10 +350,8 @@
       val options = SQL.Column.string("options")
       val start = SQL.Column.date("start")
       val stop = SQL.Column.date("stop")
-      val progress_stopped = SQL.Column.bool("progress_stopped")
 
-      val table =
-        make_table("", List(build_uuid, ml_platform, options, start, stop, progress_stopped))
+      val table = make_table("", List(build_uuid, ml_platform, options, start, stop))
     }
 
     def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] =
@@ -376,16 +364,14 @@
           val options = res.string(Base.options)
           val start = res.date(Base.start)
           val stop = res.get_date(Base.stop)
-          val progress_stopped = res.bool(Base.progress_stopped)
-          Build(build_uuid, ml_platform, options, start, stop, progress_stopped)
+          Build(build_uuid, ml_platform, options, start, stop)
         })
 
     def start_build(
       db: SQL.Database,
       build_uuid: String,
       ml_platform: String,
-      options: String,
-      progress_stopped: Boolean
+      options: String
     ): Unit = {
       db.execute_statement(Base.table.insert(), body =
         { stmt =>
@@ -394,7 +380,6 @@
           stmt.string(3) = options
           stmt.date(4) = db.now()
           stmt.date(5) = None
-          stmt.bool(6) = progress_stopped
         })
     }
 
@@ -482,83 +467,6 @@
     }
 
 
-    /* progress */
-
-    object Progress {
-      val serial = SQL.Column.long("serial").make_primary_key
-      val kind = SQL.Column.int("kind")
-      val text = SQL.Column.string("text")
-      val verbose = SQL.Column.bool("verbose")
-      val build_uuid = Generic.build_uuid
-
-      val table = make_table("progress", List(serial, kind, text, verbose, build_uuid))
-    }
-
-    def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages =
-      db.execute_query_statement(
-        Progress.table.select(
-          sql =
-            SQL.where_and(
-              if (seen <= 0) "" else Progress.serial.ident + " > " + seen,
-              Generic.sql(build_uuid = build_uuid))),
-        SortedMap.from[Long, isabelle.Progress.Message],
-        { res =>
-          val serial = res.long(Progress.serial)
-          val kind = isabelle.Progress.Kind(res.int(Progress.kind))
-          val text = res.string(Progress.text)
-          val verbose = res.bool(Progress.verbose)
-          serial -> isabelle.Progress.Message(kind, text, verbose = verbose)
-        }
-      )
-
-    def write_progress(
-      db: SQL.Database,
-      message_serial: Long,
-      message: isabelle.Progress.Message,
-      build_uuid: String
-    ): Unit = {
-      db.execute_statement(Progress.table.insert(), body =
-        { stmt =>
-          stmt.long(1) = message_serial
-          stmt.int(2) = message.kind.id
-          stmt.string(3) = message.text
-          stmt.bool(4) = message.verbose
-          stmt.string(5) = build_uuid
-        })
-    }
-
-    def sync_progress(
-      db: SQL.Database,
-      seen: Long,
-      build_uuid: String,
-      build_progress: Progress
-    ): (Progress_Messages, Boolean) = {
-      require(build_uuid.nonEmpty)
-
-      val messages = read_progress(db, seen = seen, build_uuid = build_uuid)
-
-      val stopped_db =
-        db.execute_query_statementO[Boolean](
-          Base.table.select(List(Base.progress_stopped),
-            sql = SQL.where(Base.build_uuid.equal(build_uuid))),
-          res => res.bool(Base.progress_stopped)
-        ).getOrElse(false)
-
-      def stop_db(): Unit =
-        db.execute_statement(
-          Base.table.update(
-            List(Base.progress_stopped), sql = Base.build_uuid.where_equal(build_uuid)),
-          body = { stmt => stmt.bool(1) = true })
-
-      val stopped = build_progress.stopped
-
-      if (stopped_db && !stopped) build_progress.stop()
-      if (stopped && !stopped_db) stop_db()
-
-      (messages, messages.isEmpty && stopped_db == stopped)
-    }
-
-
     /* workers */
 
     object Workers {
@@ -854,7 +762,6 @@
       SQL.Tables(
         Base.table,
         Workers.table,
-        Progress.table,
         Sessions.table,
         Pending.table,
         Running.table,
@@ -928,90 +835,54 @@
   protected final val build_deps: Sessions.Deps = build_context.build_deps
   protected final val hostname: String = build_context.hostname
   protected final val build_uuid: String = build_context.build_uuid
-  protected final val worker_uuid: String = UUID.random().toString
+
+
+  /* progress backed by database */
+
+  private val _database: Option[SQL.Database] = store.open_build_database()
 
-  override def toString: String =
-    "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) +
-      if_proper(build_context.master, ", master = true") + ")"
+  protected val (progress, worker_uuid) = synchronized {
+    _database match {
+      case None => (build_progress, UUID.random().toString)
+      case Some(db) =>
+        val progress_db =
+          if (db.is_postgresql) store.open_database_server()
+          else db
+        val progress = new Database_Progress(progress_db, build_progress, context_uuid = build_uuid)
+        (progress, progress.agent_uuid)
+    }
+  }
 
+  protected val log: Logger = Logger.make_system_log(progress, build_options)
+
+  def close(): Unit = synchronized {
+    _database.foreach(_.close())
+    progress match {
+      case db_progress: Database_Progress =>
+        db_progress.exit()
+        db_progress.db.close()
+    }
+  }
 
   /* global state: internal var vs. external database */
 
   private var _state: Build_Process.State = Build_Process.State()
 
-  private val _database: Option[SQL.Database] = store.open_build_database()
-
-  def close(): Unit = synchronized { _database.foreach(_.close()) }
-
-  protected def synchronized_database[A](body: => A): A =
-    synchronized {
-      _database match {
-        case None => body
-        case Some(db) =>
-          def pull_database(): Unit = {
-            _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
-          }
-
-          def sync_database(): Unit = {
-            _state =
-              Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
-          }
-
-          def attempt(): Either[A, Build_Process.Progress_Messages] = {
-            val (messages, sync) =
-              Build_Process.Data.sync_progress(
-                db, _state.progress_seen, build_uuid, build_progress)
-            if (sync) Left { pull_database(); val res = body; sync_database(); res }
-            else Right(messages)
-          }
-
-          @tailrec def attempts(): A = {
-            db.transaction_lock(Build_Process.Data.all_tables) { attempt() } match {
-              case Left(res) => res
-              case Right(messages) =>
-                for ((message_serial, message) <- messages) {
-                  _state = _state.progress_serial(message_serial = message_serial)
-                  if (build_progress.do_output(message)) build_progress.output(message)
-                }
-                attempts()
-            }
-          }
-          attempts()
-      }
-    }
-
-
-  /* progress backed by database */
-
-  private def progress_output(message: Progress.Message, build_progress_output: => Unit): Unit = {
-    synchronized_database {
-      _state = _state.inc_serial.progress_serial()
-      for (db <- _database) {
-        Build_Process.Data.write_progress(db, _state.serial, message, build_uuid)
-        Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial)
-      }
-      build_progress_output
+  protected def synchronized_database[A](body: => A): A = synchronized {
+    _database match {
+      case None => body
+      case Some(db) =>
+        db.transaction_lock(Build_Process.Data.all_tables) {
+          progress.asInstanceOf[Database_Progress].sync()
+          _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
+          val res = body
+          _state =
+            Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
+          res
+        }
     }
   }
 
-  protected object progress extends Progress {
-    override def verbose: Boolean = build_progress.verbose
-
-    override def output(message: Progress.Message): Unit =
-      progress_output(message, if (do_output(message)) build_progress.output(message))
-
-    override def theory(theory: Progress.Theory): Unit =
-      progress_output(theory.message, build_progress.theory(theory))
-
-    override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
-      build_progress.nodes_status(nodes_status)
-
-    override def stop(): Unit = build_progress.stop()
-    override def stopped: Boolean = build_progress.stopped
-  }
-
-  protected val log: Logger = Logger.make_system_log(progress, build_options)
-
 
   /* policy operations */
 
@@ -1110,7 +981,7 @@
   protected final def start_build(): Unit = synchronized_database {
     for (db <- _database) {
       Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
-        build_context.sessions_structure.session_prefs, progress.stopped)
+        build_context.sessions_structure.session_prefs)
     }
   }
 
@@ -1207,16 +1078,14 @@
   /* snapshot */
 
   def snapshot(): Build_Process.Snapshot = synchronized_database {
-    val (progress_messages, builds, workers) =
+    val (builds, workers) =
       _database match {
-        case None => (Build_Process.progress_messages_empty, Nil, Nil)
+        case None => (Nil, Nil)
         case Some(db) =>
-          (Build_Process.Data.read_progress(db),
-           Build_Process.Data.read_builds(db),
+          (Build_Process.Data.read_builds(db),
            Build_Process.Data.read_workers(db))
       }
     Build_Process.Snapshot(
-      progress_messages = progress_messages,
       builds = builds,
       workers = workers,
       sessions = _state.sessions,
@@ -1224,4 +1093,11 @@
       running = _state.running,
       results = _state.results)
   }
+
+
+  /* toString */
+
+  override def toString: String =
+    "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) +
+      if_proper(build_context.master, ", master = true") + ")"
 }