# HG changeset patch # User Fabian Huch # Date 1701945241 -3600 # Node ID a22440b9cb70cff3f899440edb294362aa1eb354 # Parent cfed4fcf1dae126232f5e0f4c0f4db1cd9f05bb8 use build database to synchronize build schedule computed on master node (e.g., such that view on cluster is consistent); diff -r cfed4fcf1dae -r a22440b9cb70 src/Pure/Tools/build_schedule.scala --- a/src/Pure/Tools/build_schedule.scala Thu Dec 07 11:28:12 2023 +0100 +++ b/src/Pure/Tools/build_schedule.scala Thu Dec 07 11:34:01 2023 +0100 @@ -780,12 +780,49 @@ } catch { case exn: Throwable => close(); throw exn } + private val _build_database: Option[SQL.Database] = + try { + for (db <- store.maybe_open_build_database(server = server)) yield { + if (build_context.master) { + Build_Schedule.private_data.transaction_lock( + db, + create = true, + label = "Build_Schedule.build_database" + ) { Build_Schedule.private_data.clean_build_schedules(db) } + db.vacuum(Build_Schedule.private_data.tables.list) + } + db + } + } + catch { case exn: Throwable => close(); throw exn } + override def close(): Unit = { super.close() Option(_log_database).foreach(_.close()) + Option(_build_database).flatten.foreach(_.close()) } + /* global state: internal var vs. external database */ + + private var _schedule = Schedule.init(build_uuid) + + override protected def synchronized_database[A](label: String)(body: => A): A = + super.synchronized_database(label) { + _build_database match { + case None => body + case Some(db) => + Build_Schedule.private_data.transaction_lock(db, label = label) { + val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule) + _schedule = old_schedule + val res = body + Build_Schedule.private_data.update_schedule(db, _schedule) + res + } + } + } + + /* previous results via build log */ override def open_build_cluster(): Build_Cluster = { @@ -866,8 +903,6 @@ /* build process */ - private var _schedule = Schedule.init(build_uuid) - override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = _schedule.graph.get_node(session_name).node_info @@ -902,6 +937,7 @@ else { val current = state.next_ready.filter(task => is_current(state, task.name)) if (current.nonEmpty) current.map(_.name).take(finalize_limit) + else if (!build_context.master) Nil else { val start = Time.now() val schedule = scheduler.build_schedule(state) @@ -924,6 +960,144 @@ } } + + /** SQL data model of build schedule, extending isabelle_build database */ + + object private_data extends SQL.Data("isabelle_build") { + import Build_Process.private_data.{Base, Generic} + + + /* schedule */ + + object Schedules { + val build_uuid = Generic.build_uuid.make_primary_key + val generator = SQL.Column.string("generator") + val start = SQL.Column.date("start") + + val table = make_table(List(build_uuid, generator, start), name = "schedules") + } + + def read_scheduled_builds_domain(db: SQL.Database): List[String] = + db.execute_query_statement( + Schedules.table.select(List(Schedules.build_uuid)), + List.from[String], res => res.string(Schedules.build_uuid)) + + def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = { + val schedules = + db.execute_query_statement(Schedules.table.select(sql = + SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))), + List.from[Schedule], + { res => + val build_uuid = res.string(Schedules.build_uuid) + val generator = res.string(Schedules.generator) + val start = res.date(Schedules.start) + Schedule(build_uuid, generator, start, Graph.empty) + }) + + for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield { + val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid) + schedule.copy(graph = Graph.make(nodes)) + } + } + + def update_schedule(db: SQL.Database, schedule: Schedule): Unit = { + db.execute_statement( + Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid))) + db.execute_statement(Schedules.table.insert(), { stmt => + stmt.string(1) = schedule.build_uuid + stmt.string(2) = schedule.generator + stmt.date(3) = schedule.start + }) + update_nodes(db, schedule.build_uuid, schedule.graph.dest) + } + + + /* nodes */ + + object Nodes { + val build_uuid = Generic.build_uuid.make_primary_key + val name = Generic.name.make_primary_key + val succs = SQL.Column.string("succs") + val hostname = SQL.Column.string("hostname") + val numa_node = SQL.Column.int("numa_node") + val rel_cpus = SQL.Column.string("rel_cpus") + val start = SQL.Column.date("start") + val duration = SQL.Column.long("duration") + + val table = + make_table( + List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration), + name = "schedule_nodes") + } + + type Nodes = List[((String, Schedule.Node), List[String])] + + def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = { + db.execute_query_statement( + Nodes.table.select(sql = + SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))), + List.from[((String, Schedule.Node), List[String])], + { res => + val name = res.string(Nodes.name) + val succs = split_lines(res.string(Nodes.succs)) + val hostname = res.string(Nodes.hostname) + val numa_node = res.get_int(Nodes.numa_node) + val rel_cpus = res.string(Nodes.rel_cpus) + val start = res.date(Nodes.start) + val duration = Time.ms(res.long(Nodes.duration)) + + val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus)) + ((name, Schedule.Node(name, node_info, start, duration)), succs) + } + ) + } + + def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = { + db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid))) + db.execute_batch_statement(Nodes.table.insert(), batch = + for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) => + stmt.string(1) = build_uuid + stmt.string(2) = name + stmt.string(3) = cat_lines(succs) + stmt.string(4) = node.node_info.hostname + stmt.int(5) = node.node_info.numa_node + stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus) + stmt.date(7) = node.start + stmt.long(8) = node.duration.ms + }) + } + + def pull_schedule( + db: SQL.Database, + schedule: Schedule, + ): Build_Schedule.Schedule = + read_schedules(db, schedule.build_uuid) match { + case Nil => schedule + case schedules => Library.the_single(schedules) + } + + def remove_schedules(db: SQL.Database, remove: List[String]): Unit = + if (remove.nonEmpty) { + val sql = Generic.build_uuid.where_member(remove) + db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql)))) + } + + def clean_build_schedules(db: SQL.Database): Unit = { + val running_builds_domain = + db.execute_query_statement( + Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)), + List.from[String], res => res.string(Base.build_uuid)) + + val (remove, _) = + Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain) + + remove_schedules(db, remove) + } + + override val tables = SQL.Tables(Schedules.table, Nodes.table) + } + + class Engine extends Build.Engine("build_schedule") { def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {