--- a/src/Pure/ML/ml_process.scala Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/ML/ml_process.scala Mon Feb 20 21:53:15 2023 +0100
@@ -117,9 +117,9 @@
bash_env.put("ISABELLE_TMP", File.standard_path(isabelle_tmp))
bash_env.put("POLYSTATSDIR", isabelle_tmp.getAbsolutePath)
- Bash.process(
- options.string("ML_process_policy") + """ "$ML_HOME/poly" -q """ +
- Bash.strings(bash_args),
+ val policy = options.string("ML_process_policy") match { case "" => "" case s => s + " " }
+
+ Bash.process(policy + """"$ML_HOME/poly" -q """ + Bash.strings(bash_args),
cwd = cwd,
env = bash_env,
redirect = redirect,
--- a/src/Pure/System/numa.scala Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/System/numa.scala Mon Feb 20 21:53:15 2023 +0100
@@ -32,29 +32,39 @@
/* CPU policy via numactl tool */
- lazy val numactl_available: Boolean = Isabelle_System.bash("numactl -m0 -N0 true").ok
+ def numactl(node: Int): String = "numactl -m" + node + " -N" + node
+ def numactl_ok(node: Int): Boolean = Isabelle_System.bash(numactl(node) + " true").ok
- def policy(node: Int): String =
- if (numactl_available) "numactl -m" + node + " -N" + node else ""
+ def policy(node: Int): String = if (numactl_ok(node)) numactl(node) else ""
- def policy_options(options: Options, numa_node: Option[Int] = Some(0)): Options =
+ def policy_options(options: Options, numa_node: Option[Int]): Options =
numa_node match {
case None => options
case Some(n) => options.string("ML_process_policy") = policy(n)
}
+ def perhaps_policy_options(options: Options): Options = {
+ val numa_node =
+ try {
+ nodes() match {
+ case ns if ns.length >= 2 && numactl_ok(ns.head) => Some(ns.head)
+ case _ => None
+ }
+ }
+ catch { case ERROR(_) => None }
+ policy_options(options, numa_node)
+ }
+
/* shuffling of CPU nodes */
- def enabled: Boolean =
- try { nodes().length >= 2 && numactl_available }
- catch { case ERROR(_) => false }
-
def enabled_warning(progress: Progress, enabled: Boolean): Boolean = {
def warning =
- if (nodes().length < 2) Some("no NUMA nodes available")
- else if (!numactl_available) Some("bad numactl tool")
- else None
+ nodes() match {
+ case ns if ns.length < 2 => Some("no NUMA nodes available")
+ case ns if !numactl_ok(ns.head) => Some("bad numactl tool")
+ case _ => None
+ }
enabled &&
(warning match {
@@ -62,21 +72,4 @@
case _ => true
})
}
-
- class Nodes(enabled: Boolean = true) {
- private val available = nodes().zipWithIndex
- private var next_index = 0
-
- def next(used: Int => Boolean = _ => false): Option[Int] = synchronized {
- if (!enabled || available.isEmpty) None
- else {
- val candidates = available.drop(next_index) ::: available.take(next_index)
- val (n, i) =
- candidates.find({ case (n, i) => i == next_index && !used(n) }) orElse
- candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
- next_index = (i + 1) % available.length
- Some(n)
- }
- }
- }
}
--- a/src/Pure/Tools/build.scala Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/build.scala Mon Feb 20 21:53:15 2023 +0100
@@ -11,14 +11,24 @@
object Build {
/** build with results **/
- class Results private[Build](
+ object Results {
+ def apply(context: Build_Process.Context, results: Map[String, Process_Result]): Results =
+ new Results(context.store, context.deps, results)
+ }
+
+ class Results private(
val store: Sessions.Store,
val deps: Sessions.Deps,
- val sessions_ok: List[String],
results: Map[String, Process_Result]
) {
def cache: Term.Cache = store.cache
+ def sessions_ok: List[String] =
+ (for {
+ name <- deps.sessions_structure.build_topological_order.iterator
+ result <- results.get(name) if result.ok
+ } yield name).toList
+
def info(name: String): Sessions.Info = deps.sessions_structure(name)
def sessions: Set[String] = results.keySet
def cancelled(name: String): Boolean = !results(name).defined
@@ -117,7 +127,11 @@
/* build process and results */
- val build_context = Build_Process.Context(store, build_deps, progress = progress)
+ val build_context =
+ Build_Process.Context(store, build_deps, progress = progress,
+ build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+ fresh_build = fresh_build, no_build = no_build, verbose = verbose,
+ session_setup = session_setup)
store.prepare_output_dir()
@@ -131,35 +145,12 @@
}
}
- val results = {
- val build_results =
- if (build_deps.is_empty) {
- progress.echo_warning("Nothing to build")
- Map.empty[String, Build_Process.Result]
- }
- else {
- Isabelle_Thread.uninterruptible {
- val build_process =
- new Build_Process(build_context, build_heap = build_heap,
- numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
- no_build = no_build, verbose = verbose, session_setup = session_setup)
- build_process.run()
- }
- }
-
- val sessions_ok: List[String] =
- (for {
- name <- build_deps.sessions_structure.build_topological_order.iterator
- result <- build_results.get(name)
- if result.ok
- } yield name).toList
-
- val results =
- (for ((name, result) <- build_results.iterator)
- yield (name, result.process_result)).toMap
-
- new Results(store, build_deps, sessions_ok, results)
- }
+ val results =
+ Isabelle_Thread.uninterruptible {
+ val build_process = new Build_Process(build_context)
+ val res = build_process.run()
+ Results(build_context, res)
+ }
if (export_files) {
for (name <- full_sessions_selection.iterator if results(name).ok) {
--- a/src/Pure/Tools/build_process.scala Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Mon Feb 20 21:53:15 2023 +0100
@@ -9,7 +9,6 @@
import scala.math.Ordering
-import scala.collection.immutable.SortedSet
import scala.annotation.tailrec
@@ -68,7 +67,14 @@
def apply(
store: Sessions.Store,
deps: Sessions.Deps,
- progress: Progress = new Progress
+ progress: Progress = new Progress,
+ build_heap: Boolean = false,
+ numa_shuffling: Boolean = false,
+ max_jobs: Int = 1,
+ fresh_build: Boolean = false,
+ no_build: Boolean = false,
+ verbose: Boolean = false,
+ session_setup: (String, Session) => Unit = (_, _) => ()
): Context = {
val sessions_structure = deps.sessions_structure
val build_graph = sessions_structure.build_graph
@@ -113,7 +119,10 @@
}
}
- new Context(store, deps, sessions, ordering, progress)
+ val numa_nodes = if (numa_shuffling) NUMA.nodes() else Nil
+ new Context(store, deps, sessions, ordering, progress, numa_nodes,
+ build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
+ no_build = no_build, verbose = verbose, session_setup)
}
}
@@ -122,20 +131,33 @@
val deps: Sessions.Deps,
sessions: Map[String, Session_Context],
val ordering: Ordering[String],
- val progress: Progress
+ val progress: Progress,
+ val numa_nodes: List[Int],
+ val build_heap: Boolean,
+ val max_jobs: Int,
+ val fresh_build: Boolean,
+ val no_build: Boolean,
+ val verbose: Boolean,
+ val session_setup: (String, Session) => Unit
) {
def sessions_structure: Sessions.Structure = deps.sessions_structure
def apply(session: String): Session_Context =
sessions.getOrElse(session, Session_Context.empty(session, Time.zero))
- def build_heap(session: String): Boolean =
- Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
+ def do_store(session: String): Boolean =
+ build_heap || Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
}
/* main */
+ case class Entry(name: String, deps: List[String]) {
+ def is_ready: Boolean = deps.isEmpty
+ def resolve(dep: String): Entry =
+ if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this
+ }
+
case class Result(
current: Boolean,
output_heap: SHA1.Shasum,
@@ -143,22 +165,24 @@
) {
def ok: Boolean = process_result.ok
}
+
+ def session_finished(session_name: String, process_result: Process_Result): String =
+ "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
+
+ def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
+ val props = build_log.session_timing
+ val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+ val timing = Markup.Timing_Properties.get(props)
+ "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
+ }
}
-class Build_Process(
- build_context: Build_Process.Context,
- build_heap: Boolean = false,
- numa_shuffling: Boolean = false,
- max_jobs: Int = 1,
- fresh_build: Boolean = false,
- no_build: Boolean = false,
- verbose: Boolean = false,
- session_setup: (String, Session) => Unit = (_, _) => ()
-) {
+class Build_Process(build_context: Build_Process.Context) {
private val store = build_context.store
private val build_options = store.options
private val build_deps = build_context.deps
private val progress = build_context.progress
+ private val verbose = build_context.verbose
private val log =
build_options.string("system_log") match {
@@ -168,33 +192,43 @@
}
// global state
- private val _numa_nodes = new NUMA.Nodes(numa_shuffling)
- private var _build_graph = build_context.sessions_structure.build_graph
- private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering)
+ private var _numa_index = 0
+ private var _pending: List[Build_Process.Entry] =
+ (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
+ yield Build_Process.Entry(name, preds.toList)).toList
private var _running = Map.empty[String, Build_Job]
private var _results = Map.empty[String, Build_Process.Result]
+ private def test_pending(): Boolean = synchronized { _pending.nonEmpty }
+
private def remove_pending(name: String): Unit = synchronized {
- _build_graph = _build_graph.del_node(name)
- _build_order = _build_order - name
+ _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name)))
}
private def next_pending(): Option[String] = synchronized {
- if (_running.size < (max_jobs max 1)) {
- _build_order.iterator
- .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name))
- .nextOption()
+ if (_running.size < (build_context.max_jobs max 1)) {
+ _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name))
+ .sortBy(_.name)(build_context.ordering)
+ .headOption.map(_.name)
}
else None
}
private def next_numa_node(): Option[Int] = synchronized {
- _numa_nodes.next(used =
- Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i))
+ val available = build_context.numa_nodes.zipWithIndex
+ if (available.isEmpty) None
+ else {
+ val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i)
+ val index = _numa_index
+ val candidates = available.drop(index) ::: available.take(index)
+ val (n, i) =
+ candidates.find({ case (n, i) => i == index && !used(n) }) orElse
+ candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
+ _numa_index = (i + 1) % available.length
+ Some(n)
+ }
}
- private def test_running(): Boolean = synchronized { !_build_graph.is_empty }
-
private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) }
private def finished_running(): List[Build_Job.Build_Session] = synchronized {
@@ -227,16 +261,6 @@
names.map(_results.apply)
}
- private def session_finished(session_name: String, process_result: Process_Result): String =
- "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
-
- private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
- val props = build_log.session_timing
- val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
- val timing = Markup.Timing_Properties.get(props)
- "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
- }
-
private def finish_job(job: Build_Job.Build_Session): Unit = {
val session_name = job.session_name
val process_result = job.join
@@ -278,8 +302,8 @@
process_result.err_lines.foreach(progress.echo)
if (process_result.ok) {
- if (verbose) progress.echo(session_timing(session_name, build_log))
- progress.echo(session_finished(session_name, process_result))
+ if (verbose) progress.echo(Build_Process.session_timing(session_name, build_log))
+ progress.echo(Build_Process.session_finished(session_name, process_result))
}
else {
progress.echo(session_name + " FAILED")
@@ -304,7 +328,7 @@
}
else SHA1.flat_shasum(ancestor_results.map(_.output_heap))
- val do_store = build_heap || build_context.build_heap(session_name)
+ val do_store = build_context.do_store(session_name)
val (current, output_heap) = {
store.try_open_database(session_name) match {
case Some(db) =>
@@ -312,7 +336,7 @@
case Some(build) =>
val output_heap = store.find_heap_shasum(session_name)
val current =
- !fresh_build &&
+ !build_context.fresh_build &&
build.ok &&
build.sources == build_deps.sources_shasum(session_name) &&
build.input_heaps == input_heaps &&
@@ -332,7 +356,7 @@
add_result(session_name, true, output_heap, Process_Result.ok)
}
}
- else if (no_build) {
+ else if (build_context.no_build) {
progress.echo_if(verbose, "Skipping " + session_name + " ...")
synchronized {
remove_pending(session_name)
@@ -356,7 +380,7 @@
val numa_node = next_numa_node()
job_running(session_name,
new Build_Job.Build_Session(progress, session_background, store, do_store,
- resources, session_setup, input_heaps, numa_node))
+ resources, build_context.session_setup, input_heaps, numa_node))
}
job.start()
}
@@ -374,18 +398,25 @@
build_options.seconds("editor_input_delay").sleep()
}
- def run(): Map[String, Build_Process.Result] = {
- while (test_running()) {
- if (progress.stopped) stop_running()
+ def run(): Map[String, Process_Result] = {
+ if (test_pending()) {
+ while (test_pending()) {
+ if (progress.stopped) stop_running()
+
+ for (job <- finished_running()) finish_job(job)
- for (job <- finished_running()) finish_job(job)
-
- next_pending() match {
- case Some(session_name) => start_job(session_name)
- case None => sleep()
+ next_pending() match {
+ case Some(session_name) => start_job(session_name)
+ case None => sleep()
+ }
+ }
+ synchronized {
+ for ((name, result) <- _results) yield name -> result.process_result
}
}
-
- synchronized { _results }
+ else {
+ progress.echo_warning("Nothing to build")
+ Map.empty[String, Process_Result]
+ }
}
}
--- a/src/Pure/Tools/dump.scala Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/dump.scala Mon Feb 20 21:53:15 2023 +0100
@@ -97,9 +97,8 @@
skip_base: Boolean = false
): Context = {
val session_options: Options = {
- val options0 = if (NUMA.enabled) NUMA.policy_options(options) else options
val options1 =
- options0 +
+ NUMA.perhaps_policy_options(options) +
"parallel_proofs=0" +
"completion_limit=0" +
"editor_tracing_messages=0"