separate host.db for independent db.transaction_lock;
authorwenzelm
Sat, 17 Jun 2023 15:58:41 +0200
changeset 78174 cc0bd66eb498
parent 78173 c0ad1c0edd26
child 78175 a081ad6c3c4b
separate host.db for independent db.transaction_lock;
src/Pure/System/host.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/System/host.scala	Sat Jun 17 14:45:24 2023 +0200
+++ b/src/Pure/System/host.scala	Sat Jun 17 15:58:41 2023 +0200
@@ -96,6 +96,8 @@
   /* SQL data model */
 
   object Data {
+    val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db")
+
     def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table =
       SQL.Table("isabelle_host" + if_proper(name, "_" + name), columns, body = body)
 
@@ -106,6 +108,8 @@
       val table = make_table("node_info", List(hostname, numa_next))
     }
 
+    val all_tables: SQL.Tables = SQL.Tables(Node_Info.table)
+
     def read_numa_next(db: SQL.Database, hostname: String): Int =
       db.execute_query_statementO[Int](
         Node_Info.table.select(List(Node_Info.numa_next),
@@ -113,16 +117,38 @@
         res => res.int(Node_Info.numa_next)
       ).getOrElse(0)
 
-    def update_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Boolean =
-      if (read_numa_next(db, hostname) != numa_next) {
-        db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname)))
-        db.execute_statement(Node_Info.table.insert(), body =
-          { stmt =>
-            stmt.string(1) = hostname
-            stmt.int(2) = numa_next
-          })
-        true
+    def write_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Unit = {
+      db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname)))
+      db.execute_statement(Node_Info.table.insert(), body =
+        { stmt =>
+          stmt.string(1) = hostname
+          stmt.int(2) = numa_next
+        })
+    }
+  }
+
+  def next_numa_node(
+    db: SQL.Database,
+    hostname: String,
+    available_nodes: List[Int],
+    used_nodes: => Set[Int]
+  ): Option[Int] =
+    if (available_nodes.isEmpty) None
+    else {
+      val available = available_nodes.zipWithIndex
+      val used = used_nodes
+      db.transaction_lock(Data.all_tables, create = true) {
+        val numa_next = Data.read_numa_next(db, hostname)
+        val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0)
+        val candidates = available.drop(numa_index) ::: available.take(numa_index)
+        val (n, i) =
+          candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse
+          candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
+
+        val numa_next1 = available_nodes((i + 1) % available_nodes.length)
+        if (numa_next != numa_next1) Data.write_numa_next(db, hostname, numa_next1)
+
+        Some(n)
       }
-      else false
-  }
+    }
 }
--- a/src/Pure/Tools/build_process.scala	Sat Jun 17 14:45:24 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Sat Jun 17 15:58:41 2023 +0200
@@ -215,7 +215,6 @@
 
   sealed case class State(
     serial: Long = 0,
-    numa_next: Int = 0,
     sessions: State.Sessions = Map.empty,
     pending: State.Pending = Nil,
     running: State.Running = Map.empty,
@@ -228,22 +227,6 @@
       copy(serial = i)
     }
 
-    def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
-      if (numa_nodes.isEmpty) (None, this)
-      else {
-        val available = numa_nodes.zipWithIndex
-        val used =
-          Set.from(for (job <- running.valuesIterator; i <- job.node_info.numa_node) yield i)
-
-        val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0)
-        val candidates = available.drop(numa_index) ::: available.take(numa_index)
-        val (n, i) =
-          candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse
-          candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
-
-        (Some(n), copy(numa_next = numa_nodes((i + 1) % numa_nodes.length)))
-      }
-
     def finished: Boolean = pending.isEmpty
 
     def remove_pending(name: String): State =
@@ -751,8 +734,7 @@
         Sessions.table,
         Pending.table,
         Running.table,
-        Results.table,
-        Host.Data.Node_Info.table)
+        Results.table)
 
     val build_uuid_tables =
       all_tables.filter(table =>
@@ -770,14 +752,13 @@
         val serial = serial_db max state.serial
         stamp_worker(db, worker_uuid, serial)
 
-        val numa_next = Host.Data.read_numa_next(db, hostname)
         val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
         val pending = read_pending(db)
         val running = pull0(read_running(db), state.running)
         val results = pull1(read_results_domain(db), read_results(db, _), state.results)
 
-        state.copy(serial = serial, numa_next = numa_next, sessions = sessions,
-          pending = pending, running = running, results = results)
+        state.copy(serial = serial, sessions = sessions, pending = pending,
+          running = running, results = results)
       }
     }
 
@@ -793,8 +774,7 @@
           update_sessions(db, state.sessions),
           update_pending(db, state.pending),
           update_running(db, state.running),
-          update_results(db, state.results),
-          Host.Data.update_numa_next(db, hostname, state.numa_next))
+          update_results(db, state.results))
 
       val serial0 = state.serial
       val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
@@ -828,6 +808,9 @@
   private val _database: Option[SQL.Database] =
     store.maybe_open_build_database(Build_Process.Data.database)
 
+  private val _host_database: Option[SQL.Database] =
+    store.maybe_open_build_database(Host.Data.database)
+
   protected val (progress, worker_uuid) = synchronized {
     _database match {
       case None => (build_progress, UUID.random().toString)
@@ -942,7 +925,13 @@
         .make_result(result_name, Process_Result.undefined, output_shasum)
     }
     else {
-      val (numa_node, state1) = state.next_numa_node(build_context.numa_nodes)
+      def used_nodes: Set[Int] =
+        Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i)
+      val numa_node =
+        for {
+          db <- _host_database
+          n <- Host.next_numa_node(db, hostname, build_context.numa_nodes, used_nodes)
+        } yield n
       val node_info = Host.Node_Info(hostname, numa_node)
 
       progress.echo(
@@ -957,7 +946,7 @@
 
       val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
 
-      state1.add_running(job)
+      state.add_running(job)
     }
   }