# HG changeset patch # User wenzelm # Date 1676979728 -3600 # Node ID 0231e62956a6e46e536c06407fda6c91a1534c15 # Parent 0bec014c0f9fe187ad2bc99d8ce15b55c4d81779 clarified state: more explicit type as plain value, which is also easier to sync with external db; diff -r 0bec014c0f9f -r 0231e62956a6 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Tue Feb 21 12:13:35 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Tue Feb 21 12:42:08 2023 +0100 @@ -13,7 +13,7 @@ object Build_Process { - /* static information */ + /* static context */ object Session_Context { def empty(session: String, timeout: Time): Session_Context = @@ -150,7 +150,7 @@ } - /* main */ + /* dynamic state */ case class Entry(name: String, deps: List[String]) { def is_ready: Boolean = deps.isEmpty @@ -166,6 +166,55 @@ def ok: Boolean = process_result.ok } + sealed case class State( + numa_index: Int = 0, + pending: List[Entry] = Nil, + running: Map[String, Build_Job] = Map.empty, + results: Map[String, Build_Process.Result] = Map.empty + ) { + def is_pending: Boolean = pending.nonEmpty + + def remove_pending(name: String): State = + copy(pending = pending.flatMap( + entry => if (entry.name == name) None else Some(entry.resolve(name)))) + + def is_running(name: String): Boolean = running.isDefinedAt(name) + + def numa_running: Set[Int] = + Set.from(for (job <- running.valuesIterator; i <- job.numa_node) yield i) + + def stop_running(): Unit = running.valuesIterator.foreach(_.terminate()) + + def finished_running(): List[Build_Job.Build_Session] = + List.from( + running.valuesIterator.flatMap { + case job: Build_Job.Build_Session if job.is_finished => Some(job) + case _ => None + }) + + def add_running(name: String, job: Build_Job): State = + copy(running = running + (name -> job)) + + def remove_running(name: String): State = + copy(running = running - name) + + def add_result( + name: String, + current: Boolean, + output_heap: SHA1.Shasum, + process_result: Process_Result + ): State = { + val result = Build_Process.Result(current, output_heap, process_result) + copy(results = results + (name -> result)) + } + + def get_results(names: List[String]): List[Build_Process.Result] = + names.map(results.apply) + } + + + /* main process */ + def session_finished(session_name: String, process_result: Process_Result): String = "Finished " + session_name + " (" + process_result.timing.message_resources + ")" @@ -192,24 +241,18 @@ } // global state - protected var _numa_index = 0 - protected var _pending: List[Build_Process.Entry] = init_pending() - protected var _running = Map.empty[String, Build_Job] - protected var _results = Map.empty[String, Build_Process.Result] + protected var _state = init_state() - protected def init_pending(): List[Build_Process.Entry] = - (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator) - yield Build_Process.Entry(name, preds.toList)).toList + protected def init_state(): Build_Process.State = + Build_Process.State(pending = + (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator) + yield Build_Process.Entry(name, preds.toList)).toList) - protected def is_pending(): Boolean = synchronized { _pending.nonEmpty } - - protected def remove_pending(name: String): Unit = synchronized { - _pending = _pending.flatMap(entry => if (entry.name == name) None else Some(entry.resolve(name))) - } + protected def is_pending(): Boolean = synchronized { _state.is_pending } protected def next_pending(): Option[String] = synchronized { - if (_running.size < (build_context.max_jobs max 1)) { - _pending.filter(entry => entry.is_ready && !_running.isDefinedAt(entry.name)) + if (_state.running.size < (build_context.max_jobs max 1)) { + _state.pending.filter(entry => entry.is_ready && !_state.is_running(entry.name)) .sortBy(_.name)(build_context.ordering) .headOption.map(_.name) } @@ -220,51 +263,28 @@ val available = build_context.numa_nodes.zipWithIndex if (available.isEmpty) None else { - val used = Set.from(for (job <- _running.valuesIterator; i <- job.numa_node) yield i) - val index = _numa_index + val used = _state.numa_running + val index = _state.numa_index val candidates = available.drop(index) ::: available.take(index) val (n, i) = candidates.find({ case (n, i) => i == index && !used(n) }) orElse candidates.find({ case (n, _) => !used(n) }) getOrElse candidates.head - _numa_index = (i + 1) % available.length + _state = _state.copy(numa_index = (i + 1) % available.length) Some(n) } } - protected def stop_running(): Unit = synchronized { - _running.valuesIterator.foreach(_.terminate()) - } + protected def stop_running(): Unit = synchronized { _state.stop_running() } protected def finished_running(): List[Build_Job.Build_Session] = synchronized { - List.from( - _running.valuesIterator.flatMap { - case job: Build_Job.Build_Session if job.is_finished => Some(job) - case _ => None - }) + _state.finished_running() } protected def job_running(name: String, job: Build_Job): Build_Job = synchronized { - _running += (name -> job) + _state = _state.add_running(name, job) job } - protected def remove_running(name: String): Unit = synchronized { - _running -= name - } - - protected def add_result( - name: String, - current: Boolean, - output_heap: SHA1.Shasum, - process_result: Process_Result - ): Unit = synchronized { - _results += (name -> Build_Process.Result(current, output_heap, process_result)) - } - - protected def get_results(names: List[String]): List[Build_Process.Result] = synchronized { - names.map(_results.apply) - } - protected def finish_job(job: Build_Job.Build_Session): Unit = { val session_name = job.session_name val process_result = job.join @@ -315,17 +335,19 @@ } synchronized { - remove_pending(session_name) - remove_running(session_name) - add_result(session_name, false, output_heap, process_result_tail) + _state = _state. + remove_pending(session_name). + remove_running(session_name). + add_result(session_name, false, output_heap, process_result_tail) } } protected def start_job(session_name: String): Unit = { - val ancestor_results = - get_results( + val ancestor_results = synchronized { + _state.get_results( build_deps.sessions_structure.build_requirements(List(session_name)). filterNot(_ == session_name)) + } val input_heaps = if (ancestor_results.isEmpty) { SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) @@ -356,15 +378,17 @@ if (all_current) { synchronized { - remove_pending(session_name) - add_result(session_name, true, output_heap, Process_Result.ok) + _state = _state. + remove_pending(session_name). + add_result(session_name, true, output_heap, Process_Result.ok) } } else if (build_context.no_build) { progress.echo_if(verbose, "Skipping " + session_name + " ...") synchronized { - remove_pending(session_name) - add_result(session_name, false, output_heap, Process_Result.error) + _state = _state. + remove_pending(session_name). + add_result(session_name, false, output_heap, Process_Result.error) } } else if (ancestor_results.forall(_.ok) && !progress.stopped) { @@ -391,8 +415,9 @@ else { progress.echo(session_name + " CANCELLED") synchronized { - remove_pending(session_name) - add_result(session_name, false, output_heap, Process_Result.undefined) + _state = _state. + remove_pending(session_name). + add_result(session_name, false, output_heap, Process_Result.undefined) } } } @@ -415,7 +440,7 @@ } } synchronized { - for ((name, result) <- _results) yield name -> result.process_result + for ((name, result) <- _state.results) yield name -> result.process_result } } else {