# HG changeset patch # User wenzelm # Date 1676926395 -3600 # Node ID cf6947717650ceed631f318368215f12d073743e # Parent cc292dafc52735abf523713cbcf7d28b2cd27f2b# Parent 7a6fa60298cdd2602395fc148073719329d94077 merged diff -r cc292dafc527 -r cf6947717650 src/Pure/ML/ml_process.scala --- a/src/Pure/ML/ml_process.scala Mon Feb 20 13:59:42 2023 +0100 +++ b/src/Pure/ML/ml_process.scala Mon Feb 20 21:53:15 2023 +0100 @@ -117,9 +117,9 @@ bash_env.put("ISABELLE_TMP", File.standard_path(isabelle_tmp)) bash_env.put("POLYSTATSDIR", isabelle_tmp.getAbsolutePath) - Bash.process( - options.string("ML_process_policy") + """ "$ML_HOME/poly" -q """ + - Bash.strings(bash_args), + val policy = options.string("ML_process_policy") match { case "" => "" case s => s + " " } + + Bash.process(policy + """"$ML_HOME/poly" -q """ + Bash.strings(bash_args), cwd = cwd, env = bash_env, redirect = redirect, diff -r cc292dafc527 -r cf6947717650 src/Pure/System/numa.scala --- a/src/Pure/System/numa.scala Mon Feb 20 13:59:42 2023 +0100 +++ b/src/Pure/System/numa.scala Mon Feb 20 21:53:15 2023 +0100 @@ -32,29 +32,39 @@ /* CPU policy via numactl tool */ - lazy val numactl_available: Boolean = Isabelle_System.bash("numactl -m0 -N0 true").ok + def numactl(node: Int): String = "numactl -m" + node + " -N" + node + def numactl_ok(node: Int): Boolean = Isabelle_System.bash(numactl(node) + " true").ok - def policy(node: Int): String = - if (numactl_available) "numactl -m" + node + " -N" + node else "" + def policy(node: Int): String = if (numactl_ok(node)) numactl(node) else "" - def policy_options(options: Options, numa_node: Option[Int] = Some(0)): Options = + def policy_options(options: Options, numa_node: Option[Int]): Options = numa_node match { case None => options case Some(n) => options.string("ML_process_policy") = policy(n) } + def perhaps_policy_options(options: Options): Options = { + val numa_node = + try { + nodes() match { + case ns if ns.length >= 2 && numactl_ok(ns.head) => Some(ns.head) + case _ => None + } + } + catch { case ERROR(_) => None } + policy_options(options, numa_node) + } + /* shuffling of CPU nodes */ - def enabled: Boolean = - try { nodes().length >= 2 && numactl_available } - catch { case ERROR(_) => false } - def enabled_warning(progress: Progress, enabled: Boolean): Boolean = { def warning = - if (nodes().length < 2) Some("no NUMA nodes available") - else if (!numactl_available) Some("bad numactl tool") - else None + nodes() match { + case ns if ns.length < 2 => Some("no NUMA nodes available") + case ns if !numactl_ok(ns.head) => Some("bad numactl tool") + case _ => None + } enabled && (warning match { @@ -62,21 +72,4 @@ case _ => true }) } - - class Nodes(enabled: Boolean = true) { - private val available = nodes().zipWithIndex - private var next_index = 0 - - def next(used: Int => Boolean = _ => false): Option[Int] = synchronized { - if (!enabled || available.isEmpty) None - else { - val candidates = available.drop(next_index) ::: available.take(next_index) - val (n, i) = - candidates.find({ case (n, i) => i == next_index && !used(n) }) orElse - candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head - next_index = (i + 1) % available.length - Some(n) - } - } - } } diff -r cc292dafc527 -r cf6947717650 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Mon Feb 20 13:59:42 2023 +0100 +++ b/src/Pure/Tools/build.scala Mon Feb 20 21:53:15 2023 +0100 @@ -11,14 +11,24 @@ object Build { /** build with results **/ - class Results private[Build]( + object Results { + def apply(context: Build_Process.Context, results: Map[String, Process_Result]): Results = + new Results(context.store, context.deps, results) + } + + class Results private( val store: Sessions.Store, val deps: Sessions.Deps, - val sessions_ok: List[String], results: Map[String, Process_Result] ) { def cache: Term.Cache = store.cache + def sessions_ok: List[String] = + (for { + name <- deps.sessions_structure.build_topological_order.iterator + result <- results.get(name) if result.ok + } yield name).toList + def info(name: String): Sessions.Info = deps.sessions_structure(name) def sessions: Set[String] = results.keySet def cancelled(name: String): Boolean = !results(name).defined @@ -117,7 +127,11 @@ /* build process and results */ - val build_context = Build_Process.Context(store, build_deps, progress = progress) + val build_context = + Build_Process.Context(store, build_deps, progress = progress, + build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs, + fresh_build = fresh_build, no_build = no_build, verbose = verbose, + session_setup = session_setup) store.prepare_output_dir() @@ -131,35 +145,12 @@ } } - val results = { - val build_results = - if (build_deps.is_empty) { - progress.echo_warning("Nothing to build") - Map.empty[String, Build_Process.Result] - } - else { - Isabelle_Thread.uninterruptible { - val build_process = - new Build_Process(build_context, build_heap = build_heap, - numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build, - no_build = no_build, verbose = verbose, session_setup = session_setup) - build_process.run() - } - } - - val sessions_ok: List[String] = - (for { - name <- build_deps.sessions_structure.build_topological_order.iterator - result <- build_results.get(name) - if result.ok - } yield name).toList - - val results = - (for ((name, result) <- build_results.iterator) - yield (name, result.process_result)).toMap - - new Results(store, build_deps, sessions_ok, results) - } + val results = + Isabelle_Thread.uninterruptible { + val build_process = new Build_Process(build_context) + val res = build_process.run() + Results(build_context, res) + } if (export_files) { for (name <- full_sessions_selection.iterator if results(name).ok) { diff -r cc292dafc527 -r cf6947717650 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Mon Feb 20 13:59:42 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Mon Feb 20 21:53:15 2023 +0100 @@ -9,7 +9,6 @@ import scala.math.Ordering -import scala.collection.immutable.SortedSet import scala.annotation.tailrec @@ -68,7 +67,14 @@ def apply( store: Sessions.Store, deps: Sessions.Deps, - progress: Progress = new Progress + progress: Progress = new Progress, + build_heap: Boolean = false, + numa_shuffling: Boolean = false, + max_jobs: Int = 1, + fresh_build: Boolean = false, + no_build: Boolean = false, + verbose: Boolean = false, + session_setup: (String, Session) => Unit = (_, _) => () ): Context = { val sessions_structure = deps.sessions_structure val build_graph = sessions_structure.build_graph @@ -113,7 +119,10 @@ } } - new Context(store, deps, sessions, ordering, progress) + val numa_nodes = if (numa_shuffling) NUMA.nodes() else Nil + new Context(store, deps, sessions, ordering, progress, numa_nodes, + build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build, + no_build = no_build, verbose = verbose, session_setup) } } @@ -122,20 +131,33 @@ val deps: Sessions.Deps, sessions: Map[String, Session_Context], val ordering: Ordering[String], - val progress: Progress + val progress: Progress, + val numa_nodes: List[Int], + val build_heap: Boolean, + val max_jobs: Int, + val fresh_build: Boolean, + val no_build: Boolean, + val verbose: Boolean, + val session_setup: (String, Session) => Unit ) { def sessions_structure: Sessions.Structure = deps.sessions_structure def apply(session: String): Session_Context = sessions.getOrElse(session, Session_Context.empty(session, Time.zero)) - def build_heap(session: String): Boolean = - Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) + def do_store(session: String): Boolean = + build_heap || Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) } /* main */ + case class Entry(name: String, deps: List[String]) { + def is_ready: Boolean = deps.isEmpty + def resolve(dep: String): Entry = + if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this + } + case class Result( current: Boolean, output_heap: SHA1.Shasum, @@ -143,22 +165,24 @@ ) { def ok: Boolean = process_result.ok } + + def session_finished(session_name: String, process_result: Process_Result): String = + "Finished " + session_name + " (" + process_result.timing.message_resources + ")" + + def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = { + val props = build_log.session_timing + val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 + val timing = Markup.Timing_Properties.get(props) + "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" + } } -class Build_Process( - build_context: Build_Process.Context, - build_heap: Boolean = false, - numa_shuffling: Boolean = false, - max_jobs: Int = 1, - fresh_build: Boolean = false, - no_build: Boolean = false, - verbose: Boolean = false, - session_setup: (String, Session) => Unit = (_, _) => () -) { +class Build_Process(build_context: Build_Process.Context) { private val store = build_context.store private val build_options = store.options private val build_deps = build_context.deps private val progress = build_context.progress + private val verbose = build_context.verbose private val log = build_options.string("system_log") match { @@ -168,33 +192,43 @@ } // global state - private val _numa_nodes = new NUMA.Nodes(numa_shuffling) - private var _build_graph = build_context.sessions_structure.build_graph - private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering) + private var _numa_index = 0 + private var _pending: List[Build_Process.Entry] = + (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator) + yield Build_Process.Entry(name, preds.toList)).toList private var _running = Map.empty[String, Build_Job] private var _results = Map.empty[String, Build_Process.Result] + private def test_pending(): Boolean = synchronized { _pending.nonEmpty } + private def remove_pending(name: String): Unit = synchronized { - _build_graph = _build_graph.del_node(name) - _build_order = _build_order - name + _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name))) } private def next_pending(): Option[String] = synchronized { - if (_running.size < (max_jobs max 1)) { - _build_order.iterator - .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name)) - .nextOption() + if (_running.size < (build_context.max_jobs max 1)) { + _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name)) + .sortBy(_.name)(build_context.ordering) + .headOption.map(_.name) } else None } private def next_numa_node(): Option[Int] = synchronized { - _numa_nodes.next(used = - Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i)) + val available = build_context.numa_nodes.zipWithIndex + if (available.isEmpty) None + else { + val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i) + val index = _numa_index + val candidates = available.drop(index) ::: available.take(index) + val (n, i) = + candidates.find({ case (n, i) => i == index && !used(n) }) orElse + candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head + _numa_index = (i + 1) % available.length + Some(n) + } } - private def test_running(): Boolean = synchronized { !_build_graph.is_empty } - private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) } private def finished_running(): List[Build_Job.Build_Session] = synchronized { @@ -227,16 +261,6 @@ names.map(_results.apply) } - private def session_finished(session_name: String, process_result: Process_Result): String = - "Finished " + session_name + " (" + process_result.timing.message_resources + ")" - - private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = { - val props = build_log.session_timing - val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 - val timing = Markup.Timing_Properties.get(props) - "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" - } - private def finish_job(job: Build_Job.Build_Session): Unit = { val session_name = job.session_name val process_result = job.join @@ -278,8 +302,8 @@ process_result.err_lines.foreach(progress.echo) if (process_result.ok) { - if (verbose) progress.echo(session_timing(session_name, build_log)) - progress.echo(session_finished(session_name, process_result)) + if (verbose) progress.echo(Build_Process.session_timing(session_name, build_log)) + progress.echo(Build_Process.session_finished(session_name, process_result)) } else { progress.echo(session_name + " FAILED") @@ -304,7 +328,7 @@ } else SHA1.flat_shasum(ancestor_results.map(_.output_heap)) - val do_store = build_heap || build_context.build_heap(session_name) + val do_store = build_context.do_store(session_name) val (current, output_heap) = { store.try_open_database(session_name) match { case Some(db) => @@ -312,7 +336,7 @@ case Some(build) => val output_heap = store.find_heap_shasum(session_name) val current = - !fresh_build && + !build_context.fresh_build && build.ok && build.sources == build_deps.sources_shasum(session_name) && build.input_heaps == input_heaps && @@ -332,7 +356,7 @@ add_result(session_name, true, output_heap, Process_Result.ok) } } - else if (no_build) { + else if (build_context.no_build) { progress.echo_if(verbose, "Skipping " + session_name + " ...") synchronized { remove_pending(session_name) @@ -356,7 +380,7 @@ val numa_node = next_numa_node() job_running(session_name, new Build_Job.Build_Session(progress, session_background, store, do_store, - resources, session_setup, input_heaps, numa_node)) + resources, build_context.session_setup, input_heaps, numa_node)) } job.start() } @@ -374,18 +398,25 @@ build_options.seconds("editor_input_delay").sleep() } - def run(): Map[String, Build_Process.Result] = { - while (test_running()) { - if (progress.stopped) stop_running() + def run(): Map[String, Process_Result] = { + if (test_pending()) { + while (test_pending()) { + if (progress.stopped) stop_running() + + for (job <- finished_running()) finish_job(job) - for (job <- finished_running()) finish_job(job) - - next_pending() match { - case Some(session_name) => start_job(session_name) - case None => sleep() + next_pending() match { + case Some(session_name) => start_job(session_name) + case None => sleep() + } + } + synchronized { + for ((name, result) <- _results) yield name -> result.process_result } } - - synchronized { _results } + else { + progress.echo_warning("Nothing to build") + Map.empty[String, Process_Result] + } } } diff -r cc292dafc527 -r cf6947717650 src/Pure/Tools/dump.scala --- a/src/Pure/Tools/dump.scala Mon Feb 20 13:59:42 2023 +0100 +++ b/src/Pure/Tools/dump.scala Mon Feb 20 21:53:15 2023 +0100 @@ -97,9 +97,8 @@ skip_base: Boolean = false ): Context = { val session_options: Options = { - val options0 = if (NUMA.enabled) NUMA.policy_options(options) else options val options1 = - options0 + + NUMA.perhaps_policy_options(options) + "parallel_proofs=0" + "completion_limit=0" + "editor_tracing_messages=0"