--- a/src/Pure/Tools/build_schedule.scala Thu Dec 07 11:28:12 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala Thu Dec 07 11:34:01 2023 +0100
@@ -780,12 +780,49 @@
}
catch { case exn: Throwable => close(); throw exn }
+ private val _build_database: Option[SQL.Database] =
+ try {
+ for (db <- store.maybe_open_build_database(server = server)) yield {
+ if (build_context.master) {
+ Build_Schedule.private_data.transaction_lock(
+ db,
+ create = true,
+ label = "Build_Schedule.build_database"
+ ) { Build_Schedule.private_data.clean_build_schedules(db) }
+ db.vacuum(Build_Schedule.private_data.tables.list)
+ }
+ db
+ }
+ }
+ catch { case exn: Throwable => close(); throw exn }
+
override def close(): Unit = {
super.close()
Option(_log_database).foreach(_.close())
+ Option(_build_database).flatten.foreach(_.close())
}
+ /* global state: internal var vs. external database */
+
+ private var _schedule = Schedule.init(build_uuid)
+
+ override protected def synchronized_database[A](label: String)(body: => A): A =
+ super.synchronized_database(label) {
+ _build_database match {
+ case None => body
+ case Some(db) =>
+ Build_Schedule.private_data.transaction_lock(db, label = label) {
+ val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule)
+ _schedule = old_schedule
+ val res = body
+ Build_Schedule.private_data.update_schedule(db, _schedule)
+ res
+ }
+ }
+ }
+
+
/* previous results via build log */
override def open_build_cluster(): Build_Cluster = {
@@ -866,8 +903,6 @@
/* build process */
- private var _schedule = Schedule.init(build_uuid)
-
override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
_schedule.graph.get_node(session_name).node_info
@@ -902,6 +937,7 @@
else {
val current = state.next_ready.filter(task => is_current(state, task.name))
if (current.nonEmpty) current.map(_.name).take(finalize_limit)
+ else if (!build_context.master) Nil
else {
val start = Time.now()
val schedule = scheduler.build_schedule(state)
@@ -924,6 +960,144 @@
}
}
+
+ /** SQL data model of build schedule, extending isabelle_build database */
+
+ object private_data extends SQL.Data("isabelle_build") {
+ import Build_Process.private_data.{Base, Generic}
+
+
+ /* schedule */
+
+ object Schedules {
+ val build_uuid = Generic.build_uuid.make_primary_key
+ val generator = SQL.Column.string("generator")
+ val start = SQL.Column.date("start")
+
+ val table = make_table(List(build_uuid, generator, start), name = "schedules")
+ }
+
+ def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+ db.execute_query_statement(
+ Schedules.table.select(List(Schedules.build_uuid)),
+ List.from[String], res => res.string(Schedules.build_uuid))
+
+ def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
+ val schedules =
+ db.execute_query_statement(Schedules.table.select(sql =
+ SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+ List.from[Schedule],
+ { res =>
+ val build_uuid = res.string(Schedules.build_uuid)
+ val generator = res.string(Schedules.generator)
+ val start = res.date(Schedules.start)
+ Schedule(build_uuid, generator, start, Graph.empty)
+ })
+
+ for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield {
+ val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid)
+ schedule.copy(graph = Graph.make(nodes))
+ }
+ }
+
+ def update_schedule(db: SQL.Database, schedule: Schedule): Unit = {
+ db.execute_statement(
+ Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid)))
+ db.execute_statement(Schedules.table.insert(), { stmt =>
+ stmt.string(1) = schedule.build_uuid
+ stmt.string(2) = schedule.generator
+ stmt.date(3) = schedule.start
+ })
+ update_nodes(db, schedule.build_uuid, schedule.graph.dest)
+ }
+
+
+ /* nodes */
+
+ object Nodes {
+ val build_uuid = Generic.build_uuid.make_primary_key
+ val name = Generic.name.make_primary_key
+ val succs = SQL.Column.string("succs")
+ val hostname = SQL.Column.string("hostname")
+ val numa_node = SQL.Column.int("numa_node")
+ val rel_cpus = SQL.Column.string("rel_cpus")
+ val start = SQL.Column.date("start")
+ val duration = SQL.Column.long("duration")
+
+ val table =
+ make_table(
+ List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration),
+ name = "schedule_nodes")
+ }
+
+ type Nodes = List[((String, Schedule.Node), List[String])]
+
+ def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = {
+ db.execute_query_statement(
+ Nodes.table.select(sql =
+ SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))),
+ List.from[((String, Schedule.Node), List[String])],
+ { res =>
+ val name = res.string(Nodes.name)
+ val succs = split_lines(res.string(Nodes.succs))
+ val hostname = res.string(Nodes.hostname)
+ val numa_node = res.get_int(Nodes.numa_node)
+ val rel_cpus = res.string(Nodes.rel_cpus)
+ val start = res.date(Nodes.start)
+ val duration = Time.ms(res.long(Nodes.duration))
+
+ val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus))
+ ((name, Schedule.Node(name, node_info, start, duration)), succs)
+ }
+ )
+ }
+
+ def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = {
+ db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid)))
+ db.execute_batch_statement(Nodes.table.insert(), batch =
+ for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) =>
+ stmt.string(1) = build_uuid
+ stmt.string(2) = name
+ stmt.string(3) = cat_lines(succs)
+ stmt.string(4) = node.node_info.hostname
+ stmt.int(5) = node.node_info.numa_node
+ stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus)
+ stmt.date(7) = node.start
+ stmt.long(8) = node.duration.ms
+ })
+ }
+
+ def pull_schedule(
+ db: SQL.Database,
+ schedule: Schedule,
+ ): Build_Schedule.Schedule =
+ read_schedules(db, schedule.build_uuid) match {
+ case Nil => schedule
+ case schedules => Library.the_single(schedules)
+ }
+
+ def remove_schedules(db: SQL.Database, remove: List[String]): Unit =
+ if (remove.nonEmpty) {
+ val sql = Generic.build_uuid.where_member(remove)
+ db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql))))
+ }
+
+ def clean_build_schedules(db: SQL.Database): Unit = {
+ val running_builds_domain =
+ db.execute_query_statement(
+ Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
+ List.from[String], res => res.string(Base.build_uuid))
+
+ val (remove, _) =
+ Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+
+ remove_schedules(db, remove)
+ }
+
+ override val tables = SQL.Tables(Schedules.table, Nodes.table)
+ }
+
+
class Engine extends Build.Engine("build_schedule") {
def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {