record updates within database, based on serial;
authorwenzelm
Sat, 09 Mar 2024 20:52:06 +0100
changeset 79839 f425bbc4b2eb
parent 79838 5c9df01bee89
child 79840 3f7ac523f5b3
record updates within database, based on serial;
src/Pure/Build/build_process.scala
src/Pure/General/sql.scala
src/Pure/library.scala
--- a/src/Pure/Build/build_process.scala	Sat Mar 09 20:20:13 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Sat Mar 09 20:52:06 2024 +0100
@@ -338,6 +338,26 @@
     }
 
 
+    /* recorded updates */
+
+    object Updates {
+      val serial = SQL.Column.long("serial").make_primary_key
+      val kind = SQL.Column.int("kind").make_primary_key
+      val name = Generic.name.make_primary_key
+
+      val table = make_table(List(serial, kind, name), name = "updates")
+    }
+
+    def write_updates(db: SQL.Database, serial: Long, updates: List[Library.Update]): Unit =
+      db.execute_batch_statement(db.insert_permissive(Updates.table), batch =
+        for (update <- updates.iterator; kind = update.kind; name <- update.domain.iterator)
+        yield { (stmt: SQL.Statement) =>
+          stmt.long(1) = serial
+          stmt.int(2) = kind
+          stmt.string(3) = name
+        })
+
+
     /* base table */
 
     object Base {
@@ -424,6 +444,8 @@
           List(name, deps, ancestors, options, sources, timeout,
             old_time, old_command_timings, build_uuid),
           name = "sessions")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] =
@@ -464,10 +486,10 @@
       db: SQL.Database,
       sessions: Build_Process.Sessions,
       old_sessions: Build_Process.Sessions
-    ): Boolean = {
+    ): Library.Update = {
       val update =
         if (old_sessions.eq(sessions)) Library.Update.empty
-        else Library.Update.make(old_sessions.data, sessions.data)
+        else Library.Update.make(old_sessions.data, sessions.data, kind = Sessions.table_index)
 
       if (update.deletes) {
         db.execute_statement(
@@ -490,7 +512,7 @@
           })
       }
 
-      update.defined
+      update
     }
 
 
@@ -506,11 +528,19 @@
 
       val table =
         make_table(List(worker_uuid, build_uuid, start, stamp, stop, serial), name = "workers")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
-    def read_serial(db: SQL.Database): Long =
-      db.execute_query_statementO[Long](
-        Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial)).getOrElse(0L)
+    def read_serial(db: SQL.Database): Long = {
+      val a =
+        db.execute_query_statementO[Long](
+          Workers.table.select(List(Workers.serial.max)), _.long(Workers.serial))
+      val b =
+        db.execute_query_statementO[Long](
+          Updates.table.select(List(Updates.serial.max)), _.long(Updates.serial))
+      a.getOrElse(0L) max b.getOrElse(0L)
+    }
 
     def read_workers(
       db: SQL.Database,
@@ -595,6 +625,8 @@
       val build_uuid = Generic.build_uuid
 
       val table = make_table(List(name, deps, build_uuid), name = "pending")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_pending(db: SQL.Database): State.Pending =
@@ -612,8 +644,8 @@
       db: SQL.Database,
       pending: State.Pending,
       old_pending: State.Pending
-    ): Boolean = {
-      val update = Library.Update.make(old_pending, pending)
+    ): Library.Update = {
+      val update = Library.Update.make(old_pending, pending, kind = Pending.table_index)
 
       if (update.deletes) {
         db.execute_statement(
@@ -630,7 +662,7 @@
           })
       }
 
-      update.defined
+      update
     }
 
 
@@ -649,6 +681,8 @@
         make_table(
           List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus, start_date),
           name = "running")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_running(db: SQL.Database): State.Running =
@@ -673,8 +707,8 @@
       db: SQL.Database,
       running: State.Running,
       old_running: State.Running
