--- a/src/Pure/Tools/build_process.scala Wed Jun 14 15:37:30 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Wed Jun 14 16:27:44 2023 +0200
@@ -143,16 +143,12 @@
/** dynamic state **/
- type Progress_Messages = SortedMap[Long, Progress.Message]
- val progress_messages_empty: Progress_Messages = SortedMap.empty
-
case class Build(
build_uuid: String,
ml_platform: String,
options: String,
start: Date,
- stop: Option[Date],
- progress_stopped: Boolean
+ stop: Option[Date]
)
case class Worker(
@@ -201,7 +197,6 @@
}
sealed case class Snapshot(
- progress_messages: Progress_Messages,
builds: List[Build], // available build configurations
workers: List[Worker], // available worker processes
sessions: State.Sessions, // static build targets
@@ -223,7 +218,6 @@
sealed case class State(
serial: Long = 0,
- progress_seen: Long = 0,
numa_next: Int = 0,
sessions: State.Sessions = Map.empty,
pending: State.Pending = Nil,
@@ -237,10 +231,6 @@
copy(serial = i)
}
- def progress_serial(message_serial: Long = serial): State =
- if (message_serial > progress_seen) copy(progress_seen = message_serial)
- else error("Bad serial " + message_serial + " for progress output (already seen)")
-
def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
if (numa_nodes.isEmpty) (None, this)
else {
@@ -360,10 +350,8 @@
val options = SQL.Column.string("options")
val start = SQL.Column.date("start")
val stop = SQL.Column.date("stop")
- val progress_stopped = SQL.Column.bool("progress_stopped")
- val table =
- make_table("", List(build_uuid, ml_platform, options, start, stop, progress_stopped))
+ val table = make_table("", List(build_uuid, ml_platform, options, start, stop))
}
def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] =
@@ -376,16 +364,14 @@
val options = res.string(Base.options)
val start = res.date(Base.start)
val stop = res.get_date(Base.stop)
- val progress_stopped = res.bool(Base.progress_stopped)
- Build(build_uuid, ml_platform, options, start, stop, progress_stopped)
+ Build(build_uuid, ml_platform, options, start, stop)
})
def start_build(
db: SQL.Database,
build_uuid: String,
ml_platform: String,
- options: String,
- progress_stopped: Boolean
+ options: String
): Unit = {
db.execute_statement(Base.table.insert(), body =
{ stmt =>
@@ -394,7 +380,6 @@
stmt.string(3) = options
stmt.date(4) = db.now()
stmt.date(5) = None
- stmt.bool(6) = progress_stopped
})
}
@@ -482,83 +467,6 @@
}
- /* progress */
-
- object Progress {
- val serial = SQL.Column.long("serial").make_primary_key
- val kind = SQL.Column.int("kind")
- val text = SQL.Column.string("text")
- val verbose = SQL.Column.bool("verbose")
- val build_uuid = Generic.build_uuid
-
- val table = make_table("progress", List(serial, kind, text, verbose, build_uuid))
- }
-
- def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages =
- db.execute_query_statement(
- Progress.table.select(
- sql =
- SQL.where_and(
- if (seen <= 0) "" else Progress.serial.ident + " > " + seen,
- Generic.sql(build_uuid = build_uuid))),
- SortedMap.from[Long, isabelle.Progress.Message],
- { res =>
- val serial = res.long(Progress.serial)
- val kind = isabelle.Progress.Kind(res.int(Progress.kind))
- val text = res.string(Progress.text)
- val verbose = res.bool(Progress.verbose)
- serial -> isabelle.Progress.Message(kind, text, verbose = verbose)
- }
- )
-
- def write_progress(
- db: SQL.Database,
- message_serial: Long,
- message: isabelle.Progress.Message,
- build_uuid: String
- ): Unit = {
- db.execute_statement(Progress.table.insert(), body =
- { stmt =>
- stmt.long(1) = message_serial
- stmt.int(2) = message.kind.id
- stmt.string(3) = message.text
- stmt.bool(4) = message.verbose
- stmt.string(5) = build_uuid
- })
- }
-
- def sync_progress(
- db: SQL.Database,
- seen: Long,
- build_uuid: String,
- build_progress: Progress
- ): (Progress_Messages, Boolean) = {
- require(build_uuid.nonEmpty)
-
- val messages = read_progress(db, seen = seen, build_uuid = build_uuid)
-
- val stopped_db =
- db.execute_query_statementO[Boolean](
- Base.table.select(List(Base.progress_stopped),
- sql = SQL.where(Base.build_uuid.equal(build_uuid))),
- res => res.bool(Base.progress_stopped)
- ).getOrElse(false)
-
- def stop_db(): Unit =
- db.execute_statement(
- Base.table.update(
- List(Base.progress_stopped), sql = Base.build_uuid.where_equal(build_uuid)),
- body = { stmt => stmt.bool(1) = true })
-
- val stopped = build_progress.stopped
-
- if (stopped_db && !stopped) build_progress.stop()
- if (stopped && !stopped_db) stop_db()
-
- (messages, messages.isEmpty && stopped_db == stopped)
- }
-
-
/* workers */
object Workers {
@@ -854,7 +762,6 @@
SQL.Tables(
Base.table,
Workers.table,
- Progress.table,
Sessions.table,
Pending.table,
Running.table,
@@ -928,90 +835,54 @@
protected final val build_deps: Sessions.Deps = build_context.build_deps
protected final val hostname: String = build_context.hostname
protected final val build_uuid: String = build_context.build_uuid
- protected final val worker_uuid: String = UUID.random().toString
+
+
+ /* progress backed by database */
+
+ private val _database: Option[SQL.Database] = store.open_build_database()
- override def toString: String =
- "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) +
- if_proper(build_context.master, ", master = true") + ")"
+ protected val (progress, worker_uuid) = synchronized {
+ _database match {
+ case None => (build_progress, UUID.random().toString)
+ case Some(db) =>
+ val progress_db =
+ if (db.is_postgresql) store.open_database_server()
+ else db
+ val progress = new Database_Progress(progress_db, build_progress, context_uuid = build_uuid)
+ (progress, progress.agent_uuid)
+ }
+ }
+ protected val log: Logger = Logger.make_system_log(progress, build_options)
+
+ def close(): Unit = synchronized {
+ _database.foreach(_.close())
+ progress match {
+ case db_progress: Database_Progress =>
+ db_progress.exit()
+ db_progress.db.close()
+ }
+ }
/* global state: internal var vs. external database */
private var _state: Build_Process.State = Build_Process.State()
- private val _database: Option[SQL.Database] = store.open_build_database()
-
- def close(): Unit = synchronized { _database.foreach(_.close()) }
-
- protected def synchronized_database[A](body: => A): A =
- synchronized {
- _database match {
- case None => body
- case Some(db) =>
- def pull_database(): Unit = {
- _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
- }
-
- def sync_database(): Unit = {
- _state =
- Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
- }
-
- def attempt(): Either[A, Build_Process.Progress_Messages] = {
- val (messages, sync) =
- Build_Process.Data.sync_progress(
- db, _state.progress_seen, build_uuid, build_progress)
- if (sync) Left { pull_database(); val res = body; sync_database(); res }
- else Right(messages)
- }
-
- @tailrec def attempts(): A = {
- db.transaction_lock(Build_Process.Data.all_tables) { attempt() } match {
- case Left(res) => res
- case Right(messages) =>
- for ((message_serial, message) <- messages) {
- _state = _state.progress_serial(message_serial = message_serial)
- if (build_progress.do_output(message)) build_progress.output(message)
- }
- attempts()
- }
- }
- attempts()
- }
- }
-
-
- /* progress backed by database */
-
- private def progress_output(message: Progress.Message, build_progress_output: => Unit): Unit = {
- synchronized_database {
- _state = _state.inc_serial.progress_serial()
- for (db <- _database) {
- Build_Process.Data.write_progress(db, _state.serial, message, build_uuid)
- Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial)
- }
- build_progress_output
+ protected def synchronized_database[A](body: => A): A = synchronized {
+ _database match {
+ case None => body
+ case Some(db) =>
+ db.transaction_lock(Build_Process.Data.all_tables) {
+ progress.asInstanceOf[Database_Progress].sync()
+ _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
+ val res = body
+ _state =
+ Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
+ res
+ }
}
}
- protected object progress extends Progress {
- override def verbose: Boolean = build_progress.verbose
-
- override def output(message: Progress.Message): Unit =
- progress_output(message, if (do_output(message)) build_progress.output(message))
-
- override def theory(theory: Progress.Theory): Unit =
- progress_output(theory.message, build_progress.theory(theory))
-
- override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit =
- build_progress.nodes_status(nodes_status)
-
- override def stop(): Unit = build_progress.stop()
- override def stopped: Boolean = build_progress.stopped
- }
-
- protected val log: Logger = Logger.make_system_log(progress, build_options)
-
/* policy operations */
@@ -1110,7 +981,7 @@
protected final def start_build(): Unit = synchronized_database {
for (db <- _database) {
Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
- build_context.sessions_structure.session_prefs, progress.stopped)
+ build_context.sessions_structure.session_prefs)
}
}
@@ -1207,16 +1078,14 @@
/* snapshot */
def snapshot(): Build_Process.Snapshot = synchronized_database {
- val (progress_messages, builds, workers) =
+ val (builds, workers) =
_database match {
- case None => (Build_Process.progress_messages_empty, Nil, Nil)
+ case None => (Nil, Nil)
case Some(db) =>
- (Build_Process.Data.read_progress(db),
- Build_Process.Data.read_builds(db),
+ (Build_Process.Data.read_builds(db),
Build_Process.Data.read_workers(db))
}
Build_Process.Snapshot(
- progress_messages = progress_messages,
builds = builds,
workers = workers,
sessions = _state.sessions,
@@ -1224,4 +1093,11 @@
running = _state.running,
results = _state.results)
}
+
+
+ /* toString */
+
+ override def toString: String =
+ "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) +
+ if_proper(build_context.master, ", master = true") + ")"
}