--- a/src/Pure/System/host.scala Sat Jun 17 14:45:24 2023 +0200
+++ b/src/Pure/System/host.scala Sat Jun 17 15:58:41 2023 +0200
@@ -96,6 +96,8 @@
/* SQL data model */
object Data {
+ val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db")
+
def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table =
SQL.Table("isabelle_host" + if_proper(name, "_" + name), columns, body = body)
@@ -106,6 +108,8 @@
val table = make_table("node_info", List(hostname, numa_next))
}
+ val all_tables: SQL.Tables = SQL.Tables(Node_Info.table)
+
def read_numa_next(db: SQL.Database, hostname: String): Int =
db.execute_query_statementO[Int](
Node_Info.table.select(List(Node_Info.numa_next),
@@ -113,16 +117,38 @@
res => res.int(Node_Info.numa_next)
).getOrElse(0)
- def update_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Boolean =
- if (read_numa_next(db, hostname) != numa_next) {
- db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname)))
- db.execute_statement(Node_Info.table.insert(), body =
- { stmt =>
- stmt.string(1) = hostname
- stmt.int(2) = numa_next
- })
- true
+ def write_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Unit = {
+ db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname)))
+ db.execute_statement(Node_Info.table.insert(), body =
+ { stmt =>
+ stmt.string(1) = hostname
+ stmt.int(2) = numa_next
+ })
+ }
+ }
+
+ def next_numa_node(
+ db: SQL.Database,
+ hostname: String,
+ available_nodes: List[Int],
+ used_nodes: => Set[Int]
+ ): Option[Int] =
+ if (available_nodes.isEmpty) None
+ else {
+ val available = available_nodes.zipWithIndex
+ val used = used_nodes
+ db.transaction_lock(Data.all_tables, create = true) {
+ val numa_next = Data.read_numa_next(db, hostname)
+ val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0)
+ val candidates = available.drop(numa_index) ::: available.take(numa_index)
+ val (n, i) =
+ candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse
+ candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
+
+ val numa_next1 = available_nodes((i + 1) % available_nodes.length)
+ if (numa_next != numa_next1) Data.write_numa_next(db, hostname, numa_next1)
+
+ Some(n)
}
- else false
- }
+ }
}
--- a/src/Pure/Tools/build_process.scala Sat Jun 17 14:45:24 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Sat Jun 17 15:58:41 2023 +0200
@@ -215,7 +215,6 @@
sealed case class State(
serial: Long = 0,
- numa_next: Int = 0,
sessions: State.Sessions = Map.empty,
pending: State.Pending = Nil,
running: State.Running = Map.empty,
@@ -228,22 +227,6 @@
copy(serial = i)
}
- def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
- if (numa_nodes.isEmpty) (None, this)
- else {
- val available = numa_nodes.zipWithIndex
- val used =
- Set.from(for (job <- running.valuesIterator; i <- job.node_info.numa_node) yield i)
-
- val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0)
- val candidates = available.drop(numa_index) ::: available.take(numa_index)
- val (n, i) =
- candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse
- candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
-
- (Some(n), copy(numa_next = numa_nodes((i + 1) % numa_nodes.length)))
- }
-
def finished: Boolean = pending.isEmpty
def remove_pending(name: String): State =
@@ -751,8 +734,7 @@
Sessions.table,
Pending.table,
Running.table,
- Results.table,
- Host.Data.Node_Info.table)
+ Results.table)
val build_uuid_tables =
all_tables.filter(table =>
@@ -770,14 +752,13 @@
val serial = serial_db max state.serial
stamp_worker(db, worker_uuid, serial)
- val numa_next = Host.Data.read_numa_next(db, hostname)
val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
val pending = read_pending(db)
val running = pull0(read_running(db), state.running)
val results = pull1(read_results_domain(db), read_results(db, _), state.results)
- state.copy(serial = serial, numa_next = numa_next, sessions = sessions,
- pending = pending, running = running, results = results)
+ state.copy(serial = serial, sessions = sessions, pending = pending,
+ running = running, results = results)
}
}
@@ -793,8 +774,7 @@
update_sessions(db, state.sessions),
update_pending(db, state.pending),
update_running(db, state.running),
- update_results(db, state.results),
- Host.Data.update_numa_next(db, hostname, state.numa_next))
+ update_results(db, state.results))
val serial0 = state.serial
val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
@@ -828,6 +808,9 @@
private val _database: Option[SQL.Database] =
store.maybe_open_build_database(Build_Process.Data.database)
+ private val _host_database: Option[SQL.Database] =
+ store.maybe_open_build_database(Host.Data.database)
+
protected val (progress, worker_uuid) = synchronized {
_database match {
case None => (build_progress, UUID.random().toString)
@@ -942,7 +925,13 @@
.make_result(result_name, Process_Result.undefined, output_shasum)
}
else {
- val (numa_node, state1) = state.next_numa_node(build_context.numa_nodes)
+ def used_nodes: Set[Int] =
+ Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i)
+ val numa_node =
+ for {
+ db <- _host_database
+ n <- Host.next_numa_node(db, hostname, build_context.numa_nodes, used_nodes)
+ } yield n
val node_info = Host.Node_Info(hostname, numa_node)
progress.echo(
@@ -957,7 +946,7 @@
val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
- state1.add_running(job)
+ state.add_running(job)
}
}