--- a/src/Pure/Tools/build_job.scala Sun Mar 05 20:41:45 2023 +0100
+++ b/src/Pure/Tools/build_job.scala Mon Mar 06 09:32:18 2023 +0100
@@ -50,7 +50,7 @@
object Session_Context {
def load(
- uuid: String,
+ build_uuid: String,
name: String,
deps: List[String],
ancestors: List[String],
@@ -61,7 +61,7 @@
): Session_Context = {
def default: Session_Context =
Session_Context(
- name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, uuid)
+ name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid)
store.try_open_database(name) match {
case None => default
@@ -80,7 +80,7 @@
case _ => Time.zero
}
new Session_Context(
- name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, uuid)
+ name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, build_uuid)
}
catch {
case ERROR(msg) => ignore_error(msg)
@@ -100,7 +100,7 @@
timeout: Time,
old_time: Time,
old_command_timings_blob: Bytes,
- uuid: String
+ build_uuid: String
) {
override def toString: String = name
}
@@ -498,7 +498,8 @@
sources = build_context.sources_shasum(session_name),
input_heaps = input_shasum,
output_heap = output_shasum,
- process_result.rc, build_context.uuid)))
+ process_result.rc,
+ build_context.build_uuid)))
// messages
process_result.err_lines.foreach(progress.echo(_))
--- a/src/Pure/Tools/build_process.scala Sun Mar 05 20:41:45 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Mon Mar 06 09:32:18 2023 +0100
@@ -28,7 +28,7 @@
fresh_build: Boolean = false,
no_build: Boolean = false,
session_setup: (String, Session) => Unit = (_, _) => (),
- uuid: String = UUID.random().toString
+ build_uuid: String = UUID.random().toString
): Context = {
val sessions_structure = build_deps.sessions_structure
val build_graph = sessions_structure.build_graph
@@ -42,7 +42,7 @@
val sources_shasum = build_deps.sources_shasum(name)
val session_context =
Build_Job.Session_Context.load(
- uuid, name, deps, ancestors, sources_shasum, info.timeout, store,
+ build_uuid, name, deps, ancestors, sources_shasum, info.timeout, store,
progress = progress)
name -> session_context
})
@@ -82,11 +82,10 @@
val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
new Context(store, build_deps, sessions, ordering, hostname, numa_nodes,
build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
- no_build = no_build, session_setup, uuid = uuid)
+ no_build = no_build, session_setup, build_uuid = build_uuid)
}
}
- // static context of one particular instance, identified by uuid
final class Context private(
val store: Sessions.Store,
val build_deps: Sessions.Deps,
@@ -99,7 +98,7 @@
val fresh_build: Boolean,
val no_build: Boolean,
val session_setup: (String, Session) => Unit,
- val uuid: String
+ val build_uuid: String
) {
def build_options: Options = store.options
@@ -162,7 +161,6 @@
}
}
- // dynamic state of various instances, distinguished by uuid
sealed case class State(
serial: Long = 0,
progress_seen: Long = 0,
@@ -241,34 +239,38 @@
SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body)
object Generic {
- val uuid = SQL.Column.string("uuid")
+ val build_uuid = SQL.Column.string("build_uuid")
+ val worker_uuid = SQL.Column.string("worker_uuid")
val name = SQL.Column.string("name")
- def sql_equal(uuid: String = "", name: String = ""): SQL.Source =
+ def sql(
+ build_uuid: String = "",
+ worker_uuid: String = "",
+ name: String = "",
+ names: Iterable[String] = Nil
+ ): SQL.Source =
SQL.and(
- if_proper(uuid, Generic.uuid.equal(uuid)),
- if_proper(name, Generic.name.equal(name)))
-
- def sql_member(uuid: String = "", names: Iterable[String] = Nil): SQL.Source =
- SQL.and(
- if_proper(uuid, Generic.uuid.equal(uuid)),
+ if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
+ if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
+ if_proper(name, Generic.name.equal(name)),
if_proper(names, Generic.name.member(names)))
}
object Base {
- val uuid = Generic.uuid.make_primary_key
+ val build_uuid = Generic.build_uuid.make_primary_key
val ml_platform = SQL.Column.string("ml_platform")
val options = SQL.Column.string("options")
- val table = make_table("", List(uuid, ml_platform, options))
+ val table = make_table("", List(build_uuid, ml_platform, options))
}
- object Serial {
- val uuid = Generic.uuid.make_primary_key
+ object Workers {
+ val worker_uuid = Generic.worker_uuid.make_primary_key
+ val build_uuid = Generic.build_uuid
val stamp = SQL.Column.date("stamp")
val serial = SQL.Column.long("serial")
- val table = make_table("serial", List(uuid, stamp, serial))
+ val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial))
val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")")
}
@@ -278,9 +280,9 @@
val kind = SQL.Column.int("kind")
val text = SQL.Column.string("text")
val verbose = SQL.Column.bool("verbose")
- val uuid = Generic.uuid
+ val build_uuid = Generic.build_uuid
- val table = make_table("progress", List(serial, kind, text, verbose, uuid))
+ val table = make_table("progress", List(serial, kind, text, verbose, build_uuid))
}
object Sessions {
@@ -291,10 +293,10 @@
val timeout = SQL.Column.long("timeout")
val old_time = SQL.Column.long("old_time")
val old_command_timings = SQL.Column.bytes("old_command_timings")
- val uuid = Generic.uuid
+ val build_uuid = Generic.build_uuid
val table = make_table("sessions",
- List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, uuid))
+ List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, build_uuid))
}
object Pending {
@@ -329,33 +331,35 @@
List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc))
}
- def get_serial(db: SQL.Database, uuid: String = ""): Long =
+ def get_serial(db: SQL.Database, worker_uuid: String = ""): Long =
db.using_statement(
- Serial.table.select(List(Serial.serial_max),
- sql = SQL.where(Generic.sql_equal(uuid = uuid)))
- )(stmt => stmt.execute_query().iterator(_.long(Serial.serial)).nextOption.getOrElse(0L))
+ Workers.table.select(List(Workers.serial_max),
+ sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+ )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L))
- def set_serial(db: SQL.Database, uuid: String, stamp: Date, serial: Long): Unit =
- if (get_serial(db, uuid = uuid) != serial) {
+ def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit = {
+ if (get_serial(db, worker_uuid = worker_uuid) != serial) {
db.using_statement(
- Serial.table.delete(sql = SQL.where(Generic.sql_equal(uuid = uuid)))
+ Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
)(_.execute())
- db.using_statement(Serial.table.insert()) { stmt =>
- stmt.string(1) = uuid
- stmt.date(2) = stamp
- stmt.long(3) = serial
+ db.using_statement(Workers.table.insert()) { stmt =>
+ stmt.string(1) = worker_uuid
+ stmt.string(2) = build_uuid
+ stmt.date(3) = db.now()
+ stmt.long(4) = serial
stmt.execute()
}
}
+ }
- def read_progress(db: SQL.Database, seen: Long = 0, uuid: String = ""): Progress_Messages =
+ def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages =
db.using_statement(
Progress.table.select(
sql =
SQL.where(
SQL.and(
if (seen <= 0) "" else Progress.serial.ident + " > " + seen,
- Generic.sql_equal(uuid = uuid))))
+ Generic.sql(build_uuid = build_uuid))))
) { stmt =>
SortedMap.from(stmt.execute_query().iterator { res =>
val serial = res.long(Progress.serial)
@@ -370,14 +374,14 @@
db: SQL.Database,
message_serial: Long,
message: isabelle.Progress.Message,
- uuid: String
+ build_uuid: String
): Unit = {
db.using_statement(Progress.table.insert()) { stmt =>
stmt.long(1) = message_serial
stmt.int(2) = message.kind.id
stmt.string(3) = message.text
stmt.bool(4) = message.verbose
- stmt.string(5) = uuid
+ stmt.string(5) = build_uuid
stmt.execute()
}
}
@@ -398,9 +402,9 @@
val timeout = Time.ms(res.long(Sessions.timeout))
val old_time = Time.ms(res.long(Sessions.old_time))
val old_command_timings_blob = res.bytes(Sessions.old_command_timings)
- val uuid = res.string(Sessions.uuid)
+ val build_uuid = res.string(Sessions.build_uuid)
name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum,
- timeout, old_time, old_command_timings_blob, uuid)
+ timeout, old_time, old_command_timings_blob, build_uuid)
})
}
@@ -417,7 +421,7 @@
stmt.long(5) = session.timeout.ms
stmt.long(6) = session.old_time.ms
stmt.bytes(7) = session.old_command_timings_blob
- stmt.string(8) = session.uuid
+ stmt.string(8) = session.build_uuid
stmt.execute()
}
}
@@ -443,7 +447,7 @@
if (delete.nonEmpty) {
db.using_statement(
Pending.table.delete(
- sql = SQL.where(Generic.sql_member(names = delete.map(_.name)))))(_.execute())
+ sql = SQL.where(Generic.sql(names = delete.map(_.name)))))(_.execute())
}
for (entry <- insert) {
@@ -478,7 +482,7 @@
if (delete.nonEmpty) {
db.using_statement(
Running.table.delete(
- sql = SQL.where(Generic.sql_member(names = delete.map(_.job_name)))))(_.execute())
+ sql = SQL.where(Generic.sql(names = delete.map(_.job_name)))))(_.execute())
}
for (job <- insert) {
@@ -551,7 +555,7 @@
val tables =
List(
Base.table,
- Serial.table,
+ Workers.table,
Progress.table,
Sessions.table,
Pending.table,
@@ -570,7 +574,7 @@
for (table <- tables) db.using_statement(table.delete())(_.execute())
db.using_statement(Base.table.insert()) { stmt =>
- stmt.string(1) = build_context.uuid
+ stmt.string(1) = build_context.build_uuid
stmt.string(2) = Isabelle_System.getenv("ML_PLATFORM")
stmt.string(3) = build_context.store.options.make_prefs(Options.init(prefs = ""))
stmt.execute()
@@ -579,7 +583,8 @@
def update_database(
db: SQL.Database,
- uuid: String,
+ worker_uuid: String,
+ build_uuid: String,
hostname: String,
state: State
): State = {
@@ -594,7 +599,7 @@
val serial0 = get_serial(db)
val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
- set_serial(db, uuid, db.now(), serial)
+ set_serial(db, worker_uuid, build_uuid, serial)
state.set_serial(serial)
}
}
@@ -614,6 +619,8 @@
protected val store: Sessions.Store = build_context.store
protected val build_options: Options = store.options
protected val build_deps: Sessions.Deps = build_context.build_deps
+ protected val build_uuid: String = build_context.build_uuid
+ protected val worker_uuid: String = UUID.random().toString
/* global state: internal var vs. external database */
@@ -645,7 +652,7 @@
for (db <- _database) {
_state =
Build_Process.Data.update_database(
- db, build_context.uuid, build_context.hostname, _state)
+ db, worker_uuid, build_uuid, build_context.hostname, _state)
}
}
@@ -656,8 +663,8 @@
synchronized_database {
val state1 = _state.inc_serial.progress_serial()
for (db <- _database) {
- Build_Process.Data.write_progress(db, state1.serial, message, build_context.uuid)
- Build_Process.Data.set_serial(db, build_context.uuid, db.now(), state1.serial)
+ Build_Process.Data.write_progress(db, state1.serial, message, build_uuid)
+ Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial)
}
body
_state = state1