merged
authorwenzelm
Mon, 20 Feb 2023 21:53:15 +0100
changeset 77321 cf6947717650
parent 77309 cc292dafc527 (current diff)
parent 77320 7a6fa60298cd (diff)
child 77325 5158dc9d096b
child 77326 b3f8aad678e9
merged
--- a/src/Pure/ML/ml_process.scala	Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/ML/ml_process.scala	Mon Feb 20 21:53:15 2023 +0100
@@ -117,9 +117,9 @@
     bash_env.put("ISABELLE_TMP", File.standard_path(isabelle_tmp))
     bash_env.put("POLYSTATSDIR", isabelle_tmp.getAbsolutePath)
 
-    Bash.process(
-      options.string("ML_process_policy") + """ "$ML_HOME/poly" -q """ +
-        Bash.strings(bash_args),
+    val policy = options.string("ML_process_policy") match { case "" => "" case s => s + " " }
+
+    Bash.process(policy + """"$ML_HOME/poly" -q """ + Bash.strings(bash_args),
       cwd = cwd,
       env = bash_env,
       redirect = redirect,
--- a/src/Pure/System/numa.scala	Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/System/numa.scala	Mon Feb 20 21:53:15 2023 +0100
@@ -32,29 +32,39 @@
 
   /* CPU policy via numactl tool */
 
-  lazy val numactl_available: Boolean = Isabelle_System.bash("numactl -m0 -N0 true").ok
+  def numactl(node: Int): String = "numactl -m" + node + " -N" + node
+  def numactl_ok(node: Int): Boolean = Isabelle_System.bash(numactl(node) + " true").ok
 
-  def policy(node: Int): String =
-    if (numactl_available) "numactl -m" + node + " -N" + node else ""
+  def policy(node: Int): String = if (numactl_ok(node)) numactl(node) else ""
 
-  def policy_options(options: Options, numa_node: Option[Int] = Some(0)): Options =
+  def policy_options(options: Options, numa_node: Option[Int]): Options =
     numa_node match {
       case None => options
       case Some(n) => options.string("ML_process_policy") = policy(n)
     }
 
+  def perhaps_policy_options(options: Options): Options = {
+    val numa_node =
+      try {
+        nodes() match {
+          case ns if ns.length >= 2 && numactl_ok(ns.head) => Some(ns.head)
+          case _ => None
+        }
+      }
+      catch { case ERROR(_) => None }
+    policy_options(options, numa_node)
+  }
+
 
   /* shuffling of CPU nodes */
 
-  def enabled: Boolean =
-    try { nodes().length >= 2 && numactl_available }
-    catch { case ERROR(_) => false }
-
   def enabled_warning(progress: Progress, enabled: Boolean): Boolean = {
     def warning =
-      if (nodes().length < 2) Some("no NUMA nodes available")
-      else if (!numactl_available) Some("bad numactl tool")
-      else None
+      nodes() match {
+        case ns if ns.length < 2 => Some("no NUMA nodes available")
+        case ns if !numactl_ok(ns.head) => Some("bad numactl tool")
+        case _ => None
+      }
 
     enabled &&
       (warning match {
@@ -62,21 +72,4 @@
         case _ => true
       })
   }
-
-  class Nodes(enabled: Boolean = true) {
-    private val available = nodes().zipWithIndex
-    private var next_index = 0
-
-    def next(used: Int => Boolean = _ => false): Option[Int] = synchronized {
-      if (!enabled || available.isEmpty) None
-      else {
-        val candidates = available.drop(next_index) ::: available.take(next_index)
-        val (n, i) =
-          candidates.find({ case (n, i) => i == next_index && !used(n) }) orElse
-            candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
-        next_index = (i + 1) % available.length
-        Some(n)
-      }
-    }
-  }
 }
--- a/src/Pure/Tools/build.scala	Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/build.scala	Mon Feb 20 21:53:15 2023 +0100
@@ -11,14 +11,24 @@
 object Build {
   /** build with results **/
 
-  class Results private[Build](
+  object Results {
+    def apply(context: Build_Process.Context, results: Map[String, Process_Result]): Results =
+      new Results(context.store, context.deps, results)
+  }
+
+  class Results private(
     val store: Sessions.Store,
     val deps: Sessions.Deps,
-    val sessions_ok: List[String],
     results: Map[String, Process_Result]
   ) {
     def cache: Term.Cache = store.cache
 
+    def sessions_ok: List[String] =
+      (for {
+        name <- deps.sessions_structure.build_topological_order.iterator
+        result <- results.get(name) if result.ok
+      } yield name).toList
+
     def info(name: String): Sessions.Info = deps.sessions_structure(name)
     def sessions: Set[String] = results.keySet
     def cancelled(name: String): Boolean = !results(name).defined
@@ -117,7 +127,11 @@
 
     /* build process and results */
 
-    val build_context = Build_Process.Context(store, build_deps, progress = progress)
+    val build_context =
+      Build_Process.Context(store, build_deps, progress = progress,
+        build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+        fresh_build = fresh_build, no_build = no_build, verbose = verbose,
+        session_setup = session_setup)
 
     store.prepare_output_dir()
 
@@ -131,35 +145,12 @@
       }
     }
 
-    val results = {
-      val build_results =
-        if (build_deps.is_empty) {
-          progress.echo_warning("Nothing to build")
-          Map.empty[String, Build_Process.Result]
-        }
-        else {
-          Isabelle_Thread.uninterruptible {
-            val build_process =
-              new Build_Process(build_context, build_heap = build_heap,
-                numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
-                no_build = no_build, verbose = verbose, session_setup = session_setup)
-            build_process.run()
-          }
-        }
-
-      val sessions_ok: List[String] =
-        (for {
-          name <- build_deps.sessions_structure.build_topological_order.iterator
-          result <- build_results.get(name)
-          if result.ok
-        } yield name).toList
-
-      val results =
-        (for ((name, result) <- build_results.iterator)
-          yield (name, result.process_result)).toMap
-
-      new Results(store, build_deps, sessions_ok, results)
-    }
+    val results =
+      Isabelle_Thread.uninterruptible {
+        val build_process = new Build_Process(build_context)
+        val res = build_process.run()
+        Results(build_context, res)
+      }
 
     if (export_files) {
       for (name <- full_sessions_selection.iterator if results(name).ok) {
--- a/src/Pure/Tools/build_process.scala	Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Mon Feb 20 21:53:15 2023 +0100
@@ -9,7 +9,6 @@
 
 
 import scala.math.Ordering
-import scala.collection.immutable.SortedSet
 import scala.annotation.tailrec
 
 
@@ -68,7 +67,14 @@
     def apply(
       store: Sessions.Store,
       deps: Sessions.Deps,
-      progress: Progress = new Progress
+      progress: Progress = new Progress,
+      build_heap: Boolean = false,
+      numa_shuffling: Boolean = false,
+      max_jobs: Int = 1,
+      fresh_build: Boolean = false,
+      no_build: Boolean = false,
+      verbose: Boolean = false,
+      session_setup: (String, Session) => Unit = (_, _) => ()
     ): Context = {
       val sessions_structure = deps.sessions_structure
       val build_graph = sessions_structure.build_graph
@@ -113,7 +119,10 @@
             }
         }
 
-      new Context(store, deps, sessions, ordering, progress)
+      val numa_nodes = if (numa_shuffling) NUMA.nodes() else Nil
+      new Context(store, deps, sessions, ordering, progress, numa_nodes,
+        build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
+        no_build = no_build, verbose = verbose, session_setup)
     }
   }
 
@@ -122,20 +131,33 @@
     val deps: Sessions.Deps,
     sessions: Map[String, Session_Context],
     val ordering: Ordering[String],
-    val progress: Progress
+    val progress: Progress,
+    val numa_nodes: List[Int],
+    val build_heap: Boolean,
+    val max_jobs: Int,
+    val fresh_build: Boolean,
+    val no_build: Boolean,
+    val verbose: Boolean,
+    val session_setup: (String, Session) => Unit
   ) {
     def sessions_structure: Sessions.Structure = deps.sessions_structure
 
     def apply(session: String): Session_Context =
       sessions.getOrElse(session, Session_Context.empty(session, Time.zero))
 
-    def build_heap(session: String): Boolean =
-      Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
+    def do_store(session: String): Boolean =
+      build_heap || Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session)
   }
 
 
   /* main */
 
