--- a/src/Pure/Tools/build_process.scala Mon Mar 06 16:20:12 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Mon Mar 06 17:29:00 2023 +0100
@@ -428,33 +428,69 @@
object Workers {
val worker_uuid = Generic.worker_uuid.make_primary_key
val build_uuid = Generic.build_uuid
+ val start = SQL.Column.date("start")
val stamp = SQL.Column.date("stamp")
+ val stop = SQL.Column.date("stop")
val serial = SQL.Column.long("serial")
- val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial))
+ val table = make_table("workers", List(worker_uuid, build_uuid, start, stamp, stop, serial))
val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")")
}
- def get_serial(db: SQL.Database, worker_uuid: String = ""): Long =
+ def serial_max(db: SQL.Database): Long =
db.using_statement(
- Workers.table.select(List(Workers.serial_max),
- sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+ Workers.table.select(List(Workers.serial_max))
)(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L))
- def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit =
- if (get_serial(db, worker_uuid = worker_uuid) != serial) {
- db.execute_statement(
- Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))))
- db.execute_statement(Workers.table.insert(), body =
- { stmt =>
- stmt.string(1) = worker_uuid
- stmt.string(2) = build_uuid
- stmt.date(3) = db.now()
- stmt.long(4) = serial
- })
+ def start_worker(db: SQL.Database, worker_uuid: String, build_uuid: String): Long = {
+ def err(msg: String): Nothing =
+ error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg))
+
+ val build_stop = {
+ val sql =
+ Base.table.select(List(Base.stop),
+ sql = SQL.where(Generic.sql(build_uuid = build_uuid)))
+ db.using_statement(sql)(_.execute_query().iterator(_.get_date(Base.stop)).nextOption)
+ }
+ build_stop match {
+ case Some(None) =>
+ case Some(Some(_)) => err("for already stopped build process " + build_uuid)
+ case None => err("for unknown build process " + build_uuid)
}
+ val serial = serial_max(db)
+ db.execute_statement(Workers.table.insert(), body =
+ { stmt =>
+ val now = db.now()
+ stmt.string(1) = worker_uuid
+ stmt.string(2) = build_uuid
+ stmt.date(3) = now
+ stmt.date(4) = now
+ stmt.date(5) = None
+ stmt.long(6) = serial
+ })
+ serial
+ }
+
+ def stamp_worker(
+ db: SQL.Database,
+ worker_uuid: String,
+ serial: Long,
+ stop: Boolean = false
+ ): Unit = {
+ val sql =
+ Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial),
+ sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+ db.execute_statement(sql, body =
+ { stmt =>
+ val now = db.now()
+ stmt.date(1) = now
+ stmt.date(2) = if (stop) Some(now) else None
+ stmt.long(3) = serial
+ })
+ }
+
/* pending jobs */
@@ -645,10 +681,10 @@
update_results(db, state.results),
Host.Data.update_numa_next(db, hostname, state.numa_next))
- val serial0 = get_serial(db)
+ val serial0 = serial_max(db)
val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
- set_serial(db, worker_uuid, build_uuid, serial)
+ stamp_worker(db, worker_uuid, serial)
state.set_serial(serial)
}
}
@@ -705,7 +741,7 @@
val state1 = _state.inc_serial.progress_serial()
for (db <- _database) {
Build_Process.Data.write_progress(db, state1.serial, message, build_uuid)
- Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial)
+ Build_Process.Data.stamp_worker(db, worker_uuid, state1.serial)
}
build_progress_output
_state = state1
@@ -813,30 +849,46 @@
}
+ /* build process roles */
+
+ final def start_build(): Unit = synchronized_database {
+ for (db <- _database) {
+ Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
+ store.options.make_prefs(Options.init(prefs = "")))
+ }
+ }
+
+ final def stop_build(): Unit = synchronized_database {
+ for (db <- _database) {
+ Build_Process.Data.stop_build(db, build_uuid)
+ }
+ }
+
+ final def start_worker(): Unit = synchronized_database {
+ for (db <- _database) {
+ val serial = Build_Process.Data.start_worker(db, worker_uuid, build_uuid)
+ _state = _state.set_serial(serial)
+ }
+ }
+
+ final def stop_worker(): Unit = synchronized_database {
+ for (db <- _database) {
+ Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
+ }
+ }
+
+
/* run */
- def run(): Map[String, Process_Result] = {
+ def run(master: Boolean = false): Map[String, Process_Result] = {
def finished(): Boolean = synchronized_database { _state.finished }
- def init(): Unit = synchronized_database {
- for (db <- _database) {
- Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
- store.options.make_prefs(Options.init(prefs = "")))
- }
- }
-
- def exit(): Unit = synchronized_database {
- for (db <- _database) {
- Build_Process.Data.stop_build(db, build_uuid)
- }
- }
-
def sleep(): Unit =
Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
build_options.seconds("editor_input_delay").sleep()
}
- def start(): Boolean = synchronized_database {
+ def start_job(): Boolean = synchronized_database {
next_job(_state) match {
case Some(name) =>
if (Build_Job.is_session_name(name)) {
@@ -853,7 +905,8 @@
Map.empty[String, Process_Result]
}
else {
- init()
+ if (master) start_build()
+ start_worker()
try {
while (!finished()) {
if (progress.stopped) synchronized_database { _state.stop_running() }
@@ -869,13 +922,16 @@
}
}
- if (!start()) {
+ if (!start_job()) {
sync_database()
sleep()
}
}
}
- finally exit()
+ finally {
+ stop_worker()
+ if (master) stop_build()
+ }
synchronized_database {
for ((name, result) <- _state.results) yield name -> result.process_result