# HG changeset patch # User wenzelm # Date 1678091538 -3600 # Node ID 40ccee0fe19a8603d52e5a91e2d2ab210bb72105 # Parent 26ec258e5cf8fcb16d2dbd712beaee43817a406a separate static build_uuid from dynamic worker_uuid, to allow multiple worker processes participate in one build process; diff -r 26ec258e5cf8 -r 40ccee0fe19a src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Sun Mar 05 20:41:45 2023 +0100 +++ b/src/Pure/Tools/build_job.scala Mon Mar 06 09:32:18 2023 +0100 @@ -50,7 +50,7 @@ object Session_Context { def load( - uuid: String, + build_uuid: String, name: String, deps: List[String], ancestors: List[String], @@ -61,7 +61,7 @@ ): Session_Context = { def default: Session_Context = Session_Context( - name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, uuid) + name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid) store.try_open_database(name) match { case None => default @@ -80,7 +80,7 @@ case _ => Time.zero } new Session_Context( - name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, uuid) + name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, build_uuid) } catch { case ERROR(msg) => ignore_error(msg) @@ -100,7 +100,7 @@ timeout: Time, old_time: Time, old_command_timings_blob: Bytes, - uuid: String + build_uuid: String ) { override def toString: String = name } @@ -498,7 +498,8 @@ sources = build_context.sources_shasum(session_name), input_heaps = input_shasum, output_heap = output_shasum, - process_result.rc, build_context.uuid))) + process_result.rc, + build_context.build_uuid))) // messages process_result.err_lines.foreach(progress.echo(_)) diff -r 26ec258e5cf8 -r 40ccee0fe19a src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sun Mar 05 20:41:45 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Mon Mar 06 09:32:18 2023 +0100 @@ -28,7 +28,7 @@ fresh_build: Boolean = false, no_build: Boolean = false, session_setup: (String, Session) => Unit = (_, _) => (), - uuid: String = UUID.random().toString + build_uuid: String = UUID.random().toString ): Context = { val sessions_structure = build_deps.sessions_structure val build_graph = sessions_structure.build_graph @@ -42,7 +42,7 @@ val sources_shasum = build_deps.sources_shasum(name) val session_context = Build_Job.Session_Context.load( - uuid, name, deps, ancestors, sources_shasum, info.timeout, store, + build_uuid, name, deps, ancestors, sources_shasum, info.timeout, store, progress = progress) name -> session_context }) @@ -82,11 +82,10 @@ val numa_nodes = Host.numa_nodes(enabled = numa_shuffling) new Context(store, build_deps, sessions, ordering, hostname, numa_nodes, build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build, - no_build = no_build, session_setup, uuid = uuid) + no_build = no_build, session_setup, build_uuid = build_uuid) } } - // static context of one particular instance, identified by uuid final class Context private( val store: Sessions.Store, val build_deps: Sessions.Deps, @@ -99,7 +98,7 @@ val fresh_build: Boolean, val no_build: Boolean, val session_setup: (String, Session) => Unit, - val uuid: String + val build_uuid: String ) { def build_options: Options = store.options @@ -162,7 +161,6 @@ } } - // dynamic state of various instances, distinguished by uuid sealed case class State( serial: Long = 0, progress_seen: Long = 0, @@ -241,34 +239,38 @@ SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body) object Generic { - val uuid = SQL.Column.string("uuid") + val build_uuid = SQL.Column.string("build_uuid") + val worker_uuid = SQL.Column.string("worker_uuid") val name = SQL.Column.string("name") - def sql_equal(uuid: String = "", name: String = ""): SQL.Source = + def sql( + build_uuid: String = "", + worker_uuid: String = "", + name: String = "", + names: Iterable[String] = Nil + ): SQL.Source = SQL.and( - if_proper(uuid, Generic.uuid.equal(uuid)), - if_proper(name, Generic.name.equal(name))) - - def sql_member(uuid: String = "", names: Iterable[String] = Nil): SQL.Source = - SQL.and( - if_proper(uuid, Generic.uuid.equal(uuid)), + if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)), + if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)), + if_proper(name, Generic.name.equal(name)), if_proper(names, Generic.name.member(names))) } object Base { - val uuid = Generic.uuid.make_primary_key + val build_uuid = Generic.build_uuid.make_primary_key val ml_platform = SQL.Column.string("ml_platform") val options = SQL.Column.string("options") - val table = make_table("", List(uuid, ml_platform, options)) + val table = make_table("", List(build_uuid, ml_platform, options)) } - object Serial { - val uuid = Generic.uuid.make_primary_key + object Workers { + val worker_uuid = Generic.worker_uuid.make_primary_key + val build_uuid = Generic.build_uuid val stamp = SQL.Column.date("stamp") val serial = SQL.Column.long("serial") - val table = make_table("serial", List(uuid, stamp, serial)) + val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial)) val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")") } @@ -278,9 +280,9 @@ val kind = SQL.Column.int("kind") val text = SQL.Column.string("text") val verbose = SQL.Column.bool("verbose") - val uuid = Generic.uuid + val build_uuid = Generic.build_uuid - val table = make_table("progress", List(serial, kind, text, verbose, uuid)) + val table = make_table("progress", List(serial, kind, text, verbose, build_uuid)) } object Sessions { @@ -291,10 +293,10 @@ val timeout = SQL.Column.long("timeout") val old_time = SQL.Column.long("old_time") val old_command_timings = SQL.Column.bytes("old_command_timings") - val uuid = Generic.uuid + val build_uuid = Generic.build_uuid val table = make_table("sessions", - List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, uuid)) + List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, build_uuid)) } object Pending { @@ -329,33 +331,35 @@ List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc)) } - def get_serial(db: SQL.Database, uuid: String = ""): Long = + def get_serial(db: SQL.Database, worker_uuid: String = ""): Long = db.using_statement( - Serial.table.select(List(Serial.serial_max), - sql = SQL.where(Generic.sql_equal(uuid = uuid))) - )(stmt => stmt.execute_query().iterator(_.long(Serial.serial)).nextOption.getOrElse(0L)) + Workers.table.select(List(Workers.serial_max), + sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))) + )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L)) - def set_serial(db: SQL.Database, uuid: String, stamp: Date, serial: Long): Unit = - if (get_serial(db, uuid = uuid) != serial) { + def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit = { + if (get_serial(db, worker_uuid = worker_uuid) != serial) { db.using_statement( - Serial.table.delete(sql = SQL.where(Generic.sql_equal(uuid = uuid))) + Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))) )(_.execute()) - db.using_statement(Serial.table.insert()) { stmt => - stmt.string(1) = uuid - stmt.date(2) = stamp - stmt.long(3) = serial + db.using_statement(Workers.table.insert()) { stmt => + stmt.string(1) = worker_uuid + stmt.string(2) = build_uuid + stmt.date(3) = db.now() + stmt.long(4) = serial stmt.execute() } } + } - def read_progress(db: SQL.Database, seen: Long = 0, uuid: String = ""): Progress_Messages = + def read_progress(db: SQL.Database, seen: Long = 0, build_uuid: String = ""): Progress_Messages = db.using_statement( Progress.table.select( sql = SQL.where( SQL.and( if (seen <= 0) "" else Progress.serial.ident + " > " + seen, - Generic.sql_equal(uuid = uuid)))) + Generic.sql(build_uuid = build_uuid)))) ) { stmt => SortedMap.from(stmt.execute_query().iterator { res => val serial = res.long(Progress.serial) @@ -370,14 +374,14 @@ db: SQL.Database, message_serial: Long, message: isabelle.Progress.Message, - uuid: String + build_uuid: String ): Unit = { db.using_statement(Progress.table.insert()) { stmt => stmt.long(1) = message_serial stmt.int(2) = message.kind.id stmt.string(3) = message.text stmt.bool(4) = message.verbose - stmt.string(5) = uuid + stmt.string(5) = build_uuid stmt.execute() } } @@ -398,9 +402,9 @@ val timeout = Time.ms(res.long(Sessions.timeout)) val old_time = Time.ms(res.long(Sessions.old_time)) val old_command_timings_blob = res.bytes(Sessions.old_command_timings) - val uuid = res.string(Sessions.uuid) + val build_uuid = res.string(Sessions.build_uuid) name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum, - timeout, old_time, old_command_timings_blob, uuid) + timeout, old_time, old_command_timings_blob, build_uuid) }) } @@ -417,7 +421,7 @@ stmt.long(5) = session.timeout.ms stmt.long(6) = session.old_time.ms stmt.bytes(7) = session.old_command_timings_blob - stmt.string(8) = session.uuid + stmt.string(8) = session.build_uuid stmt.execute() } } @@ -443,7 +447,7 @@ if (delete.nonEmpty) { db.using_statement( Pending.table.delete( - sql = SQL.where(Generic.sql_member(names = delete.map(_.name)))))(_.execute()) + sql = SQL.where(Generic.sql(names = delete.map(_.name)))))(_.execute()) } for (entry <- insert) { @@ -478,7 +482,7 @@ if (delete.nonEmpty) { db.using_statement( Running.table.delete( - sql = SQL.where(Generic.sql_member(names = delete.map(_.job_name)))))(_.execute()) + sql = SQL.where(Generic.sql(names = delete.map(_.job_name)))))(_.execute()) } for (job <- insert) { @@ -551,7 +555,7 @@ val tables = List( Base.table, - Serial.table, + Workers.table, Progress.table, Sessions.table, Pending.table, @@ -570,7 +574,7 @@ for (table <- tables) db.using_statement(table.delete())(_.execute()) db.using_statement(Base.table.insert()) { stmt => - stmt.string(1) = build_context.uuid + stmt.string(1) = build_context.build_uuid stmt.string(2) = Isabelle_System.getenv("ML_PLATFORM") stmt.string(3) = build_context.store.options.make_prefs(Options.init(prefs = "")) stmt.execute() @@ -579,7 +583,8 @@ def update_database( db: SQL.Database, - uuid: String, + worker_uuid: String, + build_uuid: String, hostname: String, state: State ): State = { @@ -594,7 +599,7 @@ val serial0 = get_serial(db) val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 - set_serial(db, uuid, db.now(), serial) + set_serial(db, worker_uuid, build_uuid, serial) state.set_serial(serial) } } @@ -614,6 +619,8 @@ protected val store: Sessions.Store = build_context.store protected val build_options: Options = store.options protected val build_deps: Sessions.Deps = build_context.build_deps + protected val build_uuid: String = build_context.build_uuid + protected val worker_uuid: String = UUID.random().toString /* global state: internal var vs. external database */ @@ -645,7 +652,7 @@ for (db <- _database) { _state = Build_Process.Data.update_database( - db, build_context.uuid, build_context.hostname, _state) + db, worker_uuid, build_uuid, build_context.hostname, _state) } } @@ -656,8 +663,8 @@ synchronized_database { val state1 = _state.inc_serial.progress_serial() for (db <- _database) { - Build_Process.Data.write_progress(db, state1.serial, message, build_context.uuid) - Build_Process.Data.set_serial(db, build_context.uuid, db.now(), state1.serial) + Build_Process.Data.write_progress(db, state1.serial, message, build_uuid) + Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial) } body _state = state1