+  case class Entry(name: String, deps: List[String]) {
+    def is_ready: Boolean = deps.isEmpty
+    def resolve(dep: String): Entry =
+      if (deps.contains(dep)) copy(deps = deps.filterNot(_ == dep)) else this
+  }
+
   case class Result(
     current: Boolean,
     output_heap: SHA1.Shasum,
@@ -143,22 +165,24 @@
   ) {
     def ok: Boolean = process_result.ok
   }
+
+  def session_finished(session_name: String, process_result: Process_Result): String =
+    "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
+
+  def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
+    val props = build_log.session_timing
+    val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+    val timing = Markup.Timing_Properties.get(props)
+    "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
+  }
 }
 
-class Build_Process(
-  build_context: Build_Process.Context,
-  build_heap: Boolean = false,
-  numa_shuffling: Boolean = false,
-  max_jobs: Int = 1,
-  fresh_build: Boolean = false,
-  no_build: Boolean = false,
-  verbose: Boolean = false,
-  session_setup: (String, Session) => Unit = (_, _) => ()
-) {
+class Build_Process(build_context: Build_Process.Context) {
   private val store = build_context.store
   private val build_options = store.options
   private val build_deps = build_context.deps
   private val progress = build_context.progress
+  private val verbose = build_context.verbose
 
   private val log =
     build_options.string("system_log") match {
@@ -168,33 +192,43 @@
     }
 
   // global state
-  private val _numa_nodes = new NUMA.Nodes(numa_shuffling)
-  private var _build_graph = build_context.sessions_structure.build_graph
-  private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering)
+  private var _numa_index = 0
+  private var _pending: List[Build_Process.Entry] =
+    (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
+      yield Build_Process.Entry(name, preds.toList)).toList
   private var _running = Map.empty[String, Build_Job]
   private var _results = Map.empty[String, Build_Process.Result]
 
+  private def test_pending(): Boolean = synchronized { _pending.nonEmpty }
+
   private def remove_pending(name: String): Unit = synchronized {
-    _build_graph = _build_graph.del_node(name)
-    _build_order = _build_order - name
+    _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name)))
   }
 
   private def next_pending(): Option[String] = synchronized {
-    if (_running.size < (max_jobs max 1)) {
-      _build_order.iterator
-        .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name))
-        .nextOption()
+    if (_running.size < (build_context.max_jobs max 1)) {
+      _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name))
+        .sortBy(_.name)(build_context.ordering)
+        .headOption.map(_.name)
     }
     else None
   }
 
   private def next_numa_node(): Option[Int] = synchronized {
-    _numa_nodes.next(used =
-      Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i))
+    val available = build_context.numa_nodes.zipWithIndex
+    if (available.isEmpty) None
+    else {
+      val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i)
+      val index = _numa_index
+      val candidates = available.drop(index) ::: available.take(index)
+      val (n, i) =
+        candidates.find({ case (n, i) => i == index && !used(n) }) orElse
+        candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
+      _numa_index = (i + 1) % available.length
+      Some(n)
+    }
   }
 