-    ): Boolean = {
-      val update = Library.Update.make(old_running, running)
+    ): Library.Update = {
+      val update = Library.Update.make(old_running, running, kind = Running.table_index)
 
       if (update.deletes) {
         db.execute_statement(
@@ -695,7 +729,7 @@
           })
       }
 
-      update.defined
+      update
     }
 
 
@@ -722,6 +756,8 @@
           List(name, worker_uuid, build_uuid, hostname, numa_node, rel_cpus,
             rc, out, err, timing_elapsed, timing_cpu, timing_gc, output_shasum, current),
           name = "results")
+
+      lazy val table_index: Int = tables.index(table)
     }
 
     def read_results_domain(db: SQL.Database): Set[String] =
@@ -768,8 +804,8 @@
       db: SQL.Database,
       results: State.Results,
       old_results: State.Results
-    ): Boolean = {
-      val update = Library.Update.make(old_results, results)
+    ): Library.Update = {
+      val update = Library.Update.make(old_results, results, kind = Results.table_index)
 
       if (update.deletes) {
         db.execute_statement(
@@ -798,7 +834,7 @@
           })
       }
 
-      update.defined
+      update
     }
 
 
@@ -806,12 +842,13 @@
 
     override val tables: SQL.Tables =
       SQL.Tables(
-        Base.table,
-        Workers.table,
+        Updates.table,
         Sessions.table,
         Pending.table,
         Running.table,
-        Results.table)
+        Results.table,
+        Base.table,
+        Workers.table)
 
     private val build_uuid_tables =
       tables.filter(table =>
@@ -840,16 +877,16 @@
       state: State,
       old_state: State
     ): State = {
-      val serial = state.next_serial
-      val changed =
+      val updates =
         List(
           update_sessions(db, state.sessions, old_state.sessions),
           update_pending(db, state.pending, old_state.pending),
           update_running(db, state.running, old_state.running),
-          update_results(db, state.results, old_state.results)
-        ).exists(identity)
+          update_results(db, state.results, old_state.results))
 
-      if (changed) {
+      if (updates.exists(_.defined)) {
+        val serial = state.next_serial
+        write_updates(db, serial, updates)
         stamp_worker(db, worker_uuid, serial)
         state.copy(serial = serial)
       }
--- a/src/Pure/General/sql.scala	Sat Mar 09 20:20:13 2024 +0100
+++ b/src/Pure/General/sql.scala	Sat Mar 09 20:52:06 2024 +0100
@@ -234,6 +234,11 @@
 
     def iterator: Iterator[Table] = list.iterator
 
+    def index(table: Table): Int =
+      iterator.zipWithIndex
+        .collectFirst({ case (t, i) if t.name == table.name => i })
+        .getOrElse(error("No table " + quote(table.name)))
+
     // requires transaction
     def lock(db: Database, create: Boolean = false): Boolean = {
       if (create) foreach(db.create_table(_))
--- a/src/Pure/library.scala	Sat Mar 09 20:20:13 2024 +0100
+++ b/src/Pure/library.scala	Sat Mar 09 20:52:06 2024 +0100
@@ -292,18 +292,19 @@
 
     val empty: Update = Update()
 
-    def make[A](data0: Data[A], data1: Data[A]): Update =
+    def make[A](data0: Data[A], data1: Data[A], kind: Int = 0): Update =
       if (data0.eq(data1)) empty
       else {
         val delete = List.from(for ((x, y) <- data0.iterator if !data1.get(x).contains(y)) yield x)
         val insert = List.from(for ((x, y) <- data1.iterator if !data0.get(x).contains(y)) yield x)
-        Update(delete = delete, insert = insert)
+        Update(delete = delete, insert = insert, kind = kind)
       }
   }
 
   sealed case class Update(
     delete: List[String] = Nil,
-    insert: List[String] = Nil
+    insert: List[String] = Nil,
+    kind: Int = 0
   ) {
     def deletes: Boolean = delete.nonEmpty
     def inserts: Boolean = insert.nonEmpty