--- a/src/Pure/Build/build_schedule.scala Tue Feb 13 17:18:50 2024 +0000
+++ b/src/Pure/Build/build_schedule.scala Tue Feb 13 17:18:57 2024 +0000
@@ -14,25 +14,11 @@
object Build_Schedule {
/* organized historic timing information (extracted from build logs) */
- case class Timing_Entry(job_name: String, hostname: String, threads: Int, timing: Timing) {
+ case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) {
def elapsed: Time = timing.elapsed
def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
- case class Timing_Entries(entries: List[Timing_Entry]) {
- require(entries.nonEmpty)
- def is_empty = entries.isEmpty
- def size = entries.length
- lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap
- lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap
- lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap
- def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed))
- def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
- }
object Timing_Data {
def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
@@ -44,8 +30,8 @@
val baseline = Time.minutes(5).scale(host_factor)
val gc = Time.seconds(10).scale(host_factor)
- Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
- Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
+ Result("dummy", host.name, 1, Timing(baseline, baseline, gc)),
+ Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
def make(
@@ -73,10 +59,10 @@
measurements.groupMap(_._1)(_._2).toList.map {
case ((job_name, hostname, threads), timings) =>
- Timing_Entry(job_name, hostname, threads, median_timing(timings))
+ Result(job_name, hostname, threads, median_timing(timings))
- new Timing_Data(Timing_Entries(entries), host_infos)
+ new Timing_Data(new Facet(entries), host_infos)
def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
@@ -91,18 +77,41 @@
make(host_infos, build_history)
+ /* data facets */
+ object Facet {
+ def unapply(facet: Facet): Option[List[Result]] = Some(facet.results)
+ }
+ class Facet private[Timing_Data](val results: List[Result]) {
+ require(results.nonEmpty)
+ def is_empty = results.isEmpty
+ def size = results.length
+ lazy val by_job = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap
+ lazy val by_threads = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
+ lazy val by_hostname = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
+ def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+ def best_result: Result = results.minBy(_.elapsed.ms)
+ }
- class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
+ class Timing_Data private(facet: Timing_Data.Facet, val host_infos: Host_Infos) {
private def inflection_point(last_mono: Int, next: Int): Int =
last_mono + ((next - last_mono) / 2)
def best_threads(job_name: String, max_threads: Int): Int = {
val worse_threads =
- data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
- case (hostname, data) =>
- val best_threads = data.best_entry.threads
- data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+ facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
+ case (hostname, facet) =>
+ val best_threads = facet.best_result.threads
+ facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
inflection_point(best_threads, _))
(max_threads :: worse_threads).min
@@ -170,31 +179,31 @@
private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
- def unify(hostname: String, data: Timing_Entries) =
- data.mean_time.scale(hostname_factor(hostname, on_host))
+ def unify(hostname: String, facet: Timing_Data.Facet) =
+ facet.mean_time.scale(hostname_factor(hostname, on_host))
for {
- data <- data.by_job.get(job_name).toList
- (threads, data) <- data.by_threads
- entries = data.by_hostname.toList.map(unify)
+ facet <- facet.by_job.get(job_name).toList
+ (threads, facet) <- facet.by_threads
+ entries = facet.by_hostname.toList.map(unify)
} yield threads -> Timing_Data.median_time(entries)
def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
- def try_approximate(data: Timing_Entries): Option[Time] = {
+ def try_approximate(facet: Timing_Data.Facet): Option[Time] = {
val entries =
- data.by_threads.toList match {
- case List((i, Timing_Entries(List(entry)))) if i != 1 =>
- (i, data.mean_time) :: entry.proper_cpu.map(1 -> _).toList
- case entries => entries.map((threads, data) => threads -> data.mean_time)
+ facet.by_threads.toList match {
+ case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
+ (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
+ case entries => entries.map((threads, facet) => threads -> facet.mean_time)
if (entries.size < 2) None else Some(approximate_threads(entries, threads))
for {
- data <- data.by_job.get(job_name)
- data <- data.by_hostname.get(hostname)
- time <- data.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(data))
+ facet <- facet.by_job.get(job_name)
+ facet <- facet.by_hostname.get(hostname)
+ time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
} yield time
@@ -203,8 +212,8 @@
val estimates =
for {
- (hostname, data) <- data.by_hostname
- job_name <- data.by_job.keys
+ (hostname, facet) <- facet.by_hostname
+ job_name <- facet.by_job.keys
from_time <- estimate_threads(job_name, hostname, from)
to_time <- estimate_threads(job_name, hostname, to)
} yield from_time.ms.toDouble / to_time.ms
@@ -214,8 +223,8 @@
// unify hosts
val estimates =
for {
- (job_name, data) <- data.by_job
- hostname = data.by_hostname.keys.head
+ (job_name, facet) <- facet.by_job
+ hostname = facet.by_hostname.keys.head
entries = unify_hosts(job_name, hostname)
if entries.length > 1
} yield
@@ -239,26 +248,26 @@
def estimate(job_name: String, hostname: String, threads: Int): Time = {
def estimate: Time =
- data.by_job.get(job_name) match {
+ facet.by_job.get(job_name) match {
case None =>
// no data for job, take average of other jobs for given threads
- val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+ val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
else {
// no other job to estimate from, use global curve to approximate any other job
- val (threads1, data1) = data.by_threads.head
- data1.mean_time.scale(global_threads_factor(threads1, threads))
+ val (threads1, facet1) = facet.by_threads.head
+ facet1.mean_time.scale(global_threads_factor(threads1, threads))
- case Some(data) =>
- data.by_threads.get(threads) match {
+ case Some(facet) =>
+ facet.by_threads.get(threads) match {
case None => // interpolate threads
estimate_threads(job_name, hostname, threads).map(_.scale(
// per machine, try to approximate config for threads
val approximated =
for {
- hostname1 <- data.by_hostname.keys
+ hostname1 <- facet.by_hostname.keys
estimate <- estimate_threads(job_name, hostname1, threads)
factor = hostname_factor(hostname1, hostname)
} yield estimate.scale(factor)
@@ -281,11 +290,11 @@
- case Some(data) => // time for job/thread exists, interpolate machine if necessary
- data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+ case Some(facet) => // time for job/thread exists, interpolate machine if necessary
+ facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
- data.by_hostname.toList.map((hostname1, data) =>
- data.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+ facet.by_hostname.toList.map((hostname1, facet) =>
+ facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
@@ -347,8 +356,8 @@
def available(state: Build_Process.State): Resources = {
val allocated =
- state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
- Resources(this, allocated)
+ state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _)
+ new Resources(this, allocated)
@@ -360,9 +369,9 @@
Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
- case class Resources(
- host_infos: Host_Infos,
- allocated_nodes: Map[Host, List[Node_Info]]
+ class Resources(
+ val host_infos: Host_Infos,
+ allocated_nodes: Map[String, List[Node_Info]]
) {
def unused_nodes(host: Host, threads: Int): List[Node_Info] =
if (!available(host, threads)) Nil
@@ -374,11 +383,11 @@
def unused_nodes(threads: Int): List[Node_Info] =
host_infos.hosts.flatMap(unused_nodes(_, threads))
- def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
+ def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil)
- def allocate(node: Node_Info): Resources = {
- val host = host_infos.the_host(node)
- copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
+ def allocate(node_info: Node_Info): Resources = {
+ val host = host_infos.the_host(node_info)
+ new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host))))
def try_allocate_tasks(
@@ -538,7 +547,8 @@
val host_preds =
for {
- (name, (pred_node, _)) <- finished.graph.iterator.toSet
+ name <- finished.graph.keys
+ pred_node = finished.graph.get_node(name)
if pred_node.node_info.hostname == job.node_info.hostname
if pred_node.end.time <= node.start.time
} yield name
@@ -556,35 +566,49 @@
def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
- trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule }
+ trait Scheduler { def schedule(build_state: Build_Process.State): Schedule }
+ trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] }
- abstract class Heuristic(timing_data: Timing_Data, build_uuid: String)
- extends Scheduler {
- val host_infos = timing_data.host_infos
- val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
- def next(state: Build_Process.State): List[Config]
- def build_schedule(build_state: Build_Process.State): Schedule = {
+ case class Generation_Scheme(
+ priority_rule: Priority_Rule,
+ timing_data: Timing_Data,
+ build_uuid: String
+ ) extends Scheduler {
+ def schedule(build_state: Build_Process.State): Schedule = {
def simulate(state: State): State =
if (state.is_finished) state
else {
- val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
+ val state1 =
+ priority_rule
+ .select_next(state.build_state)
+ .foldLeft(state)(_.start(_))
+ .step(timing_data)
val start = Date.now()
+ val name = "generation scheme (" + priority_rule + ")"
val end_state =
- simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty)))
+ simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty)))
- class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String)
- extends Heuristic(timing_data, build_uuid) {
- override def toString: String = "default build heuristic"
+ case class Optimizer(schedulers: List[Scheduler]) extends Scheduler {
+ require(schedulers.nonEmpty)
+ def schedule(state: Build_Process.State): Schedule =
+ schedulers.map(_.schedule(state)).minBy(_.duration.ms)
+ }
+ /* priority rules */
+ class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule {
+ override def toString: String = "default heuristic"
def host_threads(host: Host): Int = {
val m = (options ++ host.build.options).int("threads")
@@ -594,7 +618,7 @@
def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
- def next(state: Build_Process.State): List[Config] = {
+ def select_next(state: Build_Process.State): List[Config] = {
val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name)
val resources = host_infos.available(state)
@@ -607,89 +631,6 @@
- class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler {
- require(heuristics.nonEmpty)
- def best_result(state: Build_Process.State): (Heuristic, Schedule) =
- heuristics.map(heuristic =>
- heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms)
- def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state)
- def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2
- }
- /* heuristics */
- abstract class Path_Heuristic(
- timing_data: Timing_Data,
- sessions_structure: Sessions.Structure,
- max_threads_limit: Int,
- build_uuid: String
- ) extends Heuristic(timing_data, build_uuid) {
- /* pre-computed properties for efficient heuristic */
- val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
- type Node = String
- val build_graph = sessions_structure.build_graph
- val minimals = build_graph.minimals
- val maximals = build_graph.maximals
- def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
- val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
- def best_time(node: Node): Time = {
- val host = ordered_hosts.last
- val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
- timing_data.estimate(node, host.name, threads)
- }
- val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
- val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
- def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
- def max_time(task: Build_Process.Task): Time = max_time(task.name)
- def path_times(minimals: List[Node]): Map[Node, Time] = {
- def time_ms(node: Node): Long = best_times(node).ms
- val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
- path_times_ms.view.mapValues(Time.ms).toMap
- }
- def path_max_times(minimals: List[Node]): Map[Node, Time] =
- path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
- def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
- def start(node: Node): (Node, Time) = node -> best_times(node)
- def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
- node -> (time - elapsed)
- def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
- if (running.isEmpty) (0, running)
- else {
- def get_next(node: Node): List[Node] =
- build_graph.imm_succs(node).filter(pred).filter(
- build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
- val (next, elapsed) = running.minBy(_._2.ms)
- val (remaining, finished) =
- running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
- val running1 =
- remaining.map(pass_time(elapsed)).toMap ++
- finished.map(_._1).flatMap(get_next).map(start)
- val (res, running2) = parallel_paths(running1)
- (res max running.size, running2)
- }
- parallel_paths(running.toMap)._1
- }
- }
object Path_Time_Heuristic {
sealed trait Critical_Criterion
case class Absolute_Time(time: Time) extends Critical_Criterion {
@@ -725,9 +666,8 @@
host_criterion: Path_Time_Heuristic.Host_Criterion,
timing_data: Timing_Data,
sessions_structure: Sessions.Structure,
- build_uuid: String,
max_threads_limit: Int = 8
- ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) {
+ ) extends Priority_Rule {
import Path_Time_Heuristic.*
override def toString: Node = {
@@ -739,11 +679,85 @@
"path time heuristic (" + params.mkString(", ") + ")"
- def next(state: Build_Process.State): List[Config] = {
+ /* pre-computed properties for efficient heuristic */
+ val host_infos = timing_data.host_infos
+ val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
+ val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
+ type Node = String
+ val build_graph = sessions_structure.build_graph
+ val minimals = build_graph.minimals
+ val maximals = build_graph.maximals
+ def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
+ val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
+ val best_threads =
+ build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap
+ def best_time(node: Node): Time = {
+ val host = ordered_hosts.last
+ val threads = best_threads(node) min host.info.num_cpus
+ timing_data.estimate(node, host.name, threads)
+ }
+ val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
+ val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
+ def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
+ def max_time(task: Build_Process.Task): Time = max_time(task.name)
+ def path_times(minimals: List[Node]): Map[Node, Time] = {
+ def time_ms(node: Node): Long = best_times(node).ms
+ val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
+ path_times_ms.view.mapValues(Time.ms).toMap
+ }
+ def path_max_times(minimals: List[Node]): Map[Node, Time] =
+ path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
+ val node_degrees =
+ build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap
+ def parallel_paths(
+ running: List[(Node, Time)],
+ nodes: Set[Node] = build_graph.keys.toSet,
+ max: Int = Int.MaxValue
+ ): Int =
+ if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max
+ else {
+ def start(node: Node): (Node, Time) = node -> best_times(node)
+ def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
+ node -> (time - elapsed)
+ def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
+ if (running.size >= max) (max, running)
+ else if (running.isEmpty) (0, running)
+ else {
+ def get_next(node: Node): List[Node] =
+ build_graph.imm_succs(node).intersect(nodes).filter(
+ build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
+ val (next, elapsed) = running.minBy(_._2.ms)
+ val (remaining, finished) =
+ running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
+ val running1 =
+ remaining.map(pass_time(elapsed)).toMap ++
+ finished.map(_._1).flatMap(get_next).map(start)
+ val (res, running2) = parallel_paths(running1)
+ (res max running.size, running2)
+ }
+ parallel_paths(running.toMap)._1
+ }
+ def select_next(state: Build_Process.State): List[Config] = {
val resources = host_infos.available(state)
- def best_threads(task: Build_Process.Task): Int =
- timing_data.best_threads(task.name, max_threads)
+ def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name)
val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
@@ -754,7 +768,7 @@
def remaining_time(node: Node): (Node, Time) =
state.running.get(node) match {
- case None => node -> best_time(node)
+ case None => node -> best_times(node)
case Some(job) =>
val estimate =
timing_data.estimate(job.name, job.node_info.hostname,
@@ -762,10 +776,13 @@
node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
- val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
+ val is_parallelizable =
+ available_nodes.length >= parallel_paths(
+ state.ready.map(_.name).map(remaining_time),
+ max = available_nodes.length + 1)
- if (max_parallel <= available_nodes.length) {
+ if (is_parallelizable) {
val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
@@ -787,13 +804,13 @@
def parallel_threads(task: Build_Process.Task): Int =
this.parallel_threads match {
case Fixed_Thread(threads) => threads
- case Time_Based_Threads(f) => f(best_time(task.name))
+ case Time_Based_Threads(f) => f(best_times(task.name))
val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
val max_critical_parallel =
- parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
+ parallel_paths(critical_minimals.map(remaining_time), critical_nodes)
val max_critical_hosts =
@@ -1002,7 +1019,7 @@
else {
val start = Time.now()
- val new_schedule = scheduler.build_schedule(state).update(state)
+ val new_schedule = scheduler.schedule(state).update(state)
val schedule =
if (_schedule.is_empty) new_schedule
else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
@@ -1230,11 +1247,10 @@
parallel <- parallel_threads
machine_split <- machine_splits
} yield
- Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure,
- context.build_uuid)
- val default_heuristic =
- Default_Heuristic(timing_data, context.build_options, context.build_uuid)
- new Meta_Heuristic(default_heuristic :: path_time_heuristics)
+ Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure)
+ val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options)
+ val heuristics = default_heuristic :: path_time_heuristics
+ Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid)))
override def open_build_process(
@@ -1318,7 +1334,8 @@
def schedule_msg(res: Exn.Result[Schedule]): String =
res match { case Exn.Res(schedule) => schedule.message case _ => "" }
- Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_))
+ progress.echo("Building schedule...")
+ Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_))
using(store.open_server()) { server =>