-  private def test_running(): Boolean = synchronized { !_build_graph.is_empty }
-
   private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) }
 
   private def finished_running(): List[Build_Job.Build_Session] = synchronized {
@@ -227,16 +261,6 @@
     names.map(_results.apply)
   }
 
-  private def session_finished(session_name: String, process_result: Process_Result): String =
-    "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
-
-  private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
-    val props = build_log.session_timing
-    val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-    val timing = Markup.Timing_Properties.get(props)
-    "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
-  }
-
   private def finish_job(job: Build_Job.Build_Session): Unit = {
     val session_name = job.session_name
     val process_result = job.join
@@ -278,8 +302,8 @@
     process_result.err_lines.foreach(progress.echo)
 
     if (process_result.ok) {
-      if (verbose) progress.echo(session_timing(session_name, build_log))
-      progress.echo(session_finished(session_name, process_result))
+      if (verbose) progress.echo(Build_Process.session_timing(session_name, build_log))
+      progress.echo(Build_Process.session_finished(session_name, process_result))
     }
     else {
       progress.echo(session_name + " FAILED")
@@ -304,7 +328,7 @@
       }
       else SHA1.flat_shasum(ancestor_results.map(_.output_heap))
 
-    val do_store = build_heap || build_context.build_heap(session_name)
+    val do_store = build_context.do_store(session_name)
     val (current, output_heap) = {
       store.try_open_database(session_name) match {
         case Some(db) =>
@@ -312,7 +336,7 @@
             case Some(build) =>
               val output_heap = store.find_heap_shasum(session_name)
               val current =
-                !fresh_build &&
+                !build_context.fresh_build &&
                 build.ok &&
                 build.sources == build_deps.sources_shasum(session_name) &&
                 build.input_heaps == input_heaps &&
@@ -332,7 +356,7 @@
         add_result(session_name, true, output_heap, Process_Result.ok)
       }
     }
-    else if (no_build) {
+    else if (build_context.no_build) {
       progress.echo_if(verbose, "Skipping " + session_name + " ...")
       synchronized {
         remove_pending(session_name)
@@ -356,7 +380,7 @@
           val numa_node = next_numa_node()
           job_running(session_name,
             new Build_Job.Build_Session(progress, session_background, store, do_store,
-              resources, session_setup, input_heaps, numa_node))
+              resources, build_context.session_setup, input_heaps, numa_node))
         }
       job.start()
     }
