--- a/src/Pure/Tools/build_schedule.scala Fri Dec 08 20:47:03 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala Fri Dec 08 20:56:21 2023 +0100
@@ -8,6 +8,7 @@
import Host.Node_Info
import scala.annotation.tailrec
+import scala.collection.mutable
object Build_Schedule {
@@ -17,6 +18,7 @@
def elapsed: Time = timing.elapsed
def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
}
+
case class Timing_Entries(entries: List[Timing_Entry]) {
require(entries.nonEmpty)
@@ -31,6 +33,66 @@
def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
}
+ object Timing_Data {
+ def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
+
+ def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
+
+ def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
+
+ private def dummy_entries(host: Host, host_factor: Double) = {
+ val baseline = Time.minutes(5).scale(host_factor)
+ val gc = Time.seconds(10).scale(host_factor)
+ List(
+ Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
+ Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
+ }
+
+ def make(
+ host_infos: Host_Infos,
+ build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
+ ): Timing_Data = {
+ val hosts = host_infos.hosts
+ val measurements =
+ for {
+ (meta_info, build_info) <- build_history
+ build_host = meta_info.get(Build_Log.Prop.build_host)
+ (job_name, session_info) <- build_info.sessions.toList
+ if build_info.finished_sessions.contains(job_name)
+ hostname <- session_info.hostname.orElse(build_host).toList
+ host <- hosts.find(_.info.hostname == hostname).toList
+ threads = session_info.threads.getOrElse(host.info.num_cpus)
+ } yield (job_name, hostname, threads) -> session_info.timing
+
+ val entries =
+ if (measurements.isEmpty) {
+ val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
+ host_infos.hosts.flatMap(host =>
+ dummy_entries(host, host_infos.host_factor(default_host, host)))
+ }
+ else
+ measurements.groupMap(_._1)(_._2).toList.map {
+ case ((job_name, hostname, threads), timings) =>
+ Timing_Entry(job_name, hostname, threads, median_timing(timings))
+ }
+
+ new Timing_Data(Timing_Entries(entries), host_infos)
+ }
+
+ def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
+ val build_history =
+ for {
+ log_name <- log_database.execute_query_statement(
+ Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)),
+ List.from[String], res => res.string(Build_Log.Column.log_name))
+ meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name)
+ build_info = Build_Log.private_data.read_build_info(log_database, log_name)
+ } yield (meta_info, build_info)
+
+ make(host_infos, build_history)
+ }
+ }
+
class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
private def inflection_point(last_mono: Int, next: Int): Int =
last_mono + ((next - last_mono) / 2)
@@ -62,7 +124,7 @@
def linear(p0: (Int, Time), p1: (Int, Time)): Time = {
val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1))
val b = p0._2 - a.scale(p0._1)
- Time.ms((a.scale(threads) + b).ms max 0)
+ (a.scale(threads) + b) max Time.zero
}
val mono_prefix = sorted_prefix(entries, e => -e._2.ms)
@@ -82,7 +144,7 @@
val t_c =
Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n))
- def model(threads: Int): Time = Time.ms((t_c + t_p.scale(1.0 / threads)).ms max 0)
+ def model(threads: Int): Time = (t_c + t_p.scale(1.0 / threads)) max Time.zero
if (is_mono || in_prefix) model(threads)
else {
@@ -164,109 +226,62 @@
}
}
- def estimate(job_name: String, hostname: String, threads: Int): Time =
- data.by_job.get(job_name) match {
- case None =>
- // no data for job, take average of other jobs for given threads
- val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
- if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
- else {
- // no other job to estimate from, use global curve to approximate any other job
- val (threads1, data1) = data.by_threads.head
- data1.mean_time.scale(global_threads_factor(threads1, threads))
- }
+ private var cache: Map[(String, String, Int), Time] = Map.empty
+ def estimate(job_name: String, hostname: String, threads: Int): Time = {
+ def estimate: Time =
+ data.by_job.get(job_name) match {
+ case None =>
+ // no data for job, take average of other jobs for given threads
+ val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+ if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
+ else {
+ // no other job to estimate from, use global curve to approximate any other job
+ val (threads1, data1) = data.by_threads.head
+ data1.mean_time.scale(global_threads_factor(threads1, threads))
+ }
- case Some(data) =>
- data.by_threads.get(threads) match {
- case None => // interpolate threads
- estimate_threads(job_name, hostname, threads).getOrElse {
- // per machine, try to approximate config for threads
- val approximated =
- for {
- hostname1 <- data.by_hostname.keys
- estimate <- estimate_threads(job_name, hostname1, threads)
- factor = hostname_factor(hostname1, hostname)
- } yield estimate.scale(factor)
+ case Some(data) =>
+ data.by_threads.get(threads) match {
+ case None => // interpolate threads
+ estimate_threads(job_name, hostname, threads).getOrElse {
+ // per machine, try to approximate config for threads
+ val approximated =
+ for {
+ hostname1 <- data.by_hostname.keys
+ estimate <- estimate_threads(job_name, hostname1, threads)
+ factor = hostname_factor(hostname1, hostname)
+ } yield estimate.scale(factor)
- if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
- else {
- // no single machine where config can be approximated, unify data points
- val unified_entries = unify_hosts(job_name, hostname)
+ if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+ else {
+ // no single machine where config can be approximated, unify data points
+ val unified_entries = unify_hosts(job_name, hostname)
- if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
- else {
- // only single data point, use global curve to approximate
- val (job_threads, job_time) = unified_entries.head
- job_time.scale(global_threads_factor(job_threads, threads))
+ if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
+ else {
+ // only single data point, use global curve to approximate
+ val (job_threads, job_time) = unified_entries.head
+ job_time.scale(global_threads_factor(job_threads, threads))
+ }
}
}
- }
-
- case Some(data) => // time for job/thread exists, interpolate machine
- data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
- Timing_Data.median_time(
- data.by_hostname.toList.map((hostname1, data) =>
- data.mean_time.scale(hostname_factor(hostname1, hostname))))
- }
- }
- }
- }
-
- object Timing_Data {
- def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
- def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
- def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
-
- private def dummy_entries(host: Host, host_factor: Double) = {
- val baseline = Time.minutes(5).scale(host_factor)
- val gc = Time.seconds(10).scale(host_factor)
- List(
- Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
- Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
- }
- def make(
- host_infos: Host_Infos,
- build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
- ): Timing_Data = {
- val hosts = host_infos.hosts
- val measurements =
- for {
- (meta_info, build_info) <- build_history
- build_host = meta_info.get(Build_Log.Prop.build_host)
- (job_name, session_info) <- build_info.sessions.toList
- if build_info.finished_sessions.contains(job_name)
- hostname <- session_info.hostname.orElse(build_host).toList
- host <- hosts.find(_.info.hostname == hostname).toList
- threads = session_info.threads.getOrElse(host.info.num_cpus)
- } yield (job_name, hostname, threads) -> session_info.timing
+ case Some(data) => // time for job/thread exists, interpolate machine
+ data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+ Timing_Data.median_time(
+ data.by_hostname.toList.map((hostname1, data) =>
+ data.mean_time.scale(hostname_factor(hostname1, hostname))))
+ }
+ }
+ }
- val entries =
- if (measurements.isEmpty) {
- val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
- host_infos.hosts.flatMap(host =>
- dummy_entries(host, host_infos.host_factor(default_host, host)))
- }
- else
- measurements.groupMap(_._1)(_._2).toList.map {
- case ((job_name, hostname, threads), timings) =>
- Timing_Entry(job_name, hostname, threads, median_timing(timings))
- }
-
- new Timing_Data(Timing_Entries(entries), host_infos)
- }
-
- def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
- val build_history =
- for {
- log_name <- log_database.execute_query_statement(
- Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)),
- List.from[String], res => res.string(Build_Log.Column.log_name))
- meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name)
- build_info = Build_Log.private_data.read_build_info(log_database, log_name)
- } yield (meta_info, build_info)
-
- make(host_infos, build_history)
+ cache.get(job_name, hostname, threads) match {
+ case Some(time) => time
+ case None =>
+ val time = estimate
+ cache = cache + ((job_name, hostname, threads) -> time)
+ time
+ }
}
}
@@ -324,10 +339,6 @@
/* offline tracking of job configurations and resource allocations */
- object Config {
- def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
- }
-
case class Config(job_name: String, node_info: Node_Info) {
def job_of(start_time: Time): Build_Process.Job =
Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
@@ -420,15 +431,66 @@
}
type Graph = isabelle.Graph[String, Node]
+
+ def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty)
}
- case class Schedule(generator: String, start: Date, graph: Schedule.Graph) {
+ case class Schedule(
+ build_uuid: String,
+ generator: String,
+ start: Date,
+ graph: Schedule.Graph,
+ serial: Long = 0,
+ ) {
+ require(serial >= 0, "serial underflow")
+ def inc_serial: Schedule = {
+ require(serial < Long.MaxValue, "serial overflow")
+ copy(serial = serial + 1)
+ }
+
def end: Date =
if (graph.is_empty) start
else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch)
def duration: Time = end.time - start.time
def message: String = "Estimated " + duration.message_hms + " build time with " + generator
+
+ def deviation(other: Schedule): Time = Time.ms((end.time - other.end.time).ms.abs)
+
+ def num_built(state: Build_Process.State): Int = graph.keys.count(state.results.contains)
+ def elapsed(): Time = Time.now() - start.time
+ def is_outdated(state: Build_Process.State, time_limit: Time, built_limit: Int): Boolean =
+ graph.is_empty || (elapsed() > time_limit && num_built(state) > built_limit)
+
+ def next(hostname: String, state: Build_Process.State): List[String] =
+ for {
+ task <- state.next_ready
+ node = graph.get_node(task.name)
+ if hostname == node.node_info.hostname
+ if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
+ } yield task.name
+
+ def update(state: Build_Process.State): Schedule = {
+ val start1 = Date.now()
+ val pending = state.pending.map(_.name).toSet
+
+ def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph =
+ graph.map_node(name, { node =>
+ val elapsed = start1.time - state.running(name).start_date.time
+ node.copy(duration = node.duration - elapsed)
+ })
+
+ def shift_starts(graph: Schedule.Graph, name: String): Schedule.Graph =
+ graph.map_node(name, { node =>
+ val starts = start1 :: graph.imm_preds(node.job_name).toList.map(graph.get_node(_).end)
+ node.copy(start = starts.max(Date.Ordering))
+ })
+
+ val graph0 = state.running.keys.foldLeft(graph.restrict(pending.contains))(shift_elapsed)
+ val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts)
+
+ copy(start = start1, graph = graph1)
+ }
}
case class State(build_state: Build_Process.State, current_time: Time, finished: Schedule) {
@@ -453,10 +515,16 @@
val now = current_time + elapsed
val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)
- val preds =
+ val host_preds =
+ for {
+ (name, (node, _)) <- finished.graph.iterator.toSet
+ if node.node_info.hostname == job.node_info.hostname
+ } yield name
+ val build_preds =
build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined)
- val graph =
- preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))
+ val preds = build_preds ++ host_preds
+
+ val graph = preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))
val build_state1 = build_state.remove_running(job.name).remove_pending(job.name)
State(build_state1, now, finished.copy(graph = graph))
@@ -466,15 +534,15 @@
def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
}
- trait Scheduler {
- def next(state: Build_Process.State): List[Config]
- def build_schedule(build_state: Build_Process.State): Schedule
- }
+ trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule }
- abstract class Heuristic(timing_data: Timing_Data) extends Scheduler {
+ abstract class Heuristic(timing_data: Timing_Data, build_uuid: String)
+ extends Scheduler {
val host_infos = timing_data.host_infos
val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
+ def next(state: Build_Process.State): List[Config]
+
def build_schedule(build_state: Build_Process.State): Schedule = {
@tailrec
def simulate(state: State): State =
@@ -486,21 +554,21 @@
val start = Date.now()
val end_state =
- simulate(State(build_state, start.time, Schedule(toString, start, Graph.empty)))
+ simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty)))
end_state.finished
}
}
- class Default_Heuristic(timing_data: Timing_Data, options: Options)
- extends Heuristic(timing_data) {
+ class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String)
+ extends Heuristic(timing_data, build_uuid) {
override def toString: String = "default build heuristic"
def host_threads(host: Host): Int = {
val m = (options ++ host.build.options).int("threads")
if (m > 0) m else (host.num_cpus max 1) min 8
}
-
+
def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
@@ -535,8 +603,9 @@
abstract class Path_Heuristic(
timing_data: Timing_Data,
sessions_structure: Sessions.Structure,
- max_threads_limit: Int
- ) extends Heuristic(timing_data) {
+ max_threads_limit: Int,
+ build_uuid: String
+ ) extends Heuristic(timing_data, build_uuid) {
/* pre-computed properties for efficient heuristic */
val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
@@ -570,7 +639,7 @@
def path_max_times(minimals: List[Node]): Map[Node, Time] =
path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
- def parallel_paths(minimals: List[Node], pred: Node => Boolean = _ => true): Int = {
+ def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
def start(node: Node): (Node, Time) = node -> best_times(node)
def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
@@ -594,40 +663,59 @@
(res max running.size, running2)
}
- parallel_paths(minimals.map(start).toMap)._1
+ parallel_paths(running.toMap)._1
}
}
object Path_Time_Heuristic {
- sealed trait Criterion
- case class Absolute_Time(time: Time) extends Criterion {
+ sealed trait Critical_Criterion
+ case class Absolute_Time(time: Time) extends Critical_Criterion {
override def toString: String = "absolute time (" + time.message_hms + ")"
}
- case class Relative_Time(factor: Double) extends Criterion {
+ case class Relative_Time(factor: Double) extends Critical_Criterion {
override def toString: String = "relative time (" + factor + ")"
}
- sealed trait Strategy
- case class Fixed_Thread(threads: Int) extends Strategy {
+ sealed trait Parallel_Strategy
+ case class Fixed_Thread(threads: Int) extends Parallel_Strategy {
override def toString: String = "fixed threads (" + threads + ")"
}
- case class Time_Based_Threads(f: Time => Int) extends Strategy {
+ case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy {
override def toString: String = "time based threads"
}
+
+ sealed trait Host_Criterion
+ case object Critical_Nodes extends Host_Criterion {
+ override def toString: String = "per critical node"
+ }
+ case class Fixed_Fraction(fraction: Double) extends Host_Criterion {
+ override def toString: String = "fixed fraction (" + fraction + ")"
+ }
+ case class Host_Speed(min_factor: Double) extends Host_Criterion {
+ override def toString: String = "host speed (" + min_factor + ")"
+ }
}
class Path_Time_Heuristic(
- is_critical: Path_Time_Heuristic.Criterion,
- parallel_threads: Path_Time_Heuristic.Strategy,
+ is_critical: Path_Time_Heuristic.Critical_Criterion,
+ parallel_threads: Path_Time_Heuristic.Parallel_Strategy,
+ host_criterion: Path_Time_Heuristic.Host_Criterion,
timing_data: Timing_Data,
sessions_structure: Sessions.Structure,
+ build_uuid: String,
max_threads_limit: Int = 8
- ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) {
+ ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) {
import Path_Time_Heuristic.*
- override def toString: Node =
- "path time heuristic (critical: " + is_critical + ", parallel: " + parallel_threads + ")"
+ override def toString: Node = {
+ val params =
+ List(
+ "critical: " + is_critical,
+ "parallel: " + parallel_threads,
+ "fast hosts: " + host_criterion)
+ "path time heuristic (" + params.mkString(", ") + ")"
+ }
def next(state: Build_Process.State): List[Config] = {
val resources = host_infos.available(state)
@@ -637,13 +725,25 @@
val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
- val resources0 = host_infos.available(state.copy(running = Map.empty))
- val max_parallel = parallel_paths(state.ready.map(_.name))
- val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length
+ val available_nodes =
+ host_infos.available(state.copy(running = Map.empty))
+ .unused_nodes(max_threads)
+ .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse
+ def remaining_time(node: Node): (Node, Time) =
+ state.running.get(node) match {
+ case None => node -> best_time(node)
+ case Some(job) =>
+ val estimate =
+ timing_data.estimate(job.name, job.node_info.hostname,
+ host_infos.num_threads(job.node_info))
+ node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
+ }
+
+ val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
- if (fully_parallelizable) {
+ if (max_parallel <= available_nodes.length) {
val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
}
@@ -670,8 +770,24 @@
val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
- val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains)
- val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel)
+ val max_critical_parallel =
+ parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
+ val max_critical_hosts =
+ available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length
+
+ val split =
+ this.host_criterion match {
+ case Critical_Nodes => max_critical_hosts
+ case Fixed_Fraction(fraction) =>
+ ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
+ case Host_Speed(min_factor) =>
+ val best = rev_ordered_hosts.head._1.info.benchmark_score.get
+ val num_fast =
+ rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor))
+ num_fast min max_critical_hosts
+ }
+
+ val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)
val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)
@@ -704,12 +820,49 @@
}
catch { case exn: Throwable => close(); throw exn }
+ private val _build_database: Option[SQL.Database] =
+ try {
+ for (db <- store.maybe_open_build_database(server = server)) yield {
+ if (build_context.master) {
+ Build_Schedule.private_data.transaction_lock(
+ db,
+ create = true,
+ label = "Build_Schedule.build_database"
+ ) { Build_Schedule.private_data.clean_build_schedules(db) }
+ db.vacuum(Build_Schedule.private_data.tables.list)
+ }
+ db
+ }
+ }
+ catch { case exn: Throwable => close(); throw exn }
+
override def close(): Unit = {
super.close()
Option(_log_database).foreach(_.close())
+ Option(_build_database).flatten.foreach(_.close())
}
+ /* global state: internal var vs. external database */
+
+ private var _schedule = Schedule.init(build_uuid)
+
+ override protected def synchronized_database[A](label: String)(body: => A): A =
+ super.synchronized_database(label) {
+ _build_database match {
+ case None => body
+ case Some(db) =>
+ Build_Schedule.private_data.transaction_lock(db, label = label) {
+ val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule)
+ _schedule = old_schedule
+ val res = body
+ _schedule = Build_Schedule.private_data.update_schedule(db, _schedule, old_schedule)
+ res
+ }
+ }
+ }
+
+
/* previous results via build log */
override def open_build_cluster(): Build_Cluster = {
@@ -790,21 +943,8 @@
/* build process */
- case class Cache(state: Build_Process.State, configs: List[Config], estimate: Date) {
- def is_current(state: Build_Process.State): Boolean =
- this.state.pending.nonEmpty && this.state.results == state.results
- def is_current_estimate(estimate: Date): Boolean =
- Math.abs((this.estimate.time - estimate.time).ms) < Time.minutes(1).ms
- }
-
- private var cache = Cache(Build_Process.State(), Nil, Date.now())
-
- override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = {
- val configs =
- if (cache.is_current(state)) cache.configs
- else scheduler.next(state)
- configs.find(_.job_name == session_name).get.node_info
- }
+ override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
+ _schedule.graph.get_node(session_name).node_info
def is_current(state: Build_Process.State, session_name: String): Boolean =
state.ancestor_results(session_name) match {
@@ -829,30 +969,31 @@
case _ => false
}
- override def next_jobs(state: Build_Process.State): List[String] = {
- val finalize_limit = if (build_context.master) Int.MaxValue else 0
-
- if (progress.stopped) state.next_ready.map(_.name).take(finalize_limit)
- else if (cache.is_current(state)) cache.configs.map(_.job_name).filterNot(state.is_running)
+ override def next_jobs(state: Build_Process.State): List[String] =
+ if (!progress.stopped && !_schedule.is_outdated(state, Time.minutes(3), 10))
+ _schedule.next(hostname, state)
+ else if (!build_context.master) Nil
+ else if (progress.stopped) state.next_ready.map(_.name)
else {
val current = state.next_ready.filter(task => is_current(state, task.name))
- if (current.nonEmpty) current.map(_.name).take(finalize_limit)
+ if (current.nonEmpty) current.map(_.name)
else {
val start = Time.now()
- val next = scheduler.next(state)
- val schedule = scheduler.build_schedule(state)
+
+ val new_schedule = scheduler.build_schedule(state).update(state)
+ val schedule =
+ if (_schedule.graph.is_empty) new_schedule
+ else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
+
val elapsed = Time.now() - start
val timing_msg = if (elapsed.is_relevant) " (took " + elapsed.message + ")" else ""
- progress.echo_if(build_context.master && !cache.is_current_estimate(schedule.end),
- schedule.message + timing_msg)
+ progress.echo_if(_schedule.deviation(schedule).minutes > 1, schedule.message + timing_msg)
- val configs = next.filter(_.node_info.hostname == hostname)
- cache = Cache(state, configs, schedule.end)
- configs.map(_.job_name)
+ _schedule = schedule
+ _schedule.next(hostname, state)
}
}
- }
override def run(): Build.Results = {
val results = super.run()
@@ -861,6 +1002,167 @@
}
}
+
+ /** SQL data model of build schedule, extending isabelle_build database */
+
+ object private_data extends SQL.Data("isabelle_build") {
+ import Build_Process.private_data.{Base, Generic}
+
+
+ /* schedule */
+
+ object Schedules {
+ val build_uuid = Generic.build_uuid.make_primary_key
+ val generator = SQL.Column.string("generator")
+ val start = SQL.Column.date("start")
+ val serial = SQL.Column.long("serial")
+
+ val table = make_table(List(build_uuid, generator, start, serial), name = "schedules")
+ }
+
+ def read_serial(db: SQL.Database, build_uuid: String = ""): Long =
+ db.execute_query_statementO[Long](
+ Schedules.table.select(List(Schedules.serial.max), sql =
+ SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+ _.long(Schedules.serial)).getOrElse(0L)
+
+ def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+ db.execute_query_statement(
+ Schedules.table.select(List(Schedules.build_uuid)),
+ List.from[String], res => res.string(Schedules.build_uuid))
+
+ def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
+ val schedules =
+ db.execute_query_statement(Schedules.table.select(sql =
+ SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+ List.from[Schedule],
+ { res =>
+ val build_uuid = res.string(Schedules.build_uuid)
+ val generator = res.string(Schedules.generator)
+ val start = res.date(Schedules.start)
+ Schedule(build_uuid, generator, start, Graph.empty)
+ })
+
+ for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield {
+ val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid)
+ schedule.copy(graph = Graph.make(nodes))
+ }
+ }
+
+ def write_schedule(db: SQL.Database, schedule: Schedule): Unit = {
+ db.execute_statement(
+ Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid)))
+ db.execute_statement(Schedules.table.insert(), { stmt =>
+ stmt.string(1) = schedule.build_uuid
+ stmt.string(2) = schedule.generator
+ stmt.date(3) = schedule.start
+ stmt.long(4) = schedule.serial
+ })
+ update_nodes(db, schedule.build_uuid, schedule.graph.dest)
+ }
+
+
+ /* nodes */
+
+ object Nodes {
+ val build_uuid = Generic.build_uuid.make_primary_key
+ val name = Generic.name.make_primary_key
+ val succs = SQL.Column.string("succs")
+ val hostname = SQL.Column.string("hostname")
+ val numa_node = SQL.Column.int("numa_node")
+ val rel_cpus = SQL.Column.string("rel_cpus")
+ val start = SQL.Column.date("start")
+ val duration = SQL.Column.long("duration")
+
+ val table =
+ make_table(
+ List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration),
+ name = "schedule_nodes")
+ }
+
+ type Nodes = List[((String, Schedule.Node), List[String])]
+
+ def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = {
+ db.execute_query_statement(
+ Nodes.table.select(sql =
+ SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))),
+ List.from[((String, Schedule.Node), List[String])],
+ { res =>
+ val name = res.string(Nodes.name)
+ val succs = split_lines(res.string(Nodes.succs))
+ val hostname = res.string(Nodes.hostname)
+ val numa_node = res.get_int(Nodes.numa_node)
+ val rel_cpus = res.string(Nodes.rel_cpus)
+ val start = res.date(Nodes.start)
+ val duration = Time.ms(res.long(Nodes.duration))
+
+ val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus))
+ ((name, Schedule.Node(name, node_info, start, duration)), succs)
+ }
+ )
+ }
+
+ def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = {
+ db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid)))
+ db.execute_batch_statement(Nodes.table.insert(), batch =
+ for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) =>
+ stmt.string(1) = build_uuid
+ stmt.string(2) = name
+ stmt.string(3) = cat_lines(succs)
+ stmt.string(4) = node.node_info.hostname
+ stmt.int(5) = node.node_info.numa_node
+ stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus)
+ stmt.date(7) = node.start
+ stmt.long(8) = node.duration.ms
+ })
+ }
+
+ def pull_schedule(db: SQL.Database, old_schedule: Schedule): Build_Schedule.Schedule = {
+ val serial_db = read_serial(db)
+ if (serial_db == old_schedule.serial) old_schedule
+ else {
+ read_schedules(db, old_schedule.build_uuid) match {
+ case Nil => old_schedule
+ case schedules => Library.the_single(schedules)
+ }
+ }
+ }
+
+ def update_schedule(db: SQL.Database, schedule: Schedule, old_schedule: Schedule): Schedule = {
+ val changed =
+ schedule.generator != old_schedule.generator ||
+ schedule.start != old_schedule.start ||
+ schedule.graph != old_schedule.graph
+
+ val schedule1 =
+ if (changed) schedule.copy(serial = old_schedule.serial).inc_serial else schedule
+ if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
+
+ schedule1
+ }
+
+ def remove_schedules(db: SQL.Database, remove: List[String]): Unit =
+ if (remove.nonEmpty) {
+ val sql = Generic.build_uuid.where_member(remove)
+ db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql))))
+ }
+
+ def clean_build_schedules(db: SQL.Database): Unit = {
+ val running_builds_domain =
+ db.execute_query_statement(
+ Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
+ List.from[String], res => res.string(Base.build_uuid))
+
+ val (remove, _) =
+ Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+
+ remove_schedules(db, remove)
+ }
+
+ override val tables = SQL.Tables(Schedules.table, Nodes.table)
+ }
+
+
class Engine extends Build.Engine("build_schedule") {
def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {
@@ -880,15 +1182,23 @@
case time if time < Time.minutes(5) => 4
case _ => 8
}))
+ val machine_splits =
+ List(
+ Path_Time_Heuristic.Critical_Nodes,
+ Path_Time_Heuristic.Fixed_Fraction(0.3),
+ Path_Time_Heuristic.Host_Speed(0.9))
val path_time_heuristics =
for {
is_critical <- is_criticals
parallel <- parallel_threads
- } yield Path_Time_Heuristic(is_critical, parallel, timing_data, sessions_structure)
- val heuristics = Default_Heuristic(timing_data, context.build_options) :: path_time_heuristics
-
- new Meta_Heuristic(heuristics)
+ machine_split <- machine_splits
+ } yield
+ Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure,
+ context.build_uuid)
+ val default_heuristic =
+ Default_Heuristic(timing_data, context.build_options, context.build_uuid)
+ new Meta_Heuristic(default_heuristic :: path_time_heuristics)
}
override def open_build_process(
@@ -985,4 +1295,239 @@
}
}
}
+
+ def write_schedule_graphic(schedule: Schedule, output: Path): Unit = {
+ import java.awt.geom.{GeneralPath, Rectangle2D}
+ import java.awt.{BasicStroke, Color, Graphics2D}
+
+ val line_height = isabelle.graphview.Metrics.default.height
+ val char_width = isabelle.graphview.Metrics.default.char_width
+ val padding = isabelle.graphview.Metrics.default.space_width
+ val gap = isabelle.graphview.Metrics.default.gap
+
+ val graph = schedule.graph
+
+ def text_width(text: String): Double = text.length * char_width
+
+ val generator_height = line_height + padding
+ val hostname_height = generator_height + line_height + padding
+ def time_height(time: Time): Double = time.seconds
+ def date_height(date: Date): Double = time_height(date.time - schedule.start.time)
+
+ val hosts = graph.iterator.map(_._2._1).toList.groupBy(_.node_info.hostname)
+
+ def node_width(node: Schedule.Node): Double = 2 * padding + text_width(node.job_name)
+
+ case class Range(start: Double, stop: Double) {
+ def proper: List[Range] = if (start < stop) List(this) else Nil
+ def width: Double = stop - start
+ }
+
+ val rel_node_ranges =
+ hosts.toList.flatMap { (hostname, nodes) =>
+ val sorted = nodes.sortBy(node => (node.start.time.ms, node.end.time.ms, node.job_name))
+ sorted.foldLeft((List.empty[Schedule.Node], Map.empty[Schedule.Node, Range])) {
+ case ((nodes, allocated), node) =>
+ val width = node_width(node) + padding
+ val parallel = nodes.filter(_.end.time > node.start.time)
+ val (last, slots) =
+ parallel.sortBy(allocated(_).start).foldLeft((0D, List.empty[Range])) {
+ case ((start, ranges), node1) =>
+ val node_range = allocated(node1)
+ (node_range.stop, ranges ::: Range(start, node_range.start).proper)
+ }
+ val start =
+ (Range(last, Double.MaxValue) :: slots.filter(_.width >= width)).minBy(_.width).start
+ (node :: parallel, allocated + (node -> Range(start, start + width)))
+ }._2
+ }.toMap
+
+ def host_width(hostname: String) =
+ 2 * padding + (hosts(hostname).map(rel_node_ranges(_).stop).max max text_width(hostname))
+
+ def graph_height(graph: Graph[String, Schedule.Node]): Double =
+ date_height(graph.maximals.map(graph.get_node(_).end).maxBy(_.unix_epoch))
+
+ val height = (hostname_height + 2 * padding + graph_height(graph)).ceil.toInt
+ val (last, host_starts) =
+ hosts.keys.foldLeft((0D, Map.empty[String, Double])) {
+ case ((previous, starts), hostname) =>
+ (previous + gap + host_width(hostname), starts + (hostname -> previous))
+ }
+ val width = (last - gap).ceil.toInt
+
+ def node_start(node: Schedule.Node): Double =
+ host_starts(node.node_info.hostname) + padding + rel_node_ranges(node).start
+
+ def paint(gfx: Graphics2D): Unit = {
+ gfx.setColor(Color.LIGHT_GRAY)
+ gfx.fillRect(0, 0, width, height)
+ gfx.setRenderingHints(isabelle.graphview.Metrics.rendering_hints)
+ gfx.setFont(isabelle.graphview.Metrics.default.font)
+ gfx.setStroke(new BasicStroke(1, BasicStroke.CAP_BUTT, BasicStroke.JOIN_ROUND))
+
+ draw_string(schedule.generator + ", build time: " + schedule.duration.message_hms, padding, 0)
+
+ def draw_host(x: Double, hostname: String): Double = {
+ val nodes = hosts(hostname).map(_.job_name).toSet
+ val width = host_width(hostname)
+ val height = 2 * padding + graph_height(graph.restrict(nodes.contains))
+ val padding1 = ((width - text_width(hostname)) / 2) max 0
+ val rect = new Rectangle2D.Double(x, hostname_height, width, height)
+ gfx.setColor(Color.BLACK)
+ gfx.draw(rect)
+ gfx.setColor(Color.GRAY)
+ gfx.fill(rect)
+ draw_string(hostname, x + padding1, generator_height)
+ x + gap + width
+ }
+
+ def draw_string(str: String, x: Double, y: Double): Unit = {
+ gfx.setColor(Color.BLACK)
+ gfx.drawString(str, x.toInt, (y + line_height).toInt)
+ }
+
+ def node_rect(node: Schedule.Node): Rectangle2D.Double = {
+ val x = node_start(node)
+ val y = hostname_height + padding + date_height(node.start)
+ val width = node_width(node)
+ val height = time_height(node.duration)
+ new Rectangle2D.Double(x, y, width, height)
+ }
+
+ def draw_node(node: Schedule.Node): Rectangle2D.Double = {
+ val rect = node_rect(node)
+ gfx.setColor(Color.BLACK)
+ gfx.draw(rect)
+ gfx.setColor(Color.WHITE)
+ gfx.fill(rect)
+
+ def add_text(y: Double, text: String): Double =
+ if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y
+ else {
+ val padding1 = padding min ((rect.height - (y + line_height)) / 2)
+ draw_string(text, rect.x + padding, rect.y + y + padding1)
+ y + padding1 + line_height
+ }
+
+ val node_info = node.node_info
+
+ val duration_str = "(" + node.duration.message_hms + ")"
+ val node_str =
+ "on " + proper_string(node_info.toString.stripPrefix(node_info.hostname)).getOrElse("all")
+ val start_str = "Start: " + (node.start.time - schedule.start.time).message_hms
+
+ List(node.job_name, duration_str, node_str, start_str).foldLeft(0D)(add_text)
+
+ rect
+ }
+
+ def draw_arrow(from: Schedule.Node, to: Rectangle2D.Double, curve: Double = 10): Unit = {
+ val from_rect = node_rect(from)
+
+ val path = new GeneralPath()
+ path.moveTo(from_rect.getCenterX, from_rect.getMaxY)
+ path.lineTo(to.getCenterX, to.getMinY)
+
+ gfx.setColor(Color.BLUE)
+ gfx.draw(path)
+ }
+
+ hosts.keys.foldLeft(0D)(draw_host)
+
+ graph.topological_order.foreach { job_name =>
+ val node = graph.get_node(job_name)
+ val rect = draw_node(node)
+
+ graph.imm_preds(job_name).foreach(pred => draw_arrow(graph.get_node(pred), rect))
+ }
+ }
+
+ val name = output.file_name
+ if (File.is_png(name)) Graphics_File.write_png(output.file, paint, width, height)
+ else if (File.is_pdf(name)) Graphics_File.write_pdf(output.file, paint, width, height)
+ else error("Bad type of file: " + quote(name) + " (.png or .pdf expected)")
+ }
+
+
+ /* command-line wrapper */
+
+ val isabelle_tool = Isabelle_Tool("build_schedule", "generate build schedule", Scala_Project.here,
+ { args =>
+ var afp_root: Option[Path] = None
+ val base_sessions = new mutable.ListBuffer[String]
+ val select_dirs = new mutable.ListBuffer[Path]
+ val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
+ var numa_shuffling = false
+ var output_file: Option[Path] = None
+ var requirements = false
+ val exclude_session_groups = new mutable.ListBuffer[String]
+ var all_sessions = false
+ val dirs = new mutable.ListBuffer[Path]
+ val session_groups = new mutable.ListBuffer[String]
+ var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS)
+ var verbose = false
+ val exclude_sessions = new mutable.ListBuffer[String]
+
+ val getopts = Getopts("""
+Usage: isabelle build_schedule [OPTIONS] [SESSIONS ...]
+
+ Options are:
+ -A ROOT include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
+ -B NAME include session NAME and all descendants
+ -D DIR include session directory and select its sessions
+ -H HOSTS additional build cluster host specifications, of the form
+ "NAMES:PARAMETERS" (separated by commas)
+ -N cyclic shuffling of NUMA CPU nodes (performance tuning)
+ -O FILE output file
+ -R refer to requirements of selected sessions
+ -X NAME exclude sessions from group NAME and all descendants
+ -a select all sessions
+ -d DIR include session directory
+ -g NAME select session group NAME
+ -o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
+ -v verbose
+ -x NAME exclude session NAME and all descendants
+
+ Generate build graph.
+""",
+ "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
+ "B:" -> (arg => base_sessions += arg),
+ "D:" -> (arg => select_dirs += Path.explode(arg)),
+ "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)),
+ "N" -> (_ => numa_shuffling = true),
+ "O:" -> (arg => output_file = Some(Path.explode(arg))),
+ "R" -> (_ => requirements = true),
+ "X:" -> (arg => exclude_session_groups += arg),
+ "a" -> (_ => all_sessions = true),
+ "d:" -> (arg => dirs += Path.explode(arg)),
+ "g:" -> (arg => session_groups += arg),
+ "o:" -> (arg => options = options + arg),
+ "v" -> (_ => verbose = true),
+ "x:" -> (arg => exclude_sessions += arg))
+
+ val sessions = getopts(args)
+
+ val progress = new Console_Progress(verbose = verbose)
+
+ val schedule =
+ build_schedule(options,
+ selection = Sessions.Selection(
+ requirements = requirements,
+ all_sessions = all_sessions,
+ base_sessions = base_sessions.toList,
+ exclude_session_groups = exclude_session_groups.toList,
+ exclude_sessions = exclude_sessions.toList,
+ session_groups = session_groups.toList,
+ sessions = sessions),
+ progress = progress,
+ afp_root = afp_root,
+ dirs = dirs.toList,
+ select_dirs = select_dirs.toList,
+ numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
+ build_hosts = build_hosts.toList)
+
+ if (!schedule.graph.is_empty && output_file.nonEmpty)
+ write_schedule_graphic(schedule, output_file.get)
+ })
}