--- a/src/Pure/Tools/build_process.scala Tue Mar 14 18:19:10 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Tue Mar 14 21:01:20 2023 +0100
@@ -105,7 +105,8 @@
val master: Boolean
) {
override def toString: String =
- "Build_Process.Context(build_uuid = " + quote(build_uuid) + ")"
+ "Build_Process.Context(build_uuid = " + quote(build_uuid) +
+ if_proper(master, ", master = true") + ")"
def build_options: Options = store.options
@@ -126,7 +127,7 @@
Data.all_tables.create_lock(db)
Data.clean_build(db)
}
- db.rebuild()
+ db.vacuum(Data.all_tables)
}
}
@@ -142,6 +143,16 @@
/** dynamic state **/
type Progress_Messages = SortedMap[Long, Progress.Message]
+ val progress_messages_empty: Progress_Messages = SortedMap.empty
+
+ case class Build(
+ build_uuid: String,
+ ml_platform: String,
+ options: String,
+ start: Date,
+ stop: Option[Date],
+ progress_stopped: Boolean
+ )
case class Worker(
worker_uuid: String,
@@ -158,7 +169,8 @@
case class Task(
name: String,
deps: List[String],
- info: JSON.Object.T = JSON.Object.empty
+ info: JSON.Object.T,
+ build_uuid: String
) {
def is_ready: Boolean = deps.isEmpty
def resolve(dep: String): Task =
@@ -171,22 +183,33 @@
build_uuid: String,
node_info: Host.Node_Info,
build: Option[Build_Job]
- ) {
+ ) extends Library.Named {
def no_build: Job = copy(build = None)
}
case class Result(
+ name: String,
+ worker_uuid: String,
+ build_uuid: String,
+ node_info: Host.Node_Info,
process_result: Process_Result,
output_shasum: SHA1.Shasum,
- node_info: Host.Node_Info,
current: Boolean
- ) {
+ ) extends Library.Named {
def ok: Boolean = process_result.ok
}
+ sealed case class Snapshot(
+ progress_messages: Progress_Messages,
+ builds: List[Build], // available build configurations
+ workers: List[Worker], // available worker processes
+ sessions: State.Sessions, // static build targets
+ pending: State.Pending, // dynamic build "queue"
+ running: State.Running, // presently running jobs
+ results: State.Results) // finished results
+
object State {
type Sessions = Map[String, Build_Job.Session_Context]
- type Workers = List[Worker]
type Pending = List[Task]
type Running = Map[String, Job]
type Results = Map[String, Result]
@@ -201,11 +224,10 @@
serial: Long = 0,
progress_seen: Long = 0,
numa_next: Int = 0,
- sessions: State.Sessions = Map.empty, // static build targets
- workers: State.Workers = Nil, // available worker processes
- pending: State.Pending = Nil, // dynamic build "queue"
- running: State.Running = Map.empty, // presently running jobs
- results: State.Results = Map.empty // finished results
+ sessions: State.Sessions = Map.empty,
+ pending: State.Pending = Nil,
+ running: State.Running = Map.empty,
+ results: State.Results = Map.empty
) {
require(serial >= 0, "serial underflow")
def inc_serial: State = copy(serial = State.inc_serial(serial))
@@ -218,8 +240,6 @@
if (message_serial > progress_seen) copy(progress_seen = message_serial)
else error("Bad serial " + message_serial + " for progress output (already seen)")
- def set_workers(new_workers: State.Workers): State = copy(workers = new_workers)
-
def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
if (numa_nodes.isEmpty) (None, this)
else {
@@ -247,10 +267,10 @@
def stop_running(): Unit =
for (job <- running.valuesIterator; build <- job.build) build.cancel()
- def finished_running(): List[Build_Job] =
+ def finished_running(): List[Job] =
List.from(
for (job <- running.valuesIterator; build <- job.build if build.is_finished)
- yield build)
+ yield job)
def add_running(job: Job): State =
copy(running = running + (job.name -> job))
@@ -259,14 +279,16 @@
copy(running = running - name)
def make_result(
- name: String,
+ result_name: (String, String, String),
process_result: Process_Result,
output_shasum: SHA1.Shasum,
node_info: Host.Node_Info = Host.Node_Info.none,
current: Boolean = false
): State = {
- val entry = name -> Build_Process.Result(process_result, output_shasum, node_info, current)
- copy(results = results + entry)
+ val (name, worker_uuid, build_uuid) = result_name
+ val result =
+ Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current)
+ copy(results = results + (name -> result))
}
}
@@ -278,6 +300,32 @@
def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table =
SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body)
+ def pull_data[A <: Library.Named](
+ data_domain: Set[String],
+ data_iterator: Set[String] => Iterator[A],
+ old_data: Map[String, A]
+ ): Map[String, A] = {
+ val dom = data_domain -- old_data.keysIterator
+ val data = old_data -- old_data.keysIterator.filterNot(data_domain)
+ if (dom.isEmpty) data
+ else data_iterator(dom).foldLeft(data) { case (map, a) => map + (a.name -> a) }
+ }
+
+ def pull0[A <: Library.Named](
+ new_data: Map[String, A],
+ old_data: Map[String, A]
+ ): Map[String, A] = {
+ pull_data(new_data.keySet, dom => new_data.valuesIterator.filter(a => dom(a.name)), old_data)
+ }
+
+ def pull1[A <: Library.Named](
+ data_domain: Set[String],
+ data_base: Set[String] => Map[String, A],
+ old_data: Map[String, A]
+ ): Map[String, A] = {
+ pull_data(data_domain, dom => data_base(dom).valuesIterator, old_data)
+ }
+
object Generic {
val build_uuid = SQL.Column.string("build_uuid")
val worker_uuid = SQL.Column.string("worker_uuid")
@@ -286,14 +334,20 @@
def sql(
build_uuid: String = "",
worker_uuid: String = "",
- name: String = "",
names: Iterable[String] = Nil
): SQL.Source =
SQL.and(
if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
- if_proper(name, Generic.name.equal(name)),
if_proper(names, Generic.name.member(names)))
+
+ def sql_where(
+ build_uuid: String = "",
+ worker_uuid: String = "",
+ names: Iterable[String] = Nil
+ ): SQL.Source = {
+ SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names))
+ }
}
@@ -311,6 +365,20 @@
make_table("", List(build_uuid, ml_platform, options, start, stop, progress_stopped))
}
+ def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] =
+ db.execute_query_statement(
+ Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)),
+ List.from[Build],
+ { res =>
+ val build_uuid = res.string(Base.build_uuid)
+ val ml_platform = res.string(Base.ml_platform)
+ val options = res.string(Base.options)
+ val start = res.date(Base.start)
+ val stop = res.get_date(Base.stop)
+ val progress_stopped = res.bool(Base.progress_stopped)
+ Build(build_uuid, ml_platform, options, start, stop, progress_stopped)
+ })
+
def start_build(
db: SQL.Database,
build_uuid: String,
@@ -341,7 +409,7 @@
List.from[String], res => res.string(Base.build_uuid))
if (old.nonEmpty) {
- for (table <- List(Base.table, Sessions.table, Progress.table, Workers.table)) {
+ for (table <- build_uuid_tables) {
db.execute_statement(table.delete(sql = Generic.build_uuid.where_member(old)))
}
}
@@ -510,14 +578,18 @@
val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")")
}
+ def read_serial(db: SQL.Database): Long =
+ db.execute_query_statementO[Long](
+ Workers.table.select(List(Workers.serial_max)), _.long(Workers.serial)).getOrElse(0L)
+
def read_workers(
db: SQL.Database,
build_uuid: String = "",
worker_uuid: String = ""
- ): State.Workers = {
+ ): List[Worker] = {
db.execute_query_statement(
- Workers.table.select(sql =
- SQL.where(Generic.sql(build_uuid = build_uuid, worker_uuid = worker_uuid))),
+ Workers.table.select(
+ sql = Generic.sql_where(build_uuid = build_uuid, worker_uuid = worker_uuid)),
List.from[Worker],
{ res =>
Worker(
@@ -533,20 +605,15 @@
})
}
- def serial_max(db: SQL.Database): Long =
- db.execute_query_statementO[Long](
- Workers.table.select(List(Workers.serial_max)),
- res => res.long(Workers.serial)
- ).getOrElse(0L)
-
def start_worker(
db: SQL.Database,
worker_uuid: String,
build_uuid: String,
hostname: String,
java_pid: Long,
- java_start: Date
- ): Long = {
+ java_start: Date,
+ serial: Long
+ ): Unit = {
def err(msg: String): Nothing =
error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg))
@@ -561,7 +628,6 @@
case None => err("for unknown build process " + build_uuid)
}
- val serial = serial_max(db)
db.execute_statement(Workers.table.insert(), body =
{ stmt =>
val now = db.now()
@@ -575,7 +641,6 @@
stmt.date(8) = None
stmt.long(9) = serial
})
- serial
}
def stamp_worker(
@@ -603,8 +668,9 @@
val name = Generic.name.make_primary_key
val deps = SQL.Column.string("deps")
val info = SQL.Column.string("info")
+ val build_uuid = Generic.build_uuid
- val table = make_table("pending", List(name, deps, info))
+ val table = make_table("pending", List(name, deps, info, build_uuid))
}
def read_pending(db: SQL.Database): List[Task] =
@@ -615,7 +681,8 @@
val name = res.string(Pending.name)
val deps = res.string(Pending.deps)
val info = res.string(Pending.info)
- Task(name, split_lines(deps), info = JSON.Object.parse(info))
+ val build_uuid = res.string(Pending.build_uuid)
+ Task(name, split_lines(deps), JSON.Object.parse(info), build_uuid)
})
def update_pending(db: SQL.Database, pending: State.Pending): Boolean = {
@@ -624,15 +691,16 @@
if (delete.nonEmpty) {
db.execute_statement(
- Pending.table.delete(sql = SQL.where(Generic.sql(names = delete.map(_.name)))))
+ Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
}
- for (entry <- insert) {
+ for (task <- insert) {
db.execute_statement(Pending.table.insert(), body =
{ stmt =>
- stmt.string(1) = entry.name
- stmt.string(2) = cat_lines(entry.deps)
- stmt.string(3) = JSON.Format(entry.info)
+ stmt.string(1) = task.name
+ stmt.string(2) = cat_lines(task.deps)
+ stmt.string(3) = JSON.Format(task.info)
+ stmt.string(4) = task.build_uuid
})
}
@@ -652,29 +720,29 @@
val table = make_table("running", List(name, worker_uuid, build_uuid, hostname, numa_node))
}
- def read_running(db: SQL.Database): List[Job] =
+ def read_running(db: SQL.Database): State.Running =
db.execute_query_statement(
Running.table.select(sql = SQL.order_by(List(Running.name))),
- List.from[Job],
+ Map.from[String, Job],
{ res =>
val name = res.string(Running.name)
val worker_uuid = res.string(Running.worker_uuid)
val build_uuid = res.string(Running.build_uuid)
val hostname = res.string(Running.hostname)
val numa_node = res.get_int(Running.numa_node)
- Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None)
+ name -> Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None)
}
)
def update_running(db: SQL.Database, running: State.Running): Boolean = {
- val running0 = read_running(db)
+ val running0 = read_running(db).valuesIterator.toList
val running1 = running.valuesIterator.map(_.no_build).toList
val (delete, insert) = Library.symmetric_difference(running0, running1)
if (delete.nonEmpty) {
db.execute_statement(
- Running.table.delete(sql = SQL.where(Generic.sql(names = delete.map(_.name)))))
+ Running.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
}
for (job <- insert) {
@@ -696,6 +764,8 @@
object Results {
val name = Generic.name.make_primary_key
+ val worker_uuid = Generic.worker_uuid
+ val build_uuid = Generic.build_uuid
val hostname = SQL.Column.string("hostname")
val numa_node = SQL.Column.string("numa_node")
val rc = SQL.Column.int("rc")
@@ -704,10 +774,13 @@
val timing_elapsed = SQL.Column.long("timing_elapsed")
val timing_cpu = SQL.Column.long("timing_cpu")
val timing_gc = SQL.Column.long("timing_gc")
+ val output_shasum = SQL.Column.string("output_shasum")
+ val current = SQL.Column.bool("current")
val table =
make_table("results",
- List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc))
+ List(name, worker_uuid, build_uuid, hostname, numa_node,
+ rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current))
}
def read_results_domain(db: SQL.Database): Set[String] =
@@ -715,14 +788,18 @@
Results.table.select(List(Results.name)),
Set.from[String], res => res.string(Results.name))
- def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] =
+ def read_results(db: SQL.Database, names: Iterable[String] = Nil): State.Results =
db.execute_query_statement(
Results.table.select(sql = if_proper(names, Results.name.where_member(names))),
- Map.from[String, Build_Job.Result],
+ Map.from[String, Result],
{ res =>
val name = res.string(Results.name)
+ val worker_uuid = res.string(Results.worker_uuid)
+ val build_uuid = res.string(Results.build_uuid)
val hostname = res.string(Results.hostname)
val numa_node = res.get_int(Results.numa_node)
+ val node_info = Host.Node_Info(hostname, numa_node)
+
val rc = res.int(Results.rc)
val out = res.string(Results.out)
val err = res.string(Results.err)
@@ -731,34 +808,41 @@
Results.timing_elapsed,
Results.timing_cpu,
Results.timing_gc)
- val node_info = Host.Node_Info(hostname, numa_node)
val process_result =
Process_Result(rc,
out_lines = split_lines(out),
err_lines = split_lines(err),
timing = timing)
- name -> Build_Job.Result(node_info, process_result)
+
+ val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum))
+ val current = res.bool(Results.current)
+
+ name ->
+ Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current)
}
)
def update_results(db: SQL.Database, results: State.Results): Boolean = {
val old_results = read_results_domain(db)
- val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList
+ val insert = results.valuesIterator.filterNot(res => old_results.contains(res.name)).toList
- for ((name, result) <- insert) {
- val node_info = result.node_info
+ for (result <- insert) {
val process_result = result.process_result
db.execute_statement(Results.table.insert(), body =
{ stmt =>
- stmt.string(1) = name
- stmt.string(2) = node_info.hostname
- stmt.int(3) = node_info.numa_node
- stmt.int(4) = process_result.rc
- stmt.string(5) = cat_lines(process_result.out_lines)
- stmt.string(6) = cat_lines(process_result.err_lines)
- stmt.long(7) = process_result.timing.elapsed.ms
- stmt.long(8) = process_result.timing.cpu.ms
- stmt.long(9) = process_result.timing.gc.ms
+ stmt.string(1) = result.name
+ stmt.string(2) = result.worker_uuid
+ stmt.string(3) = result.build_uuid
+ stmt.string(4) = result.node_info.hostname
+ stmt.int(5) = result.node_info.numa_node
+ stmt.int(6) = process_result.rc
+ stmt.string(7) = cat_lines(process_result.out_lines)
+ stmt.string(8) = cat_lines(process_result.err_lines)
+ stmt.long(9) = process_result.timing.elapsed.ms
+ stmt.long(10) = process_result.timing.cpu.ms
+ stmt.long(11) = process_result.timing.gc.ms
+ stmt.string(12) = result.output_shasum.toString
+ stmt.bool(13) = result.current
})
}
@@ -779,6 +863,33 @@
Results.table,
Host.Data.Node_Info.table)
+ val build_uuid_tables =
+ all_tables.filter(table =>
+ table.columns.exists(column => column.name == Generic.build_uuid.name))
+
+ def pull_database(
+ db: SQL.Database,
+ worker_uuid: String,
+ hostname: String,
+ state: State
+ ): State = {
+ val serial_db = read_serial(db)
+ if (serial_db == state.serial) state
+ else {
+ val serial = serial_db max state.serial
+ stamp_worker(db, worker_uuid, serial)
+
+ val numa_next = Host.Data.read_numa_next(db, hostname)
+ val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
+ val pending = read_pending(db)
+ val running = pull0(read_running(db), state.running)
+ val results = pull1(read_results_domain(db), read_results(db, _), state.results)
+
+ state.copy(serial = serial, numa_next = numa_next, sessions = sessions,
+ pending = pending, running = running, results = results)
+ }
+ }
+
def update_database(
db: SQL.Database,
worker_uuid: String,
@@ -794,11 +905,11 @@
update_results(db, state.results),
Host.Data.update_numa_next(db, hostname, state.numa_next))
- val serial0 = serial_max(db)
+ val serial0 = state.serial
val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
stamp_worker(db, worker_uuid, serial)
- state.set_serial(serial).set_workers(read_workers(db))
+ state.set_serial(serial)
}
}
}
@@ -817,6 +928,7 @@
protected final val store: Sessions.Store = build_context.store
protected final val build_options: Options = store.options
protected final val build_deps: Sessions.Deps = build_context.build_deps
+ protected final val hostname: String = build_context.hostname
protected final val build_uuid: String = build_context.build_uuid
protected final val worker_uuid: String = UUID.random().toString
@@ -827,7 +939,7 @@
/* global state: internal var vs. external database */
- private var _state: Build_Process.State = init_state(Build_Process.State())
+ private var _state: Build_Process.State = Build_Process.State()
private val _database: Option[SQL.Database] = store.open_build_database()
@@ -838,34 +950,35 @@
_database match {
case None => body
case Some(db) =>
- @tailrec def loop(): A = {
- val sync_progress =
- db.transaction_lock(Build_Process.Data.all_tables) {
- val (messages, sync) =
- Build_Process.Data.sync_progress(
- db, _state.progress_seen, build_uuid, build_progress)
- if (sync) Left(body) else Right(messages)
- }
- sync_progress match {
+ def pull_database(): Unit = {
+ _state = Build_Process.Data.pull_database(db, worker_uuid, hostname, _state)
+ }
+
+ def sync_database(): Unit = {
+ _state =
+ Build_Process.Data.update_database(db, worker_uuid, build_uuid, hostname, _state)
+ }
+
+ def attempt(): Either[A, Build_Process.Progress_Messages] = {
+ val (messages, sync) =
+ Build_Process.Data.sync_progress(
+ db, _state.progress_seen, build_uuid, build_progress)
+ if (sync) Left { pull_database(); val res = body; sync_database(); res }
+ else Right(messages)
+ }
+
+ @tailrec def attempts(): A = {
+ db.transaction_lock(Build_Process.Data.all_tables) { attempt() } match {
case Left(res) => res
case Right(messages) =>
for ((message_serial, message) <- messages) {
_state = _state.progress_serial(message_serial = message_serial)
if (build_progress.do_output(message)) build_progress.output(message)
}
- loop()
+ attempts()
}
}
- loop()
- }
- }
-
- private def sync_database(): Unit =
- synchronized_database {
- for (db <- _database) {
- _state =
- Build_Process.Data.update_database(
- db, worker_uuid, build_uuid, build_context.hostname, _state)
+ attempts()
}
}
@@ -917,7 +1030,7 @@
for {
(name, session_context) <- build_context.sessions.iterator
if !old_pending(name)
- } yield Build_Process.Task(name, session_context.deps))
+ } yield Build_Process.Task(name, session_context.deps, JSON.Object.empty, build_uuid))
val pending1 = new_pending ::: state.pending
state.copy(sessions = sessions1, pending = pending1)
@@ -936,9 +1049,7 @@
for (a <- build_context.sessions(session_name).ancestors) yield state.results(a)
val input_shasum =
- if (ancestor_results.isEmpty) {
- SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
- }
+ if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum()
else SHA1.flat_shasum(ancestor_results.map(_.output_shasum))
val store_heap = build_context.store_heap(session_name)
@@ -952,26 +1063,28 @@
val all_current = current && ancestor_results.forall(_.current)
+ val result_name = (session_name, worker_uuid, build_uuid)
+
if (all_current) {
state
.remove_pending(session_name)
- .make_result(session_name, Process_Result.ok, output_shasum, current = true)
+ .make_result(result_name, Process_Result.ok, output_shasum, current = true)
}
else if (build_context.no_build) {
progress.echo("Skipping " + session_name + " ...", verbose = true)
state.
remove_pending(session_name).
- make_result(session_name, Process_Result.error, output_shasum)
+ make_result(result_name, Process_Result.error, output_shasum)
}
else if (progress.stopped || !ancestor_results.forall(_.ok)) {
progress.echo(session_name + " CANCELLED")
state
.remove_pending(session_name)
- .make_result(session_name, Process_Result.undefined, output_shasum)
+ .make_result(result_name, Process_Result.undefined, output_shasum)
}
else {
val (numa_node, state1) = state.next_numa_node(build_context.numa_nodes)
- val node_info = Host.Node_Info(build_context.hostname, numa_node)
+ val node_info = Host.Node_Info(hostname, numa_node)
progress.echo(
(if (store_heap) "Building " else "Running ") + session_name +
@@ -992,32 +1105,34 @@
/* build process roles */
- final def start_build(): Unit = synchronized_database {
+ final def is_session_name(job_name: String): Boolean =
+ !Long_Name.is_qualified(job_name)
+
+ protected final def start_build(): Unit = synchronized_database {
for (db <- _database) {
Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
build_context.sessions_structure.session_prefs, progress.stopped)
}
}
- final def stop_build(): Unit = synchronized_database {
+ protected final def stop_build(): Unit = synchronized_database {
for (db <- _database) {
Build_Process.Data.stop_build(db, build_uuid)
}
}
- final def start_worker(): Unit = synchronized_database {
+ protected final def start_worker(): Unit = synchronized_database {
for (db <- _database) {
val java = ProcessHandle.current()
val java_pid = java.pid
val java_start = Date.instant(java.info.startInstant.get)
- val serial =
- Build_Process.Data.start_worker(
- db, worker_uuid, build_uuid, build_context.hostname, java_pid, java_start)
- _state = _state.set_serial(serial)
+ _state = _state.inc_serial
+ Build_Process.Data.start_worker(
+ db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial)
}
}
- final def stop_worker(): Unit = synchronized_database {
+ protected final def stop_worker(): Unit = synchronized_database {
for (db <- _database) {
Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
}
@@ -1027,6 +1142,8 @@
/* run */
def run(): Map[String, Process_Result] = {
+ if (build_context.master) synchronized_database { _state = init_state(_state) }
+
def finished(): Boolean = synchronized_database { _state.finished }
def sleep(): Unit =
@@ -1037,7 +1154,7 @@
def start_job(): Boolean = synchronized_database {
next_job(_state) match {
case Some(name) =>
- if (Build_Job.is_session_name(name)) {
+ if (is_session_name(name)) {
_state = start_session(_state, name)
true
}
@@ -1063,20 +1180,17 @@
synchronized_database {
if (progress.stopped) _state.stop_running()
- for (build <- _state.finished_running()) {
- val job_name = build.job_name
- val (process_result, output_shasum) = build.join
+ for (job <- _state.finished_running()) {
+ val result_name = (job.name, worker_uuid, build_uuid)
+ val (process_result, output_shasum) = job.build.get.join
_state = _state.
- remove_pending(job_name).
- remove_running(job_name).
- make_result(job_name, process_result, output_shasum, node_info = build.node_info)
+ remove_pending(job.name).
+ remove_running(job.name).
+ make_result(result_name, process_result, output_shasum, node_info = job.node_info)
}
}
- if (!start_job()) {
- sync_database()
- sleep()
- }
+ if (!start_job()) sleep()
}
}
finally {
@@ -1089,4 +1203,26 @@
}
}
}
+
+
+ /* snapshot */
+
+ def snapshot(): Build_Process.Snapshot = synchronized_database {
+ val (progress_messages, builds, workers) =
+ _database match {
+ case None => (Build_Process.progress_messages_empty, Nil, Nil)
+ case Some(db) =>
+ (Build_Process.Data.read_progress(db),
+ Build_Process.Data.read_builds(db),
+ Build_Process.Data.read_workers(db))
+ }
+ Build_Process.Snapshot(
+ progress_messages = progress_messages,
+ builds = builds,
+ workers = workers,
+ sessions = _state.sessions,
+ pending = _state.pending,
+ running = _state.running,
+ results = _state.results)
+ }
}
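
Note (not part of the patch): the `pull_data`/`pull0`/`pull1` helpers introduced above implement an incremental cache refresh, keep cached entries whose names are still in the database domain, drop vanished ones, and fetch only the newly appeared names. The following is a minimal, self-contained Scala sketch of that idiom; `Named`, `Item`, and `PullDemo` are hypothetical stand-ins, not identifiers from Isabelle's `Library`.

    // Illustrative sketch of the incremental "pull" pattern, reduced to plain Scala.
    trait Named { def name: String }
    final case class Item(name: String, payload: Int) extends Named

    def pull_data[A <: Named](
      data_domain: Set[String],                   // names currently present in the database
      data_iterator: Set[String] => Iterator[A],  // fetches only the missing entries
      old_data: Map[String, A]                    // cached entries from the previous round
    ): Map[String, A] = {
      val dom = data_domain -- old_data.keysIterator                         // newly appeared names
      val data = old_data -- old_data.keysIterator.filterNot(data_domain)    // drop vanished names
      if (dom.isEmpty) data
      else data_iterator(dom).foldLeft(data) { case (map, a) => map + (a.name -> a) }
    }

    object PullDemo {
      def main(args: Array[String]): Unit = {
        val cached = Map("a" -> Item("a", 1), "b" -> Item("b", 2))
        val db = Map("b" -> Item("b", 2), "c" -> Item("c", 3))   // "a" vanished, "c" is new
        val merged =
          pull_data(db.keySet, dom => db.valuesIterator.filter(a => dom(a.name)), cached)
        assert(merged == Map("b" -> Item("b", 2), "c" -> Item("c", 3)))
        println(merged)
      }
    }

In the patch itself, `pull1` applies this with a database-backed `data_iterator` (e.g. `read_results(db, _)` over the missing names), while `pull0` reuses an already materialised map, which is why `pull_database` can refresh sessions, running jobs, and results without re-reading rows that the worker already holds.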