# HG changeset patch # User wenzelm # Date 1687010321 -7200 # Node ID cc0bd66eb49858329931eb6bb3fc5a21f60ba2fd # Parent c0ad1c0edd268ff28ebd46ac377d5d105037ab31 separate host.db for independent db.transaction_lock; diff -r c0ad1c0edd26 -r cc0bd66eb498 src/Pure/System/host.scala --- a/src/Pure/System/host.scala Sat Jun 17 14:45:24 2023 +0200 +++ b/src/Pure/System/host.scala Sat Jun 17 15:58:41 2023 +0200 @@ -96,6 +96,8 @@ /* SQL data model */ object Data { + val database: Path = Path.explode("$ISABELLE_HOME_USER/host.db") + def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table = SQL.Table("isabelle_host" + if_proper(name, "_" + name), columns, body = body) @@ -106,6 +108,8 @@ val table = make_table("node_info", List(hostname, numa_next)) } + val all_tables: SQL.Tables = SQL.Tables(Node_Info.table) + def read_numa_next(db: SQL.Database, hostname: String): Int = db.execute_query_statementO[Int]( Node_Info.table.select(List(Node_Info.numa_next), @@ -113,16 +117,38 @@ res => res.int(Node_Info.numa_next) ).getOrElse(0) - def update_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Boolean = - if (read_numa_next(db, hostname) != numa_next) { - db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname))) - db.execute_statement(Node_Info.table.insert(), body = - { stmt => - stmt.string(1) = hostname - stmt.int(2) = numa_next - }) - true + def write_numa_next(db: SQL.Database, hostname: String, numa_next: Int): Unit = { + db.execute_statement(Node_Info.table.delete(sql = Node_Info.hostname.where_equal(hostname))) + db.execute_statement(Node_Info.table.insert(), body = + { stmt => + stmt.string(1) = hostname + stmt.int(2) = numa_next + }) + } + } + + def next_numa_node( + db: SQL.Database, + hostname: String, + available_nodes: List[Int], + used_nodes: => Set[Int] + ): Option[Int] = + if (available_nodes.isEmpty) None + else { + val available = available_nodes.zipWithIndex + val used = used_nodes + db.transaction_lock(Data.all_tables, create = true) { + val numa_next = Data.read_numa_next(db, hostname) + val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0) + val candidates = available.drop(numa_index) ::: available.take(numa_index) + val (n, i) = + candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse + candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head + + val numa_next1 = available_nodes((i + 1) % available_nodes.length) + if (numa_next != numa_next1) Data.write_numa_next(db, hostname, numa_next1) + + Some(n) } - else false - } + } } diff -r c0ad1c0edd26 -r cc0bd66eb498 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sat Jun 17 14:45:24 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Sat Jun 17 15:58:41 2023 +0200 @@ -215,7 +215,6 @@ sealed case class State( serial: Long = 0, - numa_next: Int = 0, sessions: State.Sessions = Map.empty, pending: State.Pending = Nil, running: State.Running = Map.empty, @@ -228,22 +227,6 @@ copy(serial = i) } - def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) = - if (numa_nodes.isEmpty) (None, this) - else { - val available = numa_nodes.zipWithIndex - val used = - Set.from(for (job <- running.valuesIterator; i <- job.node_info.numa_node) yield i) - - val numa_index = available.collectFirst({ case (n, i) if n == numa_next => i }).getOrElse(0) - val candidates = available.drop(numa_index) ::: available.take(numa_index) - val (n, i) = - candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse - candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head - - (Some(n), copy(numa_next = numa_nodes((i + 1) % numa_nodes.length))) - } - def finished: Boolean = pending.isEmpty def remove_pending(name: String): State = @@ -751,8 +734,7 @@ Sessions.table, Pending.table, Running.table, - Results.table, - Host.Data.Node_Info.table) + Results.table) val build_uuid_tables = all_tables.filter(table => @@ -770,14 +752,13 @@ val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) - val numa_next = Host.Data.read_numa_next(db, hostname) val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions) val pending = read_pending(db) val running = pull0(read_running(db), state.running) val results = pull1(read_results_domain(db), read_results(db, _), state.results) - state.copy(serial = serial, numa_next = numa_next, sessions = sessions, - pending = pending, running = running, results = results) + state.copy(serial = serial, sessions = sessions, pending = pending, + running = running, results = results) } } @@ -793,8 +774,7 @@ update_sessions(db, state.sessions), update_pending(db, state.pending), update_running(db, state.running), - update_results(db, state.results), - Host.Data.update_numa_next(db, hostname, state.numa_next)) + update_results(db, state.results)) val serial0 = state.serial val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 @@ -828,6 +808,9 @@ private val _database: Option[SQL.Database] = store.maybe_open_build_database(Build_Process.Data.database) + private val _host_database: Option[SQL.Database] = + store.maybe_open_build_database(Host.Data.database) + protected val (progress, worker_uuid) = synchronized { _database match { case None => (build_progress, UUID.random().toString) @@ -942,7 +925,13 @@ .make_result(result_name, Process_Result.undefined, output_shasum) } else { - val (numa_node, state1) = state.next_numa_node(build_context.numa_nodes) + def used_nodes: Set[Int] = + Set.from(for (job <- state.running.valuesIterator; i <- job.node_info.numa_node) yield i) + val numa_node = + for { + db <- _host_database + n <- Host.next_numa_node(db, hostname, build_context.numa_nodes, used_nodes) + } yield n val node_info = Host.Node_Info(hostname, numa_node) progress.echo( @@ -957,7 +946,7 @@ val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build)) - state1.add_running(job) + state.add_running(job) } }