--- a/src/Pure/Build/build_process.scala Sun Mar 10 14:20:20 2024 +0100
+++ b/src/Pure/Build/build_process.scala Sun Mar 10 17:30:23 2024 +0100
@@ -18,6 +18,7 @@
sealed case class Build(
build_uuid: String, // Database_Progress.context_uuid
+ build_id: Long,
ml_platform: String,
options: String,
start: Date,
@@ -298,6 +299,10 @@
Base.table,
Workers.table)
+ private lazy val build_uuid_tables = tables.filter(Generic.build_uuid_table)
+ private lazy val build_id_tables =
+ tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
+
def pull[A <: Library.Named](
data_domain: Set[String],
data_iterator: Set[String] => Iterator[A],
@@ -325,26 +330,40 @@
}
object Generic {
+ val build_id = SQL.Column.long("build_id")
val build_uuid = SQL.Column.string("build_uuid")
val worker_uuid = SQL.Column.string("worker_uuid")
val name = SQL.Column.string("name")
+ def build_id_table(table: SQL.Table): Boolean =
+ table.columns.exists(column => column.name == build_id.name)
+
+ def build_uuid_table(table: SQL.Table): Boolean =
+ table.columns.exists(column => column.name == build_uuid.name)
+
def sql(
+ build_id: Long = 0,
build_uuid: String = "",
worker_uuid: String = "",
names: Iterable[String] = Nil
): SQL.Source =
SQL.and(
+ if_proper(build_id > 0, Generic.build_id.equal(build_id)),
if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
if_proper(names, Generic.name.member(names)))
def sql_where(
+ build_id: Long = 0,
build_uuid: String = "",
worker_uuid: String = "",
names: Iterable[String] = Nil
): SQL.Source = {
- SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names))
+ SQL.where(sql(
+ build_id = build_id,
+ build_uuid = build_uuid,
+ worker_uuid = worker_uuid,
+ names = names))
}
}
@@ -352,20 +371,30 @@
/* recorded updates */
object Updates {
+ val build_id = Generic.build_id.make_primary_key
val serial = SQL.Column.long("serial").make_primary_key
val kind = SQL.Column.int("kind").make_primary_key
val name = Generic.name.make_primary_key
- val table = make_table(List(serial, kind, name), name = "updates")
+ val table = make_table(List(build_id, serial, kind, name), name = "updates")
}
- def write_updates(db: SQL.Database, serial: Long, updates: List[Library.Update]): Unit =
+ def write_updates(
+ db: SQL.Database,
+ build_id: Long,
+ serial: Long,
+ updates: List[Library.Update]
+ ): Unit =
db.execute_batch_statement(db.insert_permissive(Updates.table), batch =
for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator)
yield { (stmt: SQL.Statement) =>
- stmt.long(1) = serial
- stmt.int(2) = kind
- stmt.string(3) = name
+ require(build_id > 0 && serial > 0 && kind > 0 && name.nonEmpty,
+ "Bad database update: build_id = " + build_id + ", serial = " + serial +
+ ", kind = " + kind + ", name = " + quote(name))
+ stmt.long(1) = build_id
+ stmt.long(2) = serial
+ stmt.int(3) = kind
+ stmt.string(4) = name
})
@@ -373,12 +402,28 @@
object Base {
val build_uuid = Generic.build_uuid.make_primary_key
+ val build_id = Generic.build_id.make_primary_key
val ml_platform = SQL.Column.string("ml_platform")
val options = SQL.Column.string("options")
val start = SQL.Column.date("start")
val stop = SQL.Column.date("stop")
- val table = make_table(List(build_uuid, ml_platform, options, start, stop))
+ val table = make_table(List(build_uuid, build_id, ml_platform, options, start, stop))
+ }
+
+ def read_build_ids(db: SQL.Database, build_uuids: List[String]): List[Long] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.build_id),
+ sql = if_proper(build_uuids, Base.build_uuid.where_member(build_uuids))),
+ List.from[Long], res => res.long(Base.build_id))
+
+ def get_build_id(db: SQL.Database, build_uuid: String): Long = {
+ read_build_ids(db, build_uuids = List(build_uuid)) match {
+ case build_id :: _ => build_id
+ case _ =>
+ db.execute_query_statementO(
+ Base.table.select(List(Base.build_id.max)), _.long(Base.build_id)).getOrElse(0L) + 1L
+ }
}
def read_builds(db: SQL.Database): List[Build] = {
@@ -386,11 +431,12 @@
db.execute_query_statement(Base.table.select(), List.from[Build],
{ res =>
val build_uuid = res.string(Base.build_uuid)
+ val build_id = res.long(Base.build_id)
val ml_platform = res.string(Base.ml_platform)
val options = res.string(Base.options)
val start = res.date(Base.start)
val stop = res.get_date(Base.stop)
- Build(build_uuid, ml_platform, options, start, stop, Nil)
+ Build(build_uuid, build_id, ml_platform, options, start, stop, Nil)
})
for (build <- builds.sortBy(_.start)(Date.Ordering)) yield {
@@ -399,14 +445,21 @@
}
}
- def remove_builds(db: SQL.Database, remove: List[String]): Unit =
- if (remove.nonEmpty) {
- val sql = Generic.build_uuid.where_member(remove)
- db.execute_statement(SQL.MULTI(build_uuid_tables.map(_.delete(sql = sql))))
+ def remove_builds(db: SQL.Database, build_uuids: List[String]): Unit =
+ if (build_uuids.nonEmpty) {
+ val build_ids = read_build_ids(db, build_uuids = build_uuids)
+
+ val sql1 = Generic.build_uuid.where_member(build_uuids)
+ val sql2 = Generic.build_id.where_member_long(build_ids)
+ db.execute_statement(
+ SQL.MULTI(
+ build_uuid_tables.map(_.delete(sql = sql1)) ++
+ build_id_tables.map(_.delete(sql = sql2))))
}
def start_build(
db: SQL.Database,
+ build_id: Long,
build_uuid: String,
ml_platform: String,
options: String,
@@ -415,10 +468,11 @@
db.execute_statement(Base.table.insert(), body =
{ stmt =>
stmt.string(1) = build_uuid
- stmt.string(2) = ml_platform
- stmt.string(3) = options
- stmt.date(4) = start
- stmt.date(5) = None
+ stmt.long(2) = build_id
+ stmt.string(3) = ml_platform
+ stmt.string(4) = options
+ stmt.date(5) = start
+ stmt.date(6) = None
})
}
@@ -543,15 +597,9 @@
lazy val table_index: Int = tables.index(table)
}
- def read_serial(db: SQL.Database): Long = {
- val a =
- db.execute_query_statementO[Long](
- Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial))
- val b =
- db.execute_query_statementO[Long](
- Updates.table.select(List(Updates.serial.max)), _.long(Updates.serial))
- a.getOrElse(0L) max b.getOrElse(0L)
- }
+ def read_serial(db: SQL.Database): Long =
+ db.execute_query_statementO[Long](
+ Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial)).getOrElse(0L)
def read_workers(
db: SQL.Database,
@@ -851,10 +899,6 @@
/* collective operations */
- private val build_uuid_tables =
- tables.filter(table =>
- table.columns.exists(column => column.name == Generic.build_uuid.name))
-
def pull_database(db: SQL.Database, worker_uuid: String, state: State): State = {
val serial_db = read_serial(db)
if (serial_db == state.serial) state
@@ -874,6 +918,7 @@
def update_database(
db: SQL.Database,
+ build_id: Long,
worker_uuid: String,
state: State,
old_state: State
@@ -887,7 +932,7 @@
if (updates.exists(_.defined)) {
val serial = state.next_serial
- // FIXME write_updates(db, serial, updates)
+ write_updates(db, build_id, serial, updates)
stamp_worker(db, worker_uuid, serial)
state.copy(serial = serial)
}
@@ -899,6 +944,11 @@
private_data.transaction_lock(db, create = true, label = "Build_Process.read_builds") {
private_data.read_builds(db)
}
+
+ def get_build_id(db: SQL.Database, build_uuid: String): Long =
+ private_data.transaction_lock(db, create = true, label = "Build_Process.build_id") {
+ private_data.get_build_id(db, build_uuid)
+ }
}
@@ -993,6 +1043,12 @@
}
}
+ protected val build_id: Long =
+ _build_database match {
+ case None => 0L
+ case Some(db) => Build_Process.get_build_id(db, build_uuid)
+ }
+
protected val build_start: Date = build_context.build_start getOrElse progress.now()
protected val log: Logger = Logger.make_system_log(progress, build_options)
@@ -1033,7 +1089,9 @@
val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state)
_state = old_state
val res = body
- _state = Build_Process.private_data.update_database(db, worker_uuid, _state, old_state)
+ _state =
+ Build_Process.private_data.update_database(
+ db, build_id, worker_uuid, _state, old_state)
res
}
}
@@ -1155,8 +1213,8 @@
protected final def start_build(): Unit = synchronized_database("Build_Process.start_build") {
for (db <- _build_database) {
- Build_Process.private_data.start_build(db, build_uuid, build_context.ml_platform,
- build_context.sessions_structure.session_prefs, build_start)
+ Build_Process.private_data.start_build(db, build_id, build_uuid,
+ build_context.ml_platform, build_context.sessions_structure.session_prefs, build_start)
}
}
@@ -1216,7 +1274,6 @@
_state = _state.copy(numa_nodes = numa_nodes)
}
-
protected def main_unsynchronized(): Unit = {
for (job <- _state.build_running.filter(_.is_finished)) {
_state = _state.remove_running(job.name)