@@ -374,18 +398,25 @@
       build_options.seconds("editor_input_delay").sleep()
     }
 
-  def run(): Map[String, Build_Process.Result] = {
-    while (test_running()) {
-      if (progress.stopped) stop_running()
+  def run(): Map[String, Process_Result] = {
+    if (test_pending()) {
+      while (test_pending()) {
+        if (progress.stopped) stop_running()
+
+        for (job <- finished_running()) finish_job(job)
 
-      for (job <- finished_running()) finish_job(job)
-
-      next_pending() match {
-        case Some(session_name) => start_job(session_name)
-        case None => sleep()
+        next_pending() match {
+          case Some(session_name) => start_job(session_name)
+          case None => sleep()
+        }
+      }
+      synchronized {
+        for ((name, result) <- _results) yield name -> result.process_result
       }
     }
-
-    synchronized { _results }
+    else {
+      progress.echo_warning("Nothing to build")
+      Map.empty[String, Process_Result]
+    }
   }
 }
--- a/src/Pure/Tools/dump.scala	Mon Feb 20 13:59:42 2023 +0100
+++ b/src/Pure/Tools/dump.scala	Mon Feb 20 21:53:15 2023 +0100
@@ -97,9 +97,8 @@
       skip_base: Boolean = false
     ): Context = {
       val session_options: Options = {
-        val options0 = if (NUMA.enabled) NUMA.policy_options(options) else options
         val options1 =
-          options0 +
+          NUMA.perhaps_policy_options(options) +
             "parallel_proofs=0" +
             "completion_limit=0" +
             "editor_tracing_messages=0"