--- a/src/Pure/Tools/build_schedule.scala Fri Nov 24 20:58:12 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala Fri Nov 24 20:58:29 2023 +0100
@@ -13,104 +13,186 @@
object Build_Schedule {
val engine_name = "build_schedule"
- object Config {
- def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
- }
-
- case class Config(job_name: String, node_info: Node_Info) {
- def job_of(start_time: Time): Build_Process.Job =
- Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
- }
/* organized historic timing information (extracted from build logs) */
case class Timing_Entry(job_name: String, hostname: String, threads: Int, elapsed: Time)
+ case class Timing_Entries(entries: List[Timing_Entry]) {
+ require(entries.nonEmpty)
- class Timing_Data private(data: List[Timing_Entry], val host_infos: Host_Infos) {
- require(data.nonEmpty)
+ def is_empty = entries.isEmpty
+ def size = entries.length
- def is_empty = data.isEmpty
- def size = data.length
+ lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap
+ lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap
+ lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap
+
+ def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed))
+ def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
+ }
- private lazy val by_job =
- data.groupBy(_.job_name).view.mapValues(new Timing_Data(_, host_infos)).toMap
- private lazy val by_threads =
- data.groupBy(_.threads).view.mapValues(new Timing_Data(_, host_infos)).toMap
- private lazy val by_hostname =
- data.groupBy(_.hostname).view.mapValues(new Timing_Data(_, host_infos)).toMap
+ class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
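+ // midpoint between the last thread count that still improved timing and the
+ // next measured one: a heuristic bound past which more threads stop helping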
+ private def inflection_point(last_mono: Int, next: Int): Int =
+ last_mono + ((next - last_mono) / 2)
- def mean_time: Time = Timing_Data.mean_time(data.map(_.elapsed))
-
- private def best_entry: Timing_Entry = data.minBy(_.elapsed.ms)
-
- def best_threads(job_name: String): Option[Int] = by_job.get(job_name).map(_.best_entry.threads)
-
- def best_time(job_name: String): Time =
- by_job.get(job_name).map(_.best_entry.elapsed).getOrElse(
- estimate_config(Config(job_name, Node_Info(best_entry.hostname, None, Nil))))
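+ // per host, the inflection point between the best observed thread count and
+ // the next measured (worse) one bounds useful parallelism for the job; take
+ // the smallest such bound, at most max_threads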
+ def best_threads(job_name: String, max_threads: Int): Int = {
+ val worse_threads =
+ data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
+ case (hostname, data) =>
+ val best_threads = data.best_entry.threads
+ data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+ inflection_point(best_threads, _))
+ }
+ (max_threads :: worse_threads).min
+ }
private def hostname_factor(from: String, to: String): Double =
host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))
- def approximate_threads(threads: Int): Option[Time] = {
- val approximations =
- by_job.values.filter(_.size > 1).map { data =>
- val (ref_hostname, x0) =
- data.by_hostname.toList.flatMap((hostname, data) =>
- data.by_threads.keys.map(hostname -> _)).minBy((_, n) => Math.abs(n - threads))
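+ // approximate the time for a thread count from measured (threads, time) pairs,
+ // e.g. (1, 60s), (2, 35s), (4, 25s): such a monotonically decreasing curve is
+ // handled by the Amdahl model below, non-monotonic tails by linear pieces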
+ private def approximate_threads(entries_unsorted: List[(Int, Time)], threads: Int): Time = {
+ val entries = entries_unsorted.sortBy(_._1)
+
+ def sorted_prefix[A](xs: List[A], f: A => Long): List[A] =
+ xs match {
+ case x1 :: x2 :: xs =>
+ if (f(x1) <= f(x2)) x1 :: sorted_prefix(x2 :: xs, f) else x1 :: Nil
+ case xs => xs
+ }
+
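+ // straight line through p0 and p1 evaluated at the requested thread count,
+ // clamped at zero so extrapolation never yields negative times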
+ def linear(p0: (Int, Time), p1: (Int, Time)): Time = {
+ val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1))
+ val b = p0._2 - a.scale(p0._1)
+ Time.ms((a.scale(threads) + b).ms max 0)
+ }
- def unify_hosts(data: Timing_Data): List[Time] =
- data.by_hostname.toList.map((hostname, data) =>
- data.mean_time.scale(hostname_factor(hostname, ref_hostname)))
+ val mono_prefix = sorted_prefix(entries, e => -e._2.ms)
+
+ val is_mono = entries == mono_prefix
+ val in_prefix = mono_prefix.length > 1 && threads <= mono_prefix.last._1
+ val in_inflection =
+ !is_mono && mono_prefix.length > 1 && threads < entries.drop(mono_prefix.length).head._1
+ if (is_mono || in_prefix || in_inflection) {
+ // Model with Amdahl's law
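+ // t(n) = t_c + t_p / n, so two measurements (n, t0) and (m, t1) yield
+ // t0 - t1 = t_p * (m - n) / (n * m), i.e. t_p = (t0 - t1) * n * m / (m - n);
+ // taking the median over all pairs makes the fit robust against noise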
+ val t_p =
+ Timing_Data.median_time(for {
+ (n, t0) <- mono_prefix
+ (m, t1) <- mono_prefix
+ if m != n
+ } yield (t0 - t1).scale(n.toDouble * m / (m - n)))
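+ // sequential part: t_c = t(n) - t_p / n for each point, median over the prefix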
+ val t_c =
+ Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n))
- val entries =
- data.by_threads.toList.map((threads, data) =>
- threads -> Timing_Data.median_time(unify_hosts(data)))
+ def model(threads: Int): Time = t_c + t_p.scale(1.0 / threads)
+
+ if (is_mono || in_prefix) model(threads)
+ else {
+ val post_inflection = entries.drop(mono_prefix.length).head
+ val inflection_threads = inflection_point(mono_prefix.last._1, post_inflection._1)
- val y0 = data.by_hostname(ref_hostname).by_threads(x0).mean_time
- val (x1, y1_data) =
- data.by_hostname(ref_hostname).by_threads.toList.minBy((n, _) => Math.abs(n - threads))
- val y1 = y1_data.mean_time
+ if (threads <= inflection_threads) model(threads)
+ else linear((inflection_threads, model(inflection_threads)), post_inflection)
+ }
+ } else {
+ // Piecewise linear
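+ // interpolate between the two measurements bracketing the requested count,
+ // or extrapolate from the two nearest ones outside the measured range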
+ val (p0, p1) =
+ if (entries.head._1 <= threads && threads <= entries.last._1) {
+ val split = entries.partition(_._1 <= threads)
+ (split._1.last, split._2.head)
+ } else {
+ val piece = if (threads < entries.head._1) entries.take(2) else entries.takeRight(2)
+ (piece.head, piece.last)
+ }
- val a = (y1.ms - y0.ms).toDouble / (x1 - x0)
- val b = y0.ms - a * x0
- Time.ms((a * threads + b).toLong)
- }
- if (approximations.isEmpty) None else Some(Timing_Data.mean_time(approximations))
+ linear(p0, p1)
+ }
+ }
+
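+ // project measurements from all machines onto on_host via relative host speed
+ // factors, yielding a single (threads, time) curve for the job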
+ private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
+ def unify(hostname: String, data: Timing_Entries) =
+ data.mean_time.scale(hostname_factor(hostname, on_host))
+
+ for {
+ data <- data.by_job.get(job_name).toList
+ (threads, data) <- data.by_threads
+ entries = data.by_hostname.toList.map(unify)
+ } yield threads -> Timing_Data.median_time(entries)
}
- def threads_factor(divided: Int, divisor: Int): Double =
- (approximate_threads(divided), approximate_threads(divisor)) match {
- case (Some(dividend), Some(divisor)) => dividend.ms.toDouble / divisor.ms
- case _ => divided.toDouble / divisor
- }
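+ // estimated time for the job on the given host and thread count: an exact
+ // measurement if present, else interpolated from at least two measured counts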
+ def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
+ for {
+ data <- data.by_job.get(job_name)
+ data <- data.by_hostname.get(hostname)
+ entries = data.by_threads.toList.map((threads, data) => threads -> data.mean_time)
+ time <- data.by_threads.get(threads).map(_.mean_time).orElse(
+ if (data.by_threads.size < 2) None else Some(approximate_threads(entries, threads)))
+ } yield time
+ }
+
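+ // global scaling factor between two thread counts: median over all (job, host)
+ // pairs where both can be estimated, else over host-unified per-job curves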
+ def global_threads_factor(from: Int, to: Int): Double = {
+ def median(xs: Iterable[Double]): Double = xs.toList.sorted.apply(xs.size / 2)
+
+ val estimates =
+ for {
+ (hostname, data) <- data.by_hostname
+ job_name <- data.by_job.keys
+ from_time <- estimate_threads(job_name, hostname, from)
+ to_time <- estimate_threads(job_name, hostname, to)
+ } yield from_time.ms.toDouble / to_time.ms
- def estimate_config(config: Config): Time =
- by_job.get(config.job_name) match {
- case None => mean_time
+ if (estimates.nonEmpty) median(estimates)
+ else {
+ // no direct estimates; unify each job's measurements across hosts instead
+ val estimates =
+ for {
+ (job_name, data) <- data.by_job
+ hostname = data.by_hostname.keys.head
+ entries = unify_hosts(job_name, hostname)
+ if entries.length > 1
+ } yield
+ approximate_threads(entries, from).ms.toDouble / approximate_threads(entries, to).ms
+
+ if (estimates.nonEmpty) median(estimates)
+ else from.toDouble / to.toDouble
+ }
+ }
+
+ def estimate(job_name: String, hostname: String, threads: Int): Time =
+ data.by_job.get(job_name) match {
+ case None =>
+ // no data for job; take the average over other jobs at the given thread count
+ val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+ if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
+ else {
+ // no other job to estimate from; scale some measured timing by the global curve
+ val (threads1, data1) = data.by_threads.head
+ data1.mean_time.scale(global_threads_factor(threads1, threads))
+ }
+
case Some(data) =>
- val hostname = config.node_info.hostname
- val threads = host_infos.num_threads(config.node_info)
data.by_threads.get(threads) match {
case None => // interpolate threads
- data.by_hostname.get(hostname).flatMap(
- _.approximate_threads(threads)).getOrElse {
- // per machine, try to approximate config for threads
- val approximated =
- data.by_hostname.toList.flatMap((hostname1, data) =>
- data.approximate_threads(threads).map(time =>
- time.scale(hostname_factor(hostname1, hostname))))
+ estimate_threads(job_name, hostname, threads).getOrElse {
+ // per machine, try to approximate config for threads
+ val approximated =
+ for {
+ hostname1 <- data.by_hostname.keys
+ estimate <- estimate_threads(job_name, hostname1, threads)
+ factor = hostname_factor(hostname1, hostname)
+ } yield estimate.scale(factor)
- if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+ if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+ else {
+ // no single machine where the config can be approximated; unify data points
+ val unified_entries = unify_hosts(job_name, hostname)
+
+ if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
else {
- // no machine where config can be approximated
- data.approximate_threads(threads).getOrElse {
- // only single data point, use global curve to approximate
- val global_factor = threads_factor(data.by_threads.keys.head, threads)
- data.by_threads.values.head.mean_time.scale(global_factor)
- }
+ // only a single data point; use the global curve to approximate
+ val (job_threads, job_time) = unified_entries.head
+ job_time.scale(global_threads_factor(job_threads, threads))
}
}
+ }
case Some(data) => // time for job/thread exists, interpolate machine
data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
@@ -123,7 +205,7 @@
}
object Timing_Data {
- def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).drop(obs.length / 2).head
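+ // upper median: the element at index length / 2 of the sorted observations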
+ def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
private def dummy_entries(host: Host, host_factor: Double) =
@@ -159,7 +241,7 @@
Timing_Entry(job_name, hostname, threads, median_time(timings))
}
- new Timing_Data(entries, host_infos)
+ new Timing_Data(Timing_Entries(entries), host_infos)
}
}
@@ -186,6 +268,8 @@
}
class Host_Infos private(val hosts: List[Host]) {
+ require(hosts.nonEmpty)
+
private val by_hostname = hosts.map(host => host.info.hostname -> host).toMap
def host_factor(from: Host, to: Host): Double =
@@ -210,7 +294,16 @@
}
- /* offline tracking of resource allocations */
+ /* offline tracking of job configurations and resource allocations */
+
+ object Config {
+ def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
+ }
+
+ case class Config(job_name: String, node_info: Node_Info) {
+ def job_of(start_time: Time): Build_Process.Job =
+ Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
+ }
case class Resources(
host_infos: Host_Infos,
@@ -303,7 +396,8 @@
val remaining =
build_state.running.values.toList.map { job =>
val elapsed = current_time - job.start_date.time
- val predicted = timing_data.estimate_config(Config.from_job(job))
+ val threads = timing_data.host_infos.num_threads(job.node_info)
+ val predicted = timing_data.estimate(job.name, job.node_info.hostname, threads)
val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed
job -> remaining
}
@@ -323,7 +417,17 @@
def build_duration(build_state: Build_Process.State): Time
}
- abstract class Heuristic(timing_data: Timing_Data) extends Scheduler {
+ abstract class Heuristic(timing_data: Timing_Data, max_threads_limit: Int) extends Scheduler {
+ val host_infos = timing_data.host_infos
+ val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
+ val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
+
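+ // optimistic estimate: the job's best thread count on the fastest host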
+ def best_time(job_name: String): Time = {
+ val host = ordered_hosts.last
+ val threads = timing_data.best_threads(job_name, max_threads) min host.info.num_cpus
+ timing_data.estimate(job_name, host.info.hostname, threads)
+ }
+
def build_duration(build_state: Build_Process.State): Time = {
@tailrec
def simulate(state: State): State =
@@ -352,12 +456,11 @@
/* heuristics */
- class Timing_Heuristic(
- threshold: Time,
+ abstract class Path_Heuristic(
timing_data: Timing_Data,
sessions_structure: Sessions.Structure,
- max_threads: Int = 8
- ) extends Heuristic(timing_data) {
+ max_threads_limit: Int
+ ) extends Heuristic(timing_data, max_threads_limit) {
/* pre-computed properties for efficient heuristic */
type Node = String
@@ -367,7 +470,7 @@
val maximals_preds =
all_maximals.map(node => node -> build_graph.all_preds(List(node)).toSet).toMap
- val best_times = build_graph.keys.map(node => node -> timing_data.best_time(node)).toMap
+ val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
val remaining_time_ms = build_graph.node_height(best_times(_).ms)
def elapsed_times(node: Node): Map[Node, Time] =
@@ -383,10 +486,6 @@
.groupMapReduce(_._1)(_._2)(_ max _)
}
- val critical_path_nodes =
- build_graph.keys.map(node =>
- node -> path_times(node).filter((_, time) => time > threshold).keySet).toMap
-
def parallel_paths(minimals: Set[Node], pred: Node => Boolean = _ => true): Int = {
def start(node: Node): (Node, Time) = node -> best_times(node)
@@ -413,41 +512,47 @@
parallel_paths(minimals.map(start).toMap)._1
}
-
+ }
- /* scheduling */
-
- val host_infos = timing_data.host_infos
+ class Timing_Heuristic(
+ threshold: Time,
+ timing_data: Timing_Data,
+ sessions_structure: Sessions.Structure,
+ max_threads_limit: Int = 8
+ ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) {
+ val critical_path_nodes =
+ build_graph.keys.map(node =>
+ node -> path_times(node).filter((_, time) => time > threshold).keySet).toMap
def next(state: Build_Process.State): List[Config] = {
val resources = host_infos.available(state)
def best_threads(task: Build_Process.Task): Int =
- timing_data.best_threads(task.name).getOrElse(
- host_infos.hosts.map(_.info.num_cpus).max min max_threads)
+ timing_data.best_threads(task.name, max_threads)
+
+ val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
- val ordered_hosts =
- host_infos.hosts.sorted(host_infos.host_speeds).reverse.map(_ -> max_threads)
-
- val fully_parallelizable =
- parallel_paths(state.ready.map(_.name).toSet) <= resources.unused_nodes(max_threads).length
+ val resources0 = host_infos.available(state.copy(running = Map.empty))
+ val max_parallel = parallel_paths(state.ready.map(_.name).toSet)
+ val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length
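+ // if the fully idle cluster has one max_threads node per parallel path, all
+ // ready tasks can be allocated at their best thread count right away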
if (fully_parallelizable) {
- val all_tasks = state.ready.map(task => (task, best_threads(task), best_threads(task)))
- resources.try_allocate_tasks(ordered_hosts, all_tasks)._1
+ val all_tasks = state.next_ready.map(task => (task, best_threads(task), best_threads(task)))
+ resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
}
else {
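+ // otherwise prioritize the critical path: tasks on paths above the threshold
+ // are fixed to their best thread count on the fastest hosts, while remaining
+ // tasks may be squeezed (down to one thread) onto the hosts left over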
val critical_nodes = state.ready.toSet.flatMap(task => critical_path_nodes(task.name))
val (critical, other) =
- state.ready.sortBy(task => remaining_time_ms(task.name)).reverse.partition(task =>
+ state.next_ready.sortBy(task => remaining_time_ms(task.name)).reverse.partition(task =>
critical_nodes.contains(task.name))
val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task)))
val other_tasks = other.map(task => (task, 1, best_threads(task)))
- val (critical_hosts, other_hosts) =
- ordered_hosts.splitAt(parallel_paths(critical.map(_.name).toSet, critical_nodes.contains))
+ val critical_minimals = critical_nodes.intersect(state.ready.map(_.name).toSet)
+ val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains)
+ val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel)
val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)
@@ -617,10 +722,10 @@
override def next_jobs(state: Build_Process.State): List[String] = {
val finalize_limit = if (build_context.master) Int.MaxValue else 0
- if (progress.stopped) state.ready.map(_.name).take(finalize_limit)
+ if (progress.stopped) state.next_ready.map(_.name).take(finalize_limit)
else if (cache.is_current(state)) cache.configs.map(_.job_name).filterNot(state.is_running)
else {
- val current = state.ready.filter(task => is_current(state, task.name))
+ val current = state.next_ready.filter(task => is_current(state, task.name))
if (current.nonEmpty) current.map(_.name).take(finalize_limit)
else {
val start = Time.now()