--- a/src/Pure/Build/build_schedule.scala Tue Feb 13 11:34:28 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala Tue Feb 13 11:57:41 2024 +0100
@@ -14,25 +14,11 @@
object Build_Schedule {
/* organized historic timing information (extracted from build logs) */
- case class Timing_Entry(job_name: String, hostname: String, threads: Int, timing: Timing) {
+ case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) {
def elapsed: Time = timing.elapsed
def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
}
- case class Timing_Entries(entries: List[Timing_Entry]) {
- require(entries.nonEmpty)
-
- def is_empty = entries.isEmpty
- def size = entries.length
-
- lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap
- lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap
- lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap
-
- def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed))
- def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
- }
-
object Timing_Data {
def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
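As context for the hunks that follow: median_timing (unchanged here) picks the observation with the median elapsed time by sorting and indexing at length / 2, i.e. the upper median for even-sized input, and it presupposes a non-empty list. A minimal standalone illustration of the same idea on plain millisecond values (median_ms is a hypothetical helper, not part of this change):

  def median_ms(obs: List[Long]): Long = obs.sorted.apply(obs.length / 2)

  median_ms(List(30L, 10L, 20L))       // == 20
  median_ms(List(40L, 10L, 20L, 30L))  // == 30 (upper median)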
@@ -44,8 +30,8 @@
val baseline = Time.minutes(5).scale(host_factor)
val gc = Time.seconds(10).scale(host_factor)
List(
- Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
- Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
+ Result("dummy", host.name, 1, Timing(baseline, baseline, gc)),
+ Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
}
def make(
@@ -73,10 +59,10 @@
else
measurements.groupMap(_._1)(_._2).toList.map {
case ((job_name, hostname, threads), timings) =>
- Timing_Entry(job_name, hostname, threads, median_timing(timings))
+ Result(job_name, hostname, threads, median_timing(timings))
}
- new Timing_Data(Timing_Entries(entries), host_infos)
+ new Timing_Data(new Facet(entries), host_infos)
}
def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
@@ -91,18 +77,41 @@
make(host_infos, build_history)
}
+
+
+ /* data facets */
+
+ object Facet {
+ def unapply(facet: Facet): Option[List[Result]] = Some(facet.results)
+ }
+
+ class Facet private[Timing_Data](val results: List[Result]) {
+ require(results.nonEmpty)
+
+ def is_empty = results.isEmpty
+
+ def size = results.length
+
+ lazy val by_job = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap
+ lazy val by_threads = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
+ lazy val by_hostname = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
+
+ def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+
+ def best_result: Result = results.minBy(_.elapsed.ms)
+ }
}
- class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
+ class Timing_Data private(facet: Timing_Data.Facet, val host_infos: Host_Infos) {
private def inflection_point(last_mono: Int, next: Int): Int =
last_mono + ((next - last_mono) / 2)
def best_threads(job_name: String, max_threads: Int): Int = {
val worse_threads =
- data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
- case (hostname, data) =>
- val best_threads = data.best_entry.threads
- data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+ facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
+ case (hostname, facet) =>
+ val best_threads = facet.best_result.threads
+ facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
inflection_point(best_threads, _))
}
(max_threads :: worse_threads).min
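The new Facet class replaces Timing_Entries as the grouping abstraction: a non-empty list of Results that can be re-sliced by job, thread count or hostname, with every slice again answering mean_time and best_result. A self-contained sketch of that pattern, using invented Mini* names and plain millisecond values in place of the real Result/Time types:

  final case class MiniResult(job: String, host: String, threads: Int, elapsed_ms: Long)

  final class MiniFacet(val results: List[MiniResult]) {
    require(results.nonEmpty)
    // re-slice by key; each slice is again a MiniFacet, so the same queries apply
    lazy val by_job: Map[String, MiniFacet] =
      results.groupBy(_.job).view.mapValues(new MiniFacet(_)).toMap
    lazy val by_host: Map[String, MiniFacet] =
      results.groupBy(_.host).view.mapValues(new MiniFacet(_)).toMap
    lazy val by_threads: Map[Int, MiniFacet] =
      results.groupBy(_.threads).view.mapValues(new MiniFacet(_)).toMap
    def mean_ms: Long = results.map(_.elapsed_ms).sum / results.length
    def best: MiniResult = results.minBy(_.elapsed_ms)
  }

In the updated best_threads, the heuristic per host is: take the thread count of the best observed result and, if a larger thread count was also measured, cap at the midpoint between the two; e.g. a best result at 8 threads with a next measured configuration at 16 gives inflection_point(8, 16) = 8 + (16 - 8) / 2 = 12. The final value is the minimum over max_threads and all such per-host caps.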
@@ -170,31 +179,31 @@
}
private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
- def unify(hostname: String, data: Timing_Entries) =
- data.mean_time.scale(hostname_factor(hostname, on_host))
+ def unify(hostname: String, facet: Timing_Data.Facet) =
+ facet.mean_time.scale(hostname_factor(hostname, on_host))
for {
- data <- data.by_job.get(job_name).toList
- (threads, data) <- data.by_threads
- entries = data.by_hostname.toList.map(unify)
+ facet <- facet.by_job.get(job_name).toList
+ (threads, facet) <- facet.by_threads
+ entries = facet.by_hostname.toList.map(unify)
} yield threads -> Timing_Data.median_time(entries)
}
def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
- def try_approximate(data: Timing_Entries): Option[Time] = {
+ def try_approximate(facet: Timing_Data.Facet): Option[Time] = {
val entries =
- data.by_threads.toList match {
- case List((i, Timing_Entries(List(entry)))) if i != 1 =>
- (i, data.mean_time) :: entry.proper_cpu.map(1 -> _).toList
- case entries => entries.map((threads, data) => threads -> data.mean_time)
+ facet.by_threads.toList match {
+ case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
+ (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
+ case entries => entries.map((threads, facet) => threads -> facet.mean_time)
}
if (entries.size < 2) None else Some(approximate_threads(entries, threads))
}
for {
- data <- data.by_job.get(job_name)
- data <- data.by_hostname.get(hostname)
- time <- data.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(data))
+ facet <- facet.by_job.get(job_name)
+ facet <- facet.by_hostname.get(hostname)
+ time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
} yield time
}
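estimate_threads now reads as a lookup cascade over the facet: restrict to the job, then to the hostname, then either take the mean time recorded for the exact thread count or fall back to try_approximate, which needs at least two (threads, mean time) points on that host (a single multi-threaded result is padded with its proper CPU time as a 1-thread point; the sketch below omits that refinement). A stripped-down sketch of the cascade, reusing the Mini* types from the sketch above and an arbitrary interpolation function as a stand-in for the real approximate_threads:

  def estimate_ms(
    facet: MiniFacet, job: String, host: String, threads: Int,
    interpolate: (List[(Int, Long)], Int) => Long
  ): Option[Long] =
    for {
      job_facet <- facet.by_job.get(job)
      host_facet <- job_facet.by_host.get(host)
      ms <- host_facet.by_threads.get(threads).map(_.mean_ms).orElse {
        // no exact measurement: approximate from the (threads, mean time) points on this host
        val points = host_facet.by_threads.toList.map((t, f) => t -> f.mean_ms)
        if (points.size < 2) None else Some(interpolate(points, threads))
      }
    } yield ms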
@@ -203,8 +212,8 @@
val estimates =
for {
- (hostname, data) <- data.by_hostname
- job_name <- data.by_job.keys
+ (hostname, facet) <- facet.by_hostname
+ job_name <- facet.by_job.keys
from_time <- estimate_threads(job_name, hostname, from)
to_time <- estimate_threads(job_name, hostname, to)
} yield from_time.ms.toDouble / to_time.ms
@@ -214,8 +223,8 @@
// unify hosts
val estimates =
for {
- (job_name, data) <- data.by_job
- hostname = data.by_hostname.keys.head
+ (job_name, facet) <- facet.by_job
+ hostname = facet.by_hostname.keys.head
entries = unify_hosts(job_name, hostname)
if entries.length > 1
} yield
@@ -239,26 +248,26 @@
def estimate(job_name: String, hostname: String, threads: Int): Time = {
def estimate: Time =
- data.by_job.get(job_name) match {
+ facet.by_job.get(job_name) match {
case None =>
// no data for job, take average of other jobs for given threads
- val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+ val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
else {
// no other job to estimate from, use global curve to approximate any other job
- val (threads1, data1) = data.by_threads.head
- data1.mean_time.scale(global_threads_factor(threads1, threads))
+ val (threads1, facet1) = facet.by_threads.head
+ facet1.mean_time.scale(global_threads_factor(threads1, threads))
}
- case Some(data) =>
- data.by_threads.get(threads) match {
+ case Some(facet) =>
+ facet.by_threads.get(threads) match {
case None => // interpolate threads
estimate_threads(job_name, hostname, threads).map(_.scale(
FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse {
// per machine, try to approximate config for threads
val approximated =
for {
- hostname1 <- data.by_hostname.keys
+ hostname1 <- facet.by_hostname.keys
estimate <- estimate_threads(job_name, hostname1, threads)
factor = hostname_factor(hostname1, hostname)
} yield estimate.scale(factor)
@@ -281,11 +290,11 @@
}
}
- case Some(data) => // time for job/thread exists, interpolate machine if necessary
- data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+ case Some(facet) => // time for job/thread exists, interpolate machine if necessary
+ facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
Timing_Data.median_time(
- data.by_hostname.toList.map((hostname1, data) =>
- data.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+ facet.by_hostname.toList.map((hostname1, facet) =>
+ facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
FACTOR_THREADS_OTHER_MACHINE)
}
}
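For a concrete instance of the final fallback in this hunk: if the facet holds results for a job at 8 threads, but only on host A, then an estimate for that job at 8 threads on host B lands in the last branch, i.e. the median over the other hosts' mean times scaled by their host factors (here just host A's mean time scaled by hostname_factor(A, B)), further scaled by FACTOR_THREADS_OTHER_MACHINE.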
@@ -376,9 +385,9 @@
def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
- def allocate(node: Node_Info): Resources = {
- val host = host_infos.the_host(node)
- copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
+ def allocate(node_info: Node_Info): Resources = {
+ val host = host_infos.the_host(node_info)
+ copy(allocated_nodes = allocated_nodes + (host -> (node_info :: allocated(host))))
}
def try_allocate_tasks(