use build database to synchronize build schedule computed on master node (e.g., such that view on cluster is consistent);
authorFabian Huch <huch@in.tum.de>
Thu, 07 Dec 2023 11:34:01 +0100
changeset 79186 a22440b9cb70
parent 79185 cfed4fcf1dae
child 79187 8cb732d7a98c
use build database to synchronize build schedule computed on master node (e.g., such that view on cluster is consistent);
src/Pure/Tools/build_schedule.scala
--- a/src/Pure/Tools/build_schedule.scala	Thu Dec 07 11:28:12 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Thu Dec 07 11:34:01 2023 +0100
@@ -780,12 +780,49 @@
       }
       catch { case exn: Throwable => close(); throw exn }
 
+    private val _build_database: Option[SQL.Database] =
+      try {
+        for (db <- store.maybe_open_build_database(server = server)) yield {
+          if (build_context.master) {
+            Build_Schedule.private_data.transaction_lock(
+              db,
+              create = true,
+              label = "Build_Schedule.build_database"
+            ) { Build_Schedule.private_data.clean_build_schedules(db) }
+            db.vacuum(Build_Schedule.private_data.tables.list)
+          }
+          db
+        }
+      }
+      catch { case exn: Throwable => close(); throw exn }
+
     override def close(): Unit = {
       super.close()
       Option(_log_database).foreach(_.close())
+      Option(_build_database).flatten.foreach(_.close())
     }
 
 
+    /* global state: internal var vs. external database */
+
+    private var _schedule = Schedule.init(build_uuid)
+
+    override protected def synchronized_database[A](label: String)(body: => A): A =
+      super.synchronized_database(label) {
+        _build_database match {
+          case None => body
+          case Some(db) =>
+            Build_Schedule.private_data.transaction_lock(db, label = label) {
+              val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule)
+              _schedule = old_schedule
+              val res = body
+              Build_Schedule.private_data.update_schedule(db, _schedule)
+              res
+            }
+        }
+      }
+
+
     /* previous results via build log */
 
     override def open_build_cluster(): Build_Cluster = {
@@ -866,8 +903,6 @@
 
     /* build process */
 
-    private var _schedule = Schedule.init(build_uuid)
-
     override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
       _schedule.graph.get_node(session_name).node_info
 
@@ -902,6 +937,7 @@
       else {
         val current = state.next_ready.filter(task => is_current(state, task.name))
         if (current.nonEmpty) current.map(_.name).take(finalize_limit)
+        else if (!build_context.master) Nil
         else {
           val start = Time.now()
           val schedule = scheduler.build_schedule(state)
@@ -924,6 +960,144 @@
     }
   }
 
+
+  /** SQL data model of build schedule, extending isabelle_build database */
+
+  object private_data extends SQL.Data("isabelle_build") {
+    import Build_Process.private_data.{Base, Generic}
+
+
+    /* schedule */
+
+    object Schedules {
+      val build_uuid = Generic.build_uuid.make_primary_key
+      val generator = SQL.Column.string("generator")
+      val start = SQL.Column.date("start")
+
+      val table = make_table(List(build_uuid, generator, start), name = "schedules")
+    }
+
+    def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+      db.execute_query_statement(
+        Schedules.table.select(List(Schedules.build_uuid)),
+        List.from[String], res => res.string(Schedules.build_uuid))
+
+    def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
+      val schedules =
+        db.execute_query_statement(Schedules.table.select(sql =
+          SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+          List.from[Schedule],
+          { res =>
+            val build_uuid = res.string(Schedules.build_uuid)
+            val generator = res.string(Schedules.generator)
+            val start = res.date(Schedules.start)
+            Schedule(build_uuid, generator, start, Graph.empty)
+          })
+
+      for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield {
+        val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid)
+        schedule.copy(graph = Graph.make(nodes))
+      }
+    }
+
+    def update_schedule(db: SQL.Database, schedule: Schedule): Unit = {
+      db.execute_statement(
+        Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid)))
+      db.execute_statement(Schedules.table.insert(), { stmt =>
+        stmt.string(1) = schedule.build_uuid
+        stmt.string(2) = schedule.generator
+        stmt.date(3) = schedule.start
+      })
+      update_nodes(db, schedule.build_uuid, schedule.graph.dest)
+    }
+
+
+    /* nodes */
+
+    object Nodes {
+      val build_uuid = Generic.build_uuid.make_primary_key
+      val name = Generic.name.make_primary_key
+      val succs = SQL.Column.string("succs")
+      val hostname = SQL.Column.string("hostname")
+      val numa_node = SQL.Column.int("numa_node")
+      val rel_cpus = SQL.Column.string("rel_cpus")
+      val start = SQL.Column.date("start")
+      val duration = SQL.Column.long("duration")
+
+      val table =
+        make_table(
+          List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration),
+          name = "schedule_nodes")
+    }
+
+    type Nodes = List[((String, Schedule.Node), List[String])]
+
+    def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = {
+      db.execute_query_statement(
+        Nodes.table.select(sql =
+          SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))),
+        List.from[((String, Schedule.Node), List[String])],
+        { res =>
+          val name = res.string(Nodes.name)
+          val succs = split_lines(res.string(Nodes.succs))
+          val hostname = res.string(Nodes.hostname)
+          val numa_node = res.get_int(Nodes.numa_node)
+          val rel_cpus = res.string(Nodes.rel_cpus)
+          val start = res.date(Nodes.start)
+          val duration = Time.ms(res.long(Nodes.duration))
+
+          val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus))
+          ((name, Schedule.Node(name, node_info, start, duration)), succs)
+        }
+      )
+    }
+
+    def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = {
+      db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid)))
+      db.execute_batch_statement(Nodes.table.insert(), batch =
+        for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) =>
+          stmt.string(1) = build_uuid
+          stmt.string(2) = name
+          stmt.string(3) = cat_lines(succs)
+          stmt.string(4) = node.node_info.hostname
+          stmt.int(5) = node.node_info.numa_node
+          stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus)
+          stmt.date(7) = node.start
+          stmt.long(8) = node.duration.ms
+        })
+    }
+
+    def pull_schedule(
+      db: SQL.Database,
+      schedule: Schedule,
+    ): Build_Schedule.Schedule =
+      read_schedules(db, schedule.build_uuid) match {
+        case Nil => schedule
+        case schedules => Library.the_single(schedules)
+      }
+
+    def remove_schedules(db: SQL.Database, remove: List[String]): Unit =
+      if (remove.nonEmpty) {
+        val sql = Generic.build_uuid.where_member(remove)
+        db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql))))
+      }
+
+    def clean_build_schedules(db: SQL.Database): Unit = {
+      val running_builds_domain =
+        db.execute_query_statement(
+          Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
+          List.from[String], res => res.string(Base.build_uuid))
+
+      val (remove, _) =
+        Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+
+      remove_schedules(db, remove)
+    }
+
+    override val tables = SQL.Tables(Schedules.table, Nodes.table)
+  }
+
+
   class Engine extends Build.Engine("build_schedule") {
 
     def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {