# HG changeset patch # User wenzelm # Date 1686752864 -7200 # Node ID da5cc332ded3bca8e04342f471c3fe786d763120 # Parent 54d6b2f75806698c63fe297dcb5fbfa51b24ba9e prefer Database_Progress, which is more robust (amending afb1a19307c4); diff -r 54d6b2f75806 -r da5cc332ded3 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Wed Jun 14 15:37:30 2023 +0200 +++ b/src/Pure/System/progress.scala Wed Jun 14 16:27:44 2023 +0200 @@ -237,8 +237,8 @@ /* database progress */ class Database_Progress( - db: SQL.Database, - base_progress: Progress, + val db: SQL.Database, + val base_progress: Progress, val hostname: String = Isabelle_System.hostname(), val context_uuid: String = UUID.random().toString) extends Progress { diff -r 54d6b2f75806 -r da5cc332ded3 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Wed Jun 14 15:37:30 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Wed Jun 14 16:27:44 2023 +0200 @@ -143,16 +143,12 @@ /** dynamic state **/ - type Progress_Messages = SortedMap[Long, Progress.Message] - val progress_messages_empty: Progress_Messages = SortedMap.empty - case class Build( build_uuid: String, ml_platform: String, options: String, start: Date, - stop: Option[Date], - progress_stopped: Boolean + stop: Option[Date] ) case class Worker( @@ -201,7 +197,6 @@ } sealed case class Snapshot( - progress_messages: Progress_Messages, builds: List[Build], // available build configurations workers: List[Worker], // available worker processes sessions: State.Sessions, // static build targets @@ -223,7 +218,6 @@ sealed case class State( serial: Long = 0, - progress_seen: Long = 0, numa_next: Int = 0, sessions: State.Sessions = Map.empty, pending: State.Pending = Nil, @@ -237,10 +231,6 @@ copy(serial = i) } - def progress_serial(message_serial: Long = serial): State = - if (message_serial > progress_seen) copy(progress_seen = message_serial) - else error("Bad serial " + message_serial + " for progress output (already seen)") - def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) = if (numa_nodes.isEmpty) (None, this) else { @@ -360,10 +350,8 @@ val options = SQL.Column.string("options") val start = SQL.Column.date("start") val stop = SQL.Column.date("stop") - val progress_stopped = SQL.Column.bool("progress_stopped") - val table = - make_table("", List(build_uuid, ml_platform, options, start, stop, progress_stopped)) + val table = make_table("", List(build_uuid, ml_platform, options, start, stop)) } def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] = @@ -376,16 +364,14 @@ val options = res.string(Base.options) val start = res.date(Base.start) val stop = res.get_date(Base.stop) - val progress_stopped = res.bool(Base.progress_stopped) - Build(build_uuid, ml_platform, options, start, stop, progress_stopped) + Build(build_uuid, ml_platform, options, start, stop) }) def start_build( db: SQL.Database, build_uuid: String, ml_platform: String, - options: String, - progress_stopped: Boolean + options: String ): Unit = { db.execute_statement(Base.table.insert(), body = { stmt => @@ -394,7 +380,6 @@ stmt.string(3) = options stmt.date(4) = db.now() stmt.date(5) = None - stmt.bool(6) = progress_stopped }) } @@ -482,83 +467,6 @@ } - /* progress */ - - object Progress { - val serial = SQL.Column.long("serial").make_primary_key - val kind = SQL.Column.int("kind") - val text = SQL.Column.string("text") - val verbose = SQL.Column.bool("verbose") - val build_uuid = Generic.build_uuid - - val table = make_table("progress", List(serial, kind, text, verbose, build_uuid)) - } - - def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages = - db.execute_query_statement( - Progress.table.select( - sql = - SQL.where_and( - if (seen <= 0) "" else Progress.serial.ident + " > " + seen, - Generic.sql(build_uuid = build_uuid))), - SortedMap.from[Long, isabelle.Progress.Message], - { res => - val serial = res.long(Progress.serial) - val kind = isabelle.Progress.Kind(res.int(Progress.kind)) - val text = res.string(Progress.text) - val verbose = res.bool(Progress.verbose) - serial -> isabelle.Progress.Message(kind, text, verbose = verbose) - } - ) - - def write_progress( - db: SQL.Database, - message_serial: Long, - message: isabelle.Progress.Message, - build_uuid: String - ): Unit = { - db.execute_statement(Progress.table.insert(), body = - { stmt => - stmt.long(1) = message_serial - stmt.int(2) = message.kind.id - stmt.string(3) = message.text - stmt.bool(4) = message.verbose - stmt.string(5) = build_uuid - }) - } - - def sync_progress( - db: SQL.Database, - seen: Long, - build_uuid: String, - build_progress: Progress - ): (Progress_Messages, Boolean) = { - require(build_uuid.nonEmpty) - - val messages = read_progress(db, seen = seen, build_uuid = build_uuid) - - val stopped_db = - db.execute_query_statementO[Boolean]( - Base.table.select(List(Base.progress_stopped), - sql = SQL.where(Base.build_uuid.equal(build_uuid))), - res => res.bool(Base.progress_stopped) - ).getOrElse(false) - - def stop_db(): Unit = - db.execute_statement( - Base.table.update( - List(Base.progress_stopped), sql = Base.build_uuid.where_equal(build_uuid)), - body = { stmt => stmt.bool(1) = true }) - - val stopped = build_progress.stopped - - if (stopped_db && !stopped) build_progress.stop() - if (stopped && !stopped_db) stop_db() - - (messages, messages.isEmpty && stopped_db == stopped) - } - - /* workers */ object Workers { @@ -854,7 +762,6 @@ SQL.Tables( Base.table, Workers.table, - Progress.table, Sessions.table, Pending.table, Running.table, @@ -928,90 +835,54 @@ protected final val build_deps: Sessions.Deps = build_context.build_deps protected final val hostname: String = build_context.hostname protected final val build_uuid: String = build_context.build_uuid - protected final val worker_uuid: String = UUID.random().toString + + + /* progress backed by database */ + + private val _database: Option[SQL.Database] = store.open_build_database() - override def toString: String = - "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) + - if_proper(build_context.master, ", master = true") + ")" + protected val (progress, worker_uuid) = synchronized { + _database match { + case None => (build_progress, UUID.random().toString) + case Some(db) => + val progress_db = + if (db.is_postgresql) store.open_database_server() + else db + val progress = new Database_Progress(progress_db, build_progress, context_uuid = build_uuid) + (progress, progress.agent_uuid) + } + } + protected val log: Logger = Logger.make_system_log(progress, build_options) + + def close(): Unit = synchronized { + _database.foreach(_.close()) + progress match { + case db_progress: Database_Progress => + db_progress.exit() + db_progress.db.close() + } + } /* global state: internal var vs. external database */ private var _state: Build_Process.State = Build_Process.State() - private val _database: Option[SQL.Database] = store.open_build_database() - - def close(): Unit = synchronized { _database.foreach(_.close()) } - - protected def synchronized_database[A](body: => A): A = - synchronized { - _database match { - case None => body - case Some(db) => - def pull_database(): Unit = { - _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state) - } - - def sync_database(): Unit = { - _state = - Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state) - } - - def attempt(): Either[A, Build_Process.Progress_Messages] = { - val (messages, sync) = - Build_Process.Data.sync_progress( - db, _state.progress_seen, build_uuid, build_progress) - if (sync) Left { pull_database(); val res = body; sync_database(); res } - else Right(messages) - } - - @tailrec def attempts(): A = { - db.transaction_lock(Build_Process.Data.all_tables) { attempt() } match { - case Left(res) => res - case Right(messages) => - for ((message_serial, message) <- messages) { - _state = _state.progress_serial(message_serial = message_serial) - if (build_progress.do_output(message)) build_progress.output(message) - } - attempts() - } - } - attempts() - } - } - - - /* progress backed by database */ - - private def progress_output(message: Progress.Message, build_progress_output: => Unit): Unit = { - synchronized_database { - _state = _state.inc_serial.progress_serial() - for (db <- _database) { - Build_Process.Data.write_progress(db, _state.serial, message, build_uuid) - Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial) - } - build_progress_output + protected def synchronized_database[A](body: => A): A = synchronized { + _database match { + case None => body + case Some(db) => + db.transaction_lock(Build_Process.Data.all_tables) { + progress.asInstanceOf[Database_Progress].sync() + _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state) + val res = body + _state = + Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state) + res + } } } - protected object progress extends Progress { - override def verbose: Boolean = build_progress.verbose - - override def output(message: Progress.Message): Unit = - progress_output(message, if (do_output(message)) build_progress.output(message)) - - override def theory(theory: Progress.Theory): Unit = - progress_output(theory.message, build_progress.theory(theory)) - - override def nodes_status(nodes_status: Document_Status.Nodes_Status): Unit = - build_progress.nodes_status(nodes_status) - - override def stop(): Unit = build_progress.stop() - override def stopped: Boolean = build_progress.stopped - } - - protected val log: Logger = Logger.make_system_log(progress, build_options) - /* policy operations */ @@ -1110,7 +981,7 @@ protected final def start_build(): Unit = synchronized_database { for (db <- _database) { Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform, - build_context.sessions_structure.session_prefs, progress.stopped) + build_context.sessions_structure.session_prefs) } } @@ -1207,16 +1078,14 @@ /* snapshot */ def snapshot(): Build_Process.Snapshot = synchronized_database { - val (progress_messages, builds, workers) = + val (builds, workers) = _database match { - case None => (Build_Process.progress_messages_empty, Nil, Nil) + case None => (Nil, Nil) case Some(db) => - (Build_Process.Data.read_progress(db), - Build_Process.Data.read_builds(db), + (Build_Process.Data.read_builds(db), Build_Process.Data.read_workers(db)) } Build_Process.Snapshot( - progress_messages = progress_messages, builds = builds, workers = workers, sessions = _state.sessions, @@ -1224,4 +1093,11 @@ running = _state.running, results = _state.results) } + + + /* toString */ + + override def toString: String = + "Build_Process(worker_uuid = " + quote(worker_uuid) + ", build_uuid = " + quote(build_uuid) + + if_proper(build_context.master, ", master = true") + ")" }