--- a/src/Pure/Tools/build_process.scala Tue Feb 21 12:13:35 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Tue Feb 21 12:42:08 2023 +0100
@@ -13,7 +13,7 @@
object Build_Process {
- /* static information */
+ /* static context */
object Session_Context {
def empty(session: String, timeout: Time): Session_Context =
@@ -150,7 +150,7 @@
}
- /* main */
+ /* dynamic state */
case class Entry(name: String, deps: List[String]) {
def is_ready: Boolean = deps.isEmpty
@@ -166,6 +166,55 @@
def ok: Boolean = process_result.ok
}
+ sealed case class State(
+ numa_index: Int = 0,
+ pending: List[Entry] = Nil,
+ running: Map[String, Build_Job] = Map.empty,
+ results: Map[String, Build_Process.Result] = Map.empty
+ ) {
+ def is_pending: Boolean = pending.nonEmpty
+
+ def remove_pending(name: String): State =
+ copy(pending = pending.flatMap(
+ entry => if (entry.name == name) None else Some(entry.resolve(name))))
+
+ def is_running(name: String): Boolean = running.isDefinedAt(name)
+
+ def numa_running: Set[Int] =
+ Set.from(for (job <- running.valuesIterator; i <- job.numa_node) yield i)
+
+ def stop_running(): Unit = running.valuesIterator.foreach(_.terminate())
+
+ def finished_running(): List[Build_Job.Build_Session] =
+ List.from(
+ running.valuesIterator.flatMap {
+ case job: Build_Job.Build_Session if job.is_finished => Some(job)
+ case _ => None
+ })
+
+ def add_running(name: String, job: Build_Job): State =
+ copy(running = running + (name -> job))
+
+ def remove_running(name: String): State =
+ copy(running = running - name)
+
+ def add_result(
+ name: String,
+ current: Boolean,
+ output_heap: SHA1.Shasum,
+ process_result: Process_Result
+ ): State = {
+ val result = Build_Process.Result(current, output_heap, process_result)
+ copy(results = results + (name -> result))
+ }
+
+ def get_results(names: List[String]): List[Build_Process.Result] =
+ names.map(results.apply)
+ }
+
+
+ /* main process */
+
def session_finished(session_name: String, process_result: Process_Result): String =
"Finished " + session_name + " (" + process_result.timing.message_resources + ")"
@@ -192,24 +241,18 @@
}
// global state
- protected var _numa_index = 0
- protected var _pending: List[Build_Process.Entry] = init_pending()
- protected var _running = Map.empty[String, Build_Job]
- protected var _results = Map.empty[String, Build_Process.Result]
+ protected var _state = init_state()
- protected def init_pending(): List[Build_Process.Entry] =
- (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
- yield Build_Process.Entry(name, preds.toList)).toList
+ protected def init_state(): Build_Process.State =
+ Build_Process.State(pending =
+ (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
+ yield Build_Process.Entry(name, preds.toList)).toList)
- protected def is_pending(): Boolean = synchronized { _pending.nonEmpty }
-
- protected def remove_pending(name: String): Unit = synchronized {
- _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name)))
- }
+ protected def is_pending(): Boolean = synchronized { _state.is_pending }
protected def next_pending(): Option[String] = synchronized {
- if (_running.size < (build_context.max_jobs max 1)) {
- _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name))
+ if (_state.running.size < (build_context.max_jobs max 1)) {
+ _state.pending.filter(entry => entry.is_ready && !_state.is_running(entry.name))
.sortBy(_.name)(build_context.ordering)
.headOption.map(_.name)
}
@@ -220,51 +263,28 @@
val available = build_context.numa_nodes.zipWithIndex
if (available.isEmpty) None
else {
- val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i)
- val index = _numa_index
+ val used = _state.numa_running
+ val index = _state.numa_index
val candidates = available.drop(index) ::: available.take(index)
val (n, i) =
candidates.find({ case (n, i) => i == index && !used(n) }) orElse
candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head
- _numa_index = (i + 1) % available.length
+ _state = _state.copy(numa_index = (i + 1) % available.length)
Some(n)
}
}
- protected def stop_running(): Unit = synchronized {
- _running.valuesIterator.foreach(_.terminate())
- }
+ protected def stop_running(): Unit = synchronized { _state.stop_running() }
protected def finished_running(): List[Build_Job.Build_Session] = synchronized {
- List.from(
- _running.valuesIterator.flatMap {
- case job: Build_Job.Build_Session if job.is_finished => Some(job)
- case _ => None
- })
+ _state.finished_running()
}
protected def job_running(name: String, job: Build_Job): Build_Job = synchronized {
- _running += (name -> job)
+ _state = _state.add_running(name, job)
job
}
- protected def remove_running(name: String): Unit = synchronized {
- _running -= name
- }
-
- protected def add_result(
- name: String,
- current: Boolean,
- output_heap: SHA1.Shasum,
- process_result: Process_Result
- ): Unit = synchronized {
- _results += (name -> Build_Process.Result(current, output_heap, process_result))
- }
-
- protected def get_results(names: List[String]): List[Build_Process.Result] = synchronized {
- names.map(_results.apply)
- }
-
protected def finish_job(job: Build_Job.Build_Session): Unit = {
val session_name = job.session_name
val process_result = job.join
@@ -315,17 +335,19 @@
}
synchronized {
- remove_pending(session_name)
- remove_running(session_name)
- add_result(session_name, false, output_heap, process_result_tail)
+ _state = _state.
+ remove_pending(session_name).
+ remove_running(session_name).
+ add_result(session_name, false, output_heap, process_result_tail)
}
}
protected def start_job(session_name: String): Unit = {
- val ancestor_results =
- get_results(
+ val ancestor_results = synchronized {
+ _state.get_results(
build_deps.sessions_structure.build_requirements(List(session_name)).
filterNot(_ == session_name))
+ }
val input_heaps =
if (ancestor_results.isEmpty) {
SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
@@ -356,15 +378,17 @@
if (all_current) {
synchronized {
- remove_pending(session_name)
- add_result(session_name, true, output_heap, Process_Result.ok)
+ _state = _state.
+ remove_pending(session_name).
+ add_result(session_name, true, output_heap, Process_Result.ok)
}
}
else if (build_context.no_build) {
progress.echo_if(verbose, "Skipping " + session_name + " ...")
synchronized {
- remove_pending(session_name)
- add_result(session_name, false, output_heap, Process_Result.error)
+ _state = _state.
+ remove_pending(session_name).
+ add_result(session_name, false, output_heap, Process_Result.error)
}
}
else if (ancestor_results.forall(_.ok) && !progress.stopped) {
@@ -391,8 +415,9 @@
else {
progress.echo(session_name + " CANCELLED")
synchronized {
- remove_pending(session_name)
- add_result(session_name, false, output_heap, Process_Result.undefined)
+ _state = _state.
+ remove_pending(session_name).
+ add_result(session_name, false, output_heap, Process_Result.undefined)
}
}
}
@@ -415,7 +440,7 @@
}
}
synchronized {
- for ((name, result) <- _results) yield name -> result.process_result
+ for ((name, result) <- _state.results) yield name -> result.process_result
}
}
else {