# HG changeset patch # User traytel # Date 1707895884 -3600 # Node ID 1b3770369ee7b3c2c38d07aeac6988be16963fea # Parent 3e27ab965a36941f6911a10886e7ff8068680e60# Parent f933e915362456c2b377162e89439b8e568fac7b merged diff -r 3e27ab965a36 -r 1b3770369ee7 src/Pure/Build/build_schedule.scala --- a/src/Pure/Build/build_schedule.scala Tue Feb 13 14:31:09 2024 +0100 +++ b/src/Pure/Build/build_schedule.scala Wed Feb 14 08:31:24 2024 +0100 @@ -14,25 +14,11 @@ object Build_Schedule { /* organized historic timing information (extracted from build logs) */ - case class Timing_Entry(job_name: String, hostname: String, threads: Int, timing: Timing) { + case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) { def elapsed: Time = timing.elapsed def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms) } - case class Timing_Entries(entries: List[Timing_Entry]) { - require(entries.nonEmpty) - - def is_empty = entries.isEmpty - def size = entries.length - - lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap - lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap - lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap - - def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed)) - def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms) - } - object Timing_Data { def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2) @@ -44,8 +30,8 @@ val baseline = Time.minutes(5).scale(host_factor) val gc = Time.seconds(10).scale(host_factor) List( - Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)), - Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc))) + Result("dummy", host.name, 1, Timing(baseline, baseline, gc)), + Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc))) } def make( @@ -73,10 +59,10 @@ else measurements.groupMap(_._1)(_._2).toList.map { case ((job_name, hostname, threads), timings) => - Timing_Entry(job_name, hostname, threads, median_timing(timings)) + Result(job_name, hostname, threads, median_timing(timings)) } - new Timing_Data(Timing_Entries(entries), host_infos) + new Timing_Data(new Facet(entries), host_infos) } def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = { @@ -91,18 +77,41 @@ make(host_infos, build_history) } + + + /* data facets */ + + object Facet { + def unapply(facet: Facet): Option[List[Result]] = Some(facet.results) + } + + class Facet private[Timing_Data](val results: List[Result]) { + require(results.nonEmpty) + + def is_empty = results.isEmpty + + def size = results.length + + lazy val by_job = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap + lazy val by_threads = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap + lazy val by_hostname = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap + + def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed)) + + def best_result: Result = results.minBy(_.elapsed.ms) + } } - class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) { + class Timing_Data private(facet: Timing_Data.Facet, val host_infos: Host_Infos) { private def inflection_point(last_mono: Int, next: Int): Int = last_mono + ((next - last_mono) / 2) def best_threads(job_name: String, max_threads: Int): Int = { val worse_threads = - data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap { - case (hostname, data) => - val best_threads = data.best_entry.threads - data.by_threads.keys.toList.sorted.find(_ > best_threads).map( + facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap { + case (hostname, facet) => + val best_threads = facet.best_result.threads + facet.by_threads.keys.toList.sorted.find(_ > best_threads).map( inflection_point(best_threads, _)) } (max_threads :: worse_threads).min @@ -170,31 +179,31 @@ } private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = { - def unify(hostname: String, data: Timing_Entries) = - data.mean_time.scale(hostname_factor(hostname, on_host)) + def unify(hostname: String, facet: Timing_Data.Facet) = + facet.mean_time.scale(hostname_factor(hostname, on_host)) for { - data <- data.by_job.get(job_name).toList - (threads, data) <- data.by_threads - entries = data.by_hostname.toList.map(unify) + facet <- facet.by_job.get(job_name).toList + (threads, facet) <- facet.by_threads + entries = facet.by_hostname.toList.map(unify) } yield threads -> Timing_Data.median_time(entries) } def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = { - def try_approximate(data: Timing_Entries): Option[Time] = { + def try_approximate(facet: Timing_Data.Facet): Option[Time] = { val entries = - data.by_threads.toList match { - case List((i, Timing_Entries(List(entry)))) if i != 1 => - (i, data.mean_time) :: entry.proper_cpu.map(1 -> _).toList - case entries => entries.map((threads, data) => threads -> data.mean_time) + facet.by_threads.toList match { + case List((i, Timing_Data.Facet(List(result)))) if i != 1 => + (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList + case entries => entries.map((threads, facet) => threads -> facet.mean_time) } if (entries.size < 2) None else Some(approximate_threads(entries, threads)) } for { - data <- data.by_job.get(job_name) - data <- data.by_hostname.get(hostname) - time <- data.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(data)) + facet <- facet.by_job.get(job_name) + facet <- facet.by_hostname.get(hostname) + time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet)) } yield time } @@ -203,8 +212,8 @@ val estimates = for { - (hostname, data) <- data.by_hostname - job_name <- data.by_job.keys + (hostname, facet) <- facet.by_hostname + job_name <- facet.by_job.keys from_time <- estimate_threads(job_name, hostname, from) to_time <- estimate_threads(job_name, hostname, to) } yield from_time.ms.toDouble / to_time.ms @@ -214,8 +223,8 @@ // unify hosts val estimates = for { - (job_name, data) <- data.by_job - hostname = data.by_hostname.keys.head + (job_name, facet) <- facet.by_job + hostname = facet.by_hostname.keys.head entries = unify_hosts(job_name, hostname) if entries.length > 1 } yield @@ -239,26 +248,26 @@ def estimate(job_name: String, hostname: String, threads: Int): Time = { def estimate: Time = - data.by_job.get(job_name) match { + facet.by_job.get(job_name) match { case None => // no data for job, take average of other jobs for given threads - val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads)) + val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads)) if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates) else { // no other job to estimate from, use global curve to approximate any other job - val (threads1, data1) = data.by_threads.head - data1.mean_time.scale(global_threads_factor(threads1, threads)) + val (threads1, facet1) = facet.by_threads.head + facet1.mean_time.scale(global_threads_factor(threads1, threads)) } - case Some(data) => - data.by_threads.get(threads) match { + case Some(facet) => + facet.by_threads.get(threads) match { case None => // interpolate threads estimate_threads(job_name, hostname, threads).map(_.scale( FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse { // per machine, try to approximate config for threads val approximated = for { - hostname1 <- data.by_hostname.keys + hostname1 <- facet.by_hostname.keys estimate <- estimate_threads(job_name, hostname1, threads) factor = hostname_factor(hostname1, hostname) } yield estimate.scale(factor) @@ -281,11 +290,11 @@ } } - case Some(data) => // time for job/thread exists, interpolate machine if necessary - data.by_hostname.get(hostname).map(_.mean_time).getOrElse { + case Some(facet) => // time for job/thread exists, interpolate machine if necessary + facet.by_hostname.get(hostname).map(_.mean_time).getOrElse { Timing_Data.median_time( - data.by_hostname.toList.map((hostname1, data) => - data.mean_time.scale(hostname_factor(hostname1, hostname)))).scale( + facet.by_hostname.toList.map((hostname1, facet) => + facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale( FACTOR_THREADS_OTHER_MACHINE) } } @@ -347,8 +356,8 @@ def available(state: Build_Process.State): Resources = { val allocated = - state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _) - Resources(this, allocated) + state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _) + new Resources(this, allocated) } } @@ -360,9 +369,9 @@ Build_Process.Job(job_name, "", "", node_info, Date(start_time), None) } - case class Resources( - host_infos: Host_Infos, - allocated_nodes: Map[Host, List[Node_Info]] + class Resources( + val host_infos: Host_Infos, + allocated_nodes: Map[String, List[Node_Info]] ) { def unused_nodes(host: Host, threads: Int): List[Node_Info] = if (!available(host, threads)) Nil @@ -374,11 +383,11 @@ def unused_nodes(threads: Int): List[Node_Info] = host_infos.hosts.flatMap(unused_nodes(_, threads)) - def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil) + def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil) - def allocate(node: Node_Info): Resources = { - val host = host_infos.the_host(node) - copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host)))) + def allocate(node_info: Node_Info): Resources = { + val host = host_infos.the_host(node_info) + new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host)))) } def try_allocate_tasks( @@ -538,7 +547,8 @@ val host_preds = for { - (name, (pred_node, _)) <- finished.graph.iterator.toSet + name <- finished.graph.keys + pred_node = finished.graph.get_node(name) if pred_node.node_info.hostname == job.node_info.hostname if pred_node.end.time <= node.start.time } yield name @@ -556,35 +566,49 @@ def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty } - trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule } + trait Scheduler { def schedule(build_state: Build_Process.State): Schedule } + + trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] } - abstract class Heuristic(timing_data: Timing_Data, build_uuid: String) - extends Scheduler { - val host_infos = timing_data.host_infos - val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds) - - def next(state: Build_Process.State): List[Config] - - def build_schedule(build_state: Build_Process.State): Schedule = { + case class Generation_Scheme( + priority_rule: Priority_Rule, + timing_data: Timing_Data, + build_uuid: String + ) extends Scheduler { + def schedule(build_state: Build_Process.State): Schedule = { @tailrec def simulate(state: State): State = if (state.is_finished) state else { - val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data) + val state1 = + priority_rule + .select_next(state.build_state) + .foldLeft(state)(_.start(_)) + .step(timing_data) simulate(state1) } val start = Date.now() + val name = "generation scheme (" + priority_rule + ")" val end_state = - simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty))) + simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty))) end_state.finished } } - class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String) - extends Heuristic(timing_data, build_uuid) { - override def toString: String = "default build heuristic" + case class Optimizer(schedulers: List[Scheduler]) extends Scheduler { + require(schedulers.nonEmpty) + + def schedule(state: Build_Process.State): Schedule = + schedulers.map(_.schedule(state)).minBy(_.duration.ms) + } + + + /* priority rules */ + + class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule { + override def toString: String = "default heuristic" def host_threads(host: Host): Int = { val m = (options ++ host.build.options).int("threads") @@ -594,7 +618,7 @@ def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] = sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _)) - def next(state: Build_Process.State): List[Config] = { + def select_next(state: Build_Process.State): List[Config] = { val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name) val resources = host_infos.available(state) @@ -607,89 +631,6 @@ } } - class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler { - require(heuristics.nonEmpty) - - def best_result(state: Build_Process.State): (Heuristic, Schedule) = - heuristics.map(heuristic => - heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms) - - def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state) - - def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2 - } - - - /* heuristics */ - - abstract class Path_Heuristic( - timing_data: Timing_Data, - sessions_structure: Sessions.Structure, - max_threads_limit: Int, - build_uuid: String - ) extends Heuristic(timing_data, build_uuid) { - /* pre-computed properties for efficient heuristic */ - - val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit - - type Node = String - val build_graph = sessions_structure.build_graph - - val minimals = build_graph.minimals - val maximals = build_graph.maximals - - def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet - val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap - - def best_time(node: Node): Time = { - val host = ordered_hosts.last - val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus - timing_data.estimate(node, host.name, threads) - } - val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap - - val succs_max_time_ms = build_graph.node_height(best_times(_).ms) - def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) - def max_time(task: Build_Process.Task): Time = max_time(task.name) - - def path_times(minimals: List[Node]): Map[Node, Time] = { - def time_ms(node: Node): Long = best_times(node).ms - val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) - path_times_ms.view.mapValues(Time.ms).toMap - } - - def path_max_times(minimals: List[Node]): Map[Node, Time] = - path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap - - def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = { - def start(node: Node): (Node, Time) = node -> best_times(node) - - def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = - node -> (time - elapsed) - - def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = - if (running.isEmpty) (0, running) - else { - def get_next(node: Node): List[Node] = - build_graph.imm_succs(node).filter(pred).filter( - build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList - - val (next, elapsed) = running.minBy(_._2.ms) - val (remaining, finished) = - running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) - - val running1 = - remaining.map(pass_time(elapsed)).toMap ++ - finished.map(_._1).flatMap(get_next).map(start) - val (res, running2) = parallel_paths(running1) - (res max running.size, running2) - } - - parallel_paths(running.toMap)._1 - } - } - - object Path_Time_Heuristic { sealed trait Critical_Criterion case class Absolute_Time(time: Time) extends Critical_Criterion { @@ -725,9 +666,8 @@ host_criterion: Path_Time_Heuristic.Host_Criterion, timing_data: Timing_Data, sessions_structure: Sessions.Structure, - build_uuid: String, max_threads_limit: Int = 8 - ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) { + ) extends Priority_Rule { import Path_Time_Heuristic.* override def toString: Node = { @@ -739,11 +679,85 @@ "path time heuristic (" + params.mkString(", ") + ")" } - def next(state: Build_Process.State): List[Config] = { + /* pre-computed properties for efficient heuristic */ + val host_infos = timing_data.host_infos + val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds) + + val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit + + type Node = String + val build_graph = sessions_structure.build_graph + + val minimals = build_graph.minimals + val maximals = build_graph.maximals + + def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet + val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap + + val best_threads = + build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap + + def best_time(node: Node): Time = { + val host = ordered_hosts.last + val threads = best_threads(node) min host.info.num_cpus + timing_data.estimate(node, host.name, threads) + } + val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap + + val succs_max_time_ms = build_graph.node_height(best_times(_).ms) + def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) + def max_time(task: Build_Process.Task): Time = max_time(task.name) + + def path_times(minimals: List[Node]): Map[Node, Time] = { + def time_ms(node: Node): Long = best_times(node).ms + val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) + path_times_ms.view.mapValues(Time.ms).toMap + } + + def path_max_times(minimals: List[Node]): Map[Node, Time] = + path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap + + val node_degrees = + build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap + + def parallel_paths( + running: List[(Node, Time)], + nodes: Set[Node] = build_graph.keys.toSet, + max: Int = Int.MaxValue + ): Int = + if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max + else { + def start(node: Node): (Node, Time) = node -> best_times(node) + + def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = + node -> (time - elapsed) + + def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = + if (running.size >= max) (max, running) + else if (running.isEmpty) (0, running) + else { + def get_next(node: Node): List[Node] = + build_graph.imm_succs(node).intersect(nodes).filter( + build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList + + val (next, elapsed) = running.minBy(_._2.ms) + val (remaining, finished) = + running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) + + val running1 = + remaining.map(pass_time(elapsed)).toMap ++ + finished.map(_._1).flatMap(get_next).map(start) + val (res, running2) = parallel_paths(running1) + (res max running.size, running2) + } + + parallel_paths(running.toMap)._1 + } + + def select_next(state: Build_Process.State): List[Config] = { val resources = host_infos.available(state) - def best_threads(task: Build_Process.Task): Int = - timing_data.best_threads(task.name, max_threads) + def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name) val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) @@ -754,7 +768,7 @@ def remaining_time(node: Node): (Node, Time) = state.running.get(node) match { - case None => node -> best_time(node) + case None => node -> best_times(node) case Some(job) => val estimate = timing_data.estimate(job.name, job.node_info.hostname, @@ -762,10 +776,13 @@ node -> ((Time.now() - job.start_date.time + estimate) max Time.zero) } - val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time)) val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse + val is_parallelizable = + available_nodes.length >= parallel_paths( + state.ready.map(_.name).map(remaining_time), + max = available_nodes.length + 1) - if (max_parallel <= available_nodes.length) { + if (is_parallelizable) { val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 } @@ -787,13 +804,13 @@ def parallel_threads(task: Build_Process.Task): Int = this.parallel_threads match { case Fixed_Thread(threads) => threads - case Time_Based_Threads(f) => f(best_time(task.name)) + case Time_Based_Threads(f) => f(best_times(task.name)) } val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) val max_critical_parallel = - parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains) + parallel_paths(critical_minimals.map(remaining_time), critical_nodes) val max_critical_hosts = available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length @@ -1002,7 +1019,7 @@ else { val start = Time.now() - val new_schedule = scheduler.build_schedule(state).update(state) + val new_schedule = scheduler.schedule(state).update(state) val schedule = if (_schedule.is_empty) new_schedule else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering) @@ -1230,11 +1247,10 @@ parallel <- parallel_threads machine_split <- machine_splits } yield - Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure, - context.build_uuid) - val default_heuristic = - Default_Heuristic(timing_data, context.build_options, context.build_uuid) - new Meta_Heuristic(default_heuristic :: path_time_heuristics) + Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) + val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options) + val heuristics = default_heuristic :: path_time_heuristics + Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid))) } override def open_build_process( @@ -1318,7 +1334,8 @@ def schedule_msg(res: Exn.Result[Schedule]): String = res match { case Exn.Res(schedule) => schedule.message case _ => "" } - Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_)) + progress.echo("Building schedule...") + Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_)) } using(store.open_server()) { server =>