--- a/src/Pure/Build/build_process.scala Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/build_process.scala Mon Mar 11 08:46:20 2024 +0100
@@ -18,6 +18,7 @@
sealed case class Build(
build_uuid: String, // Database_Progress.context_uuid
+ build_id: Long,
ml_platform: String,
options: String,
start: Date,
@@ -25,6 +26,7 @@
sessions: List[String]
) {
def active: Boolean = stop.isEmpty
+ def active_build_uuid: Option[String] = if (active) Some(build_uuid) else None
def print: String =
build_uuid + " (platform: " + ml_platform + ", start: " + Build_Log.print_date(start) +
@@ -40,14 +42,22 @@
serial: Long
)
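+ // pending tasks are keyed by session name: Task.entry yields the map entries for State.Pending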
+ object Task {
+ type Entry = (String, Task)
+ def entry(name: String, deps: List[String], build_uuid: String): Entry =
+ name -> Task(name, deps, build_uuid)
+ def entry(session: Build_Job.Session_Context, build_context: isabelle.Build.Context): Entry =
+ entry(session.name, session.deps, build_context.build_uuid)
+ }
+
sealed case class Task(
name: String,
deps: List[String],
build_uuid: String
- ) {
+ ) extends Library.Named {
def is_ready: Boolean = deps.isEmpty
- def resolve(dep: String): Task =
- if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this
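+ // None means "dep is not a dependency", so callers can keep the original map entry unchanged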
+ def resolve(dep: String): Option[Task] =
+ if (deps.contains(dep)) Some(copy(deps = deps.filterNot(_ == dep))) else None
}
sealed case class Job(
@@ -89,6 +99,9 @@
def iterator: Iterator[Build_Job.Session_Context] =
for (name <- graph.topological_order.iterator) yield apply(name)
+ def data: Library.Update.Data[Build_Job.Session_Context] =
+ Map.from(for ((_, (session, _)) <- graph.iterator) yield session.name -> session)
+
def make(new_graph: Sessions.Graph): Sessions =
if (graph == new_graph) this
else {
@@ -198,31 +211,44 @@
results: State.Results) // finished results
object State {
- type Pending = List[Task]
- type Running = Map[String, Job]
- type Results = Map[String, Result]
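+ // also used for Build_Schedule.Schedule.next_serial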
+ def inc_serial(serial: Long): Long = {
+ require(serial < Long.MaxValue, "serial overflow")
+ serial + 1
+ }
+
+ type Pending = Library.Update.Data[Task]
+ type Running = Library.Update.Data[Job]
+ type Results = Library.Update.Data[Result]
}
sealed case class State(
serial: Long = 0,
numa_nodes: List[Int] = Nil,
sessions: Sessions = Sessions.empty,
- pending: State.Pending = Nil,
+ pending: State.Pending = Map.empty,
running: State.Running = Map.empty,
results: State.Results = Map.empty
) {
require(serial >= 0, "serial underflow")
- def inc_serial: State = {
- require(serial < Long.MaxValue, "serial overflow")
- copy(serial = serial + 1)
- }
+
+ def next_serial: Long = State.inc_serial(serial)
+ def inc_serial: State = copy(serial = next_serial)
+
+ def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
+ def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name))

- def ready: State.Pending = pending.filter(_.is_ready)
- def next_ready: State.Pending = ready.filter(entry => !is_running(entry.name))
-
- def remove_pending(name: String): State =
- copy(pending = pending.flatMap(
- entry => if (entry.name == name) None else Some(entry.resolve(name))))
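+ // remove task "a" itself and erase "a" from the dependencies of all remaining tasks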
+ def remove_pending(a: String): State =
+ copy(pending =
+ pending.foldLeft(pending) {
+ case (map, (b, task)) =>
+ if (a == b) map - a
+ else {
+ task.resolve(a) match {
+ case None => map
+ case Some(task1) => map + (b -> task1)
+ }
+ }
+ })
def is_running(name: String): Boolean = running.isDefinedAt(name)
@@ -263,6 +289,20 @@
object private_data extends SQL.Data("isabelle_build") {
val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
+ override lazy val tables: SQL.Tables =
+ SQL.Tables(
+ Updates.table,
+ Sessions.table,
+ Pending.table,
+ Running.table,
+ Results.table,
+ Base.table,
+ Workers.table)
+
+ private lazy val build_uuid_tables = tables.filter(Generic.build_uuid_table)
+ private lazy val build_id_tables =
+ tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
+
def pull[A <: Library.Named](
data_domain: Set[String],
data_iterator: Set[String] => Iterator[A],
@@ -290,40 +330,100 @@
}
object Generic {
+ val build_id = SQL.Column.long("build_id")
val build_uuid = SQL.Column.string("build_uuid")
val worker_uuid = SQL.Column.string("worker_uuid")
val name = SQL.Column.string("name")
+ def build_id_table(table: SQL.Table): Boolean =
+ table.columns.exists(column => column.name == build_id.name)
+
+ def build_uuid_table(table: SQL.Table): Boolean =
+ table.columns.exists(column => column.name == build_uuid.name)
+
def sql(
+ build_id: Long = 0,
build_uuid: String = "",
worker_uuid: String = "",
names: Iterable[String] = Nil
): SQL.Source =
SQL.and(
+ if_proper(build_id > 0, Generic.build_id.equal(build_id)),
if_proper(build_uuid, Generic.build_uuid.equal(build_uuid)),
if_proper(worker_uuid, Generic.worker_uuid.equal(worker_uuid)),
if_proper(names, Generic.name.member(names)))
def sql_where(
+ build_id: Long = 0,
build_uuid: String = "",
worker_uuid: String = "",
names: Iterable[String] = Nil
): SQL.Source = {
- SQL.where(sql(build_uuid = build_uuid, worker_uuid = worker_uuid, names = names))
+ SQL.where(sql(
+ build_id = build_id,
+ build_uuid = build_uuid,
+ worker_uuid = worker_uuid,
+ names = names))
}
}
+ /* recorded updates */
+
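+ // journal of changed table entries: "kind" refers to the table (cf. table_index below),
+ // "name" to the changed entry; the read side is not part of this changeset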
+ object Updates {
+ val build_id = Generic.build_id.make_primary_key
+ val serial = SQL.Column.long("serial").make_primary_key
+ val kind = SQL.Column.int("kind").make_primary_key
+ val name = Generic.name.make_primary_key
+
+ val table = make_table(List(build_id, serial, kind, name), name = "updates")
+ }
+
+ def write_updates(
+ db: SQL.Database,
+ build_id: Long,
+ serial: Long,
+ updates: List[Library.Update]
+ ): Unit =
+ db.execute_batch_statement(db.insert_permissive(Updates.table), batch =
+ for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator)
+ yield { (stmt: SQL.Statement) =>
+ require(build_id > 0 && serial > 0 && kind > 0 && name.nonEmpty,
+ "Bad database update: build_id = " + build_id + ", serial = " + serial +
+ ", kind = " + kind + ", name = " + quote(name))
+ stmt.long(1) = build_id
+ stmt.long(2) = serial
+ stmt.int(3) = kind
+ stmt.string(4) = name
+ })
+
+
/* base table */
object Base {
val build_uuid = Generic.build_uuid.make_primary_key
+ val build_id = Generic.build_id.make_primary_key
val ml_platform = SQL.Column.string("ml_platform")
val options = SQL.Column.string("options")
val start = SQL.Column.date("start")
val stop = SQL.Column.date("stop")
- val table = make_table(List(build_uuid, ml_platform, options, start, stop))
+ val table = make_table(List(build_uuid, build_id, ml_platform, options, start, stop))
+ }
+
+ def read_build_ids(db: SQL.Database, build_uuids: List[String]): List[Long] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.build_id),
+ sql = if_proper(build_uuids, Base.build_uuid.where_member(build_uuids))),
+ List.from[Long], res => res.long(Base.build_id))
+
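+ // reuse the build_id of a known build_uuid, otherwise allocate max + 1 (ids are always > 0)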
+ def get_build_id(db: SQL.Database, build_uuid: String): Long = {
+ read_build_ids(db, build_uuids = List(build_uuid)) match {
+ case build_id :: _ => build_id
+ case _ =>
+ db.execute_query_statementO(
+ Base.table.select(List(Base.build_id.max)), _.long(Base.build_id)).getOrElse(0L) + 1L
+ }
}
def read_builds(db: SQL.Database): List[Build] = {
@@ -331,11 +431,12 @@
db.execute_query_statement(Base.table.select(), List.from[Build],
{ res =>
val build_uuid = res.string(Base.build_uuid)
+ val build_id = res.long(Base.build_id)
val ml_platform = res.string(Base.ml_platform)
val options = res.string(Base.options)
val start = res.date(Base.start)
val stop = res.get_date(Base.stop)
- Build(build_uuid, ml_platform, options, start, stop, Nil)
+ Build(build_uuid, build_id, ml_platform, options, start, stop, Nil)
})
for (build <- builds.sortBy(_.start)(Date.Ordering)) yield {
@@ -344,14 +445,21 @@
}
}
- def remove_builds(db: SQL.Database, remove: List[String]): Unit =
- if (remove.nonEmpty) {
- val sql = Generic.build_uuid.where_member(remove)
- db.execute_statement(SQL.MULTI(build_uuid_tables.map(_.delete(sql = sql))))
+ def remove_builds(db: SQL.Database, build_uuids: List[String]): Unit =
+ if (build_uuids.nonEmpty) {
+ val build_ids = read_build_ids(db, build_uuids = build_uuids)
+
+ val sql1 = Generic.build_uuid.where_member(build_uuids)
+ val sql2 = Generic.build_id.where_member_long(build_ids)
+ db.execute_statement(
+ SQL.MULTI(
+ build_uuid_tables.map(_.delete(sql = sql1)) ++
+ build_id_tables.map(_.delete(sql = sql2))))
}
def start_build(
db: SQL.Database,
+ build_id: Long,
build_uuid: String,
ml_platform: String,
options: String,
@@ -360,10 +468,11 @@
db.execute_statement(Base.table.insert(), body =
{ stmt =>
stmt.string(1) = build_uuid
- stmt.string(2) = ml_platform
- stmt.string(3) = options
- stmt.date(4) = start
- stmt.date(5) = None
+ stmt.long(2) = build_id
+ stmt.string(3) = ml_platform
+ stmt.string(4) = options
+ stmt.date(5) = start
+ stmt.date(6) = None
})
}
@@ -400,6 +509,8 @@
List(name, deps, ancestors, options, sources, timeout,
old_time, old_command_timings, build_uuid),
name = "sessions")
+
+ lazy val table_index: Int = tables.index(table)
}
def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] =
@@ -440,12 +551,20 @@
db: SQL.Database,
sessions: Build_Process.Sessions,
old_sessions: Build_Process.Sessions
- ): Boolean = {
- val insert = sessions.iterator.filterNot(s => old_sessions.defined(s.name)).toList
+ ): Library.Update = {
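+ // reference equality suffices: Sessions.make returns the old value if the graph is unchanged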
+ val update =
+ if (old_sessions.eq(sessions)) Library.Update.empty
+ else Library.Update.make(old_sessions.data, sessions.data, kind = Sessions.table_index)

- if (insert.nonEmpty) {
+ if (update.deletes) {
+ db.execute_statement(
+ Sessions.table.delete(sql = Generic.sql_where(names = update.delete)))
+ }
+
+ if (update.inserts) {
db.execute_batch_statement(Sessions.table.insert(), batch =
- for (session <- insert) yield { (stmt: SQL.Statement) =>
+ for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+ val session = sessions(name)
stmt.string(1) = session.name
stmt.string(2) = cat_lines(session.deps)
stmt.string(3) = cat_lines(session.ancestors)
@@ -458,7 +577,7 @@
})
}
- insert.nonEmpty
+ update
}
@@ -474,6 +593,8 @@
val table =
make_table(List(worker_uuid, build_uuid, start, stamp, stop, serial), name = "workers")
+
+ lazy val table_index: Int = tables.index(table)
}
def read_serial(db: SQL.Database): Long =
@@ -563,41 +684,44 @@
val build_uuid = Generic.build_uuid
val table = make_table(List(name, deps, build_uuid), name = "pending")
+
+ lazy val table_index: Int = tables.index(table)
}
- def read_pending(db: SQL.Database): List[Task] =
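+ // map order is irrelevant: State.ready sorts tasks by name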
+ def read_pending(db: SQL.Database): State.Pending =
db.execute_query_statement(
- Pending.table.select(sql = SQL.order_by(List(Pending.name))),
- List.from[Task],
+ Pending.table.select(),
+ Map.from[String, Task],
{ res =>
val name = res.string(Pending.name)
val deps = res.string(Pending.deps)
val build_uuid = res.string(Pending.build_uuid)
- Task(name, split_lines(deps), build_uuid)
+ Task.entry(name, split_lines(deps), build_uuid)
})
def update_pending(
db: SQL.Database,
pending: State.Pending,
old_pending: State.Pending
- ): Boolean = {
- val (delete, insert) = Library.symmetric_difference(old_pending, pending)
+ ): Library.Update = {
+ val update = Library.Update.make(old_pending, pending, kind = Pending.table_index)

- if (delete.nonEmpty) {
+ if (update.deletes) {
db.execute_statement(
- Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
+ Pending.table.delete(sql = Generic.sql_where(names = update.delete)))
}
- if (insert.nonEmpty) {
+ if (update.inserts) {
db.execute_batch_statement(Pending.table.insert(), batch =
- for (task <- insert) yield { (stmt: SQL.Statement) =>
+ for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+ val task = pending(name)
stmt.string(1) = task.name
stmt.string(2) = cat_lines(task.deps)
stmt.string(3) = task.build_uuid
})
}
- delete.nonEmpty || insert.nonEmpty
+ update
}
@@ -616,11 +740,13 @@
make_table(
List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date),
name = "running")
+
+ lazy val table_index: Int = tables.index(table)
}
def read_running(db: SQL.Database): State.Running =
db.execute_query_statement(
- Running.table.select(sql = SQL.order_by(List(Running.name))),
+ Running.table.select(),
Map.from[String, Job],
{ res =>
val name = res.string(Running.name)
@@ -640,19 +766,18 @@
db: SQL.Database,
running: State.Running,
old_running: State.Running
- ): Boolean = {
- val running0 = old_running.valuesIterator.toList
- val running1 = running.valuesIterator.toList
- val (delete, insert) = Library.symmetric_difference(running0, running1)
+ ): Library.Update = {
+ val update = Library.Update.make(old_running, running, kind = Running.table_index)

- if (delete.nonEmpty) {
+ if (update.deletes) {
db.execute_statement(
- Running.table.delete(sql = Generic.sql_where(names = delete.map(_.name))))
+ Running.table.delete(sql = Generic.sql_where(names = update.delete)))
}
- if (insert.nonEmpty) {
+ if (update.inserts) {
db.execute_batch_statement(Running.table.insert(), batch =
- for (job <- insert) yield { (stmt: SQL.Statement) =>
+ for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+ val job = running(name)
stmt.string(1) = job.name
stmt.string(2) = job.worker_uuid
stmt.string(3) = job.build_uuid
@@ -663,7 +788,7 @@
})
}
- delete.nonEmpty || insert.nonEmpty
+ update
}
@@ -690,6 +815,8 @@
List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus,
rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current),
name = "results")
+
+ lazy val table_index: Int = tables.index(table)
}
def read_results_domain(db: SQL.Database): Set[String] =
@@ -736,13 +863,18 @@
db: SQL.Database,
results: State.Results,
old_results: State.Results
- ): Boolean = {
- val insert =
- results.valuesIterator.filterNot(res => old_results.isDefinedAt(res.name)).toList
+ ): Library.Update = {
+ val update = Library.Update.make(old_results, results, kind = Results.table_index)

- if (insert.nonEmpty) {
+ if (update.deletes) {
+ db.execute_statement(
+ Results.table.delete(sql = Generic.sql_where(names = update.delete)))
+ }
+
+ if (update.inserts) {
db.execute_batch_statement(Results.table.insert(), batch =
- for (result <- insert) yield { (stmt: SQL.Statement) =>
+ for (name <- update.insert) yield { (stmt: SQL.Statement) =>
+ val result = results(name)
val process_result = result.process_result
stmt.string(1) = result.name
stmt.string(2) = result.worker_uuid
@@ -761,25 +893,12 @@
})
}
- insert.nonEmpty
+ update
}
/* collective operations */
- override val tables: SQL.Tables =
- SQL.Tables(
- Base.table,
- Workers.table,
- Sessions.table,
- Pending.table,
- Running.table,
- Results.table)
-
- private val build_uuid_tables =
- tables.filter(table =>
- table.columns.exists(column => column.name == Generic.build_uuid.name))
-
def pull_database(db: SQL.Database, worker_uuid: String, state: State): State = {
val serial_db = read_serial(db)
if (serial_db == state.serial) state
@@ -799,21 +918,26 @@
def update_database(
db: SQL.Database,
+ build_id: Long,
worker_uuid: String,
state: State,
old_state: State
): State = {
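+ // collect per-table updates; only non-empty updates bump the serial and are journalled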
- val changed =
+ val updates =
List(
update_sessions(db, state.sessions, old_state.sessions),
update_pending(db, state.pending, old_state.pending),
update_running(db, state.running, old_state.running),
- update_results(db, state.results, old_state.results))
+ update_results(db, state.results, old_state.results)
+ ).filter(_.defined)

- val state1 = if (changed.exists(identity)) state.inc_serial else state
- if (state1.serial != state.serial) stamp_worker(db, worker_uuid, state1.serial)
-
- state1
+ if (updates.nonEmpty) {
+ val serial = state.next_serial
+ write_updates(db, build_id, serial, updates)
+ stamp_worker(db, worker_uuid, serial)
+ state.copy(serial = serial)
+ }
+ else state
}
}
@@ -821,6 +945,21 @@
private_data.transaction_lock(db, create = true, label = "Build_Process.read_builds") {
private_data.read_builds(db)
}
+
+ def init_build(
+ db: SQL.Database,
+ build_context: isabelle.Build.Context,
+ build_start: Date
+ ): Long =
+ private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+ val build_uuid = build_context.build_uuid
+ val build_id = private_data.get_build_id(db, build_uuid)
+ if (build_context.master) {
+ private_data.start_build(db, build_id, build_uuid, build_context.ml_platform,
+ build_context.sessions_structure.session_prefs, build_start)
+ }
+ build_id
+ }
}
@@ -915,9 +1054,15 @@
}
}
+ protected val log: Logger = Logger.make_system_log(progress, build_options)
+
protected val build_start: Date = build_context.build_start getOrElse progress.now()
- protected val log: Logger = Logger.make_system_log(progress, build_options)
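+ // build_id = 0L means that there is no build database, i.e. no persistent identification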
+ protected val build_id: Long =
+ _build_database match {
+ case None => 0L
+ case Some(db) => Build_Process.init_build(db, build_context, build_start)
+ }
protected def open_build_cluster(): Build_Cluster =
Build_Cluster.make(build_context, progress = build_progress).open()
@@ -955,7 +1100,9 @@
val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state)
_state = old_state
val res = body
- _state = Build_Process.private_data.update_database(db, worker_uuid, _state, old_state)
+ _state =
+ Build_Process.private_data.update_database(
+ db, build_id, worker_uuid, _state, old_state)
res
}
}
@@ -964,19 +1111,6 @@
/* policy operations */
- protected def init_state(state: Build_Process.State): Build_Process.State = {
- val sessions1 = state.sessions.init(build_context, _database_server, progress = build_progress)
-
- val old_pending = state.pending.iterator.map(_.name).toSet
- val new_pending =
- List.from(
- for (session <- sessions1.iterator if !old_pending(session.name))
- yield Build_Process.Task(session.name, session.deps, build_uuid))
- val pending1 = new_pending ::: state.pending
-
- state.copy(sessions = sessions1, pending = pending1)
- }
-
protected def next_jobs(state: Build_Process.State): List[String] = {
val limit = {
if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 }
@@ -1088,13 +1222,6 @@
final def is_session_name(job_name: String): Boolean =
!Long_Name.is_qualified(job_name)
- protected final def start_build(): Unit = synchronized_database("Build_Process.start_build") {
- for (db <- _build_database) {
- Build_Process.private_data.start_build(db, build_uuid, build_context.ml_platform,
- build_context.sessions_structure.session_prefs, build_start)
- }
- }
-
protected final def stop_build(): Unit = synchronized_database("Build_Process.stop_build") {
for (db <- _build_database) {
Build_Process.private_data.stop_build(db, build_uuid)
@@ -1134,6 +1261,23 @@
protected def sleep(): Unit =
Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
+ protected def init_unsynchronized(): Unit = {
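+ // only the master turns sessions into pending tasks; workers see them via the database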
+ if (build_context.master) {
+ val sessions1 =
+ _state.sessions.init(build_context, _database_server, progress = build_progress)
+ val pending1 =
+ sessions1.iterator.foldLeft(_state.pending) {
+ case (map, session) =>
+ if (map.isDefinedAt(session.name)) map
+ else map + Build_Process.Task.entry(session, build_context)
+ }
+ _state = _state.copy(sessions = sessions1, pending = pending1)
+ }
+
+ val numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling)
+ _state = _state.copy(numa_nodes = numa_nodes)
+ }
+
protected def main_unsynchronized(): Unit = {
for (job <- _state.build_running.filter(_.is_finished)) {
_state = _state.remove_running(job.name)
@@ -1165,19 +1309,16 @@
def run(): Build.Results = {
val vacuous =
synchronized_database("Build_Process.init") {
- if (build_context.master) {
- _build_cluster.init()
- _state = init_state(_state)
- }
- _state = _state.copy(numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling))
+ _build_cluster.init()
+ init_unsynchronized()
build_context.master && _state.pending.isEmpty
}
if (vacuous) {
progress.echo_warning("Nothing to build")
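+ // the build was already registered via init_build, so the master still has to close it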
+ if (build_context.master) stop_build()
Build.Results(build_context)
}
else {
- if (build_context.master) start_build()
start_worker()
_build_cluster.start()
--- a/src/Pure/Build/build_schedule.scala Mon Mar 11 08:45:55 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala Mon Mar 11 08:46:20 2024 +0100
@@ -469,11 +469,9 @@
serial: Long = 0,
) {
require(serial >= 0, "serial underflow")
- def inc_serial: Schedule = {
- require(serial < Long.MaxValue, "serial overflow")
- copy(serial = serial + 1)
- }
-
+
+ def next_serial: Long = Build_Process.State.inc_serial(serial)
+
def end: Date =
if (graph.is_empty) start
else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch)
@@ -503,7 +501,6 @@
def update(state: Build_Process.State): Schedule = {
val start1 = Date.now()
- val pending = state.pending.map(_.name).toSet
def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph =
graph.map_node(name, { node =>
@@ -517,7 +514,8 @@
node.copy(start = starts.max(Date.Ordering))
})
- val graph0 = state.running.keys.foldLeft(graph.restrict(pending.contains))(shift_elapsed)
+ val graph0 =
+ state.running.keys.foldLeft(graph.restrict(state.pending.isDefinedAt))(shift_elapsed)
val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts)
copy(start = start1, graph = graph1)
@@ -859,7 +857,8 @@
_schedule = old_schedule
val res = body
_state =
- Build_Process.private_data.update_database(db, worker_uuid, _state, old_state)
+ Build_Process.private_data.update_database(
+ db, build_id, worker_uuid, _state, old_state)
_schedule = Build_Schedule.private_data.update_schedule(db, _schedule, old_schedule)
res
}
@@ -1063,6 +1062,12 @@
object private_data extends SQL.Data("isabelle_build") {
import Build_Process.private_data.{Base, Generic}
+ override lazy val tables: SQL.Tables =
+ SQL.Tables(Schedules.table, Nodes.table)
+
+ lazy val all_tables: SQL.Tables =
+ SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
+
/* schedule */
@@ -1081,10 +1086,10 @@
SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
_.long(Schedules.serial)).getOrElse(0L)
- def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+ def read_scheduled_builds_domain(db: SQL.Database): Map[String, Unit] =
db.execute_query_statement(
Schedules.table.select(List(Schedules.build_uuid)),
- List.from[String], res => res.string(Schedules.build_uuid))
+ Map.from[String, Unit], res => res.string(Schedules.build_uuid) -> ())
def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
val schedules =
@@ -1191,7 +1196,7 @@
schedule.graph != old_schedule.graph
val schedule1 =
- if (changed) schedule.copy(serial = old_schedule.serial).inc_serial else schedule
+ if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule
if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
schedule1
@@ -1207,18 +1212,12 @@
val running_builds_domain =
db.execute_query_statement(
Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
- List.from[String], res => res.string(Base.build_uuid))
-
- val (remove, _) =
- Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+ Map.from[String, Unit], res => res.string(Base.build_uuid) -> ())

- remove_schedules(db, remove)
- }
+ val update = Library.Update.make(read_scheduled_builds_domain(db), running_builds_domain)

- override val tables: SQL.Tables = SQL.Tables(Schedules.table, Nodes.table)
-
- val all_tables: SQL.Tables =
- SQL.Tables.list(Build_Process.private_data.tables.list ::: tables.list)
+ remove_schedules(db, update.delete)
+ }
}
@@ -1329,11 +1328,10 @@
val timing_data = Timing_Data.load(host_infos, log_database)
val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
- def task(session: Build_Job.Session_Context): Build_Process.Task =
- Build_Process.Task(session.name, session.deps, build_context.build_uuid)
val build_state =
- Build_Process.State(sessions = sessions, pending = sessions.iterator.map(task).toList)
+ Build_Process.State(sessions = sessions,
+ pending = Map.from(sessions.iterator.map(Build_Process.Task.entry(_, build_context))))
val scheduler = Build_Engine.scheduler(timing_data, build_context)
def schedule_msg(res: Exn.Result[Schedule]): String =
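Note on Library.Update (src/Pure/library.scala, not shown in this changeset): the code above
only exercises its interface (Update.make, Update.empty, and the fields kind / delete / insert
with the tests deletes / inserts / defined / domain). The following is a minimal sketch of that
interface, reconstructed from the call sites; the actual implementation in library.scala may
differ in detail:

  object Update {
    type Data[A] = Map[String, A]

    val empty: Update = Update()

    // Compare keyed data: an entry that changed its value occurs in both "delete" and
    // "insert", so the "delete rows, then insert rows" pattern of update_pending etc.
    // re-creates it in the database.
    def make[A](old_data: Data[A], data: Data[A], kind: Int = 0): Update =
      Update(kind = kind,
        delete = List.from(for ((x, y) <- old_data.iterator if !data.get(x).contains(y)) yield x),
        insert = List.from(for ((x, y) <- data.iterator if !old_data.get(x).contains(y)) yield x))
  }

  sealed case class Update(kind: Int = 0, delete: List[String] = Nil, insert: List[String] = Nil) {
    def deletes: Boolean = delete.nonEmpty
    def inserts: Boolean = insert.nonEmpty
    def defined: Boolean = deletes || inserts
    def domain: Set[String] = delete.toSet ++ insert
  }

For example, Update.make(Map("a" -> 1, "b" -> 2), Map("a" -> 1, "b" -> 3, "c" -> 4), kind = 2)
would yield Update(2, List("b"), List("b", "c")): "b" is deleted and re-inserted, "c" inserted.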