--- a/src/Pure/Build/build_schedule.scala Sat Mar 16 09:05:17 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala Sun Mar 17 07:45:12 2024 +0100
@@ -7,8 +7,10 @@
import Host.Node_Info
+
import scala.annotation.tailrec
import scala.collection.mutable
+import scala.Ordering.Implicits.seqOrdering
object Build_Schedule {
@@ -101,7 +103,7 @@
lazy val by_threads: Map[Int, Facet] = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
lazy val by_hostname: Map[String, Facet] = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
- def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+ def median_time: Time = Timing_Data.median_time(results.map(_.elapsed))
def best_result: Result = results.minBy(_.elapsed.ms)
}
@@ -120,7 +122,7 @@
facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
case (hostname, facet) =>
val best_threads = facet.best_result.threads
- facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+ facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
inflection_point(best_threads, _))
}
(max_threads :: worse_threads).min
@@ -189,13 +191,13 @@
private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
def unify(hostname: String, facet: Timing_Data.Facet) =
- facet.mean_time.scale(hostname_factor(hostname, on_host))
+ facet.median_time.scale(hostname_factor(hostname, on_host))
for {
facet <- facet.by_job.get(job_name).toList
(threads, facet) <- facet.by_threads
entries = facet.by_hostname.toList.map(unify)
- } yield threads -> Timing_Data.median_time(entries)
+ } yield threads -> Timing_Data.mean_time(entries)
}
def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
@@ -203,8 +205,8 @@
val entries =
facet.by_threads.toList match {
case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
- (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
- case entries => entries.map((threads, facet) => threads -> facet.mean_time)
+ (i, facet.median_time) :: result.proper_cpu.map(1 -> _).toList
+ case entries => entries.map((threads, facet) => threads -> facet.median_time)
}
if (entries.size < 2) None else Some(approximate_threads(entries, threads))
}
@@ -212,7 +214,7 @@
for {
facet <- facet.by_job.get(job_name)
facet <- facet.by_hostname.get(hostname)
- time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
+ time <- facet.by_threads.get(threads).map(_.median_time).orElse(try_approximate(facet))
} yield time
}
@@ -245,10 +247,10 @@
}
private var cache: Map[(String, String, Int), Time] = Map.empty
-
-
+
+
/* approximation factors -- penalize estimations with less information */
-
+
val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5
val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7
val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5
@@ -274,7 +276,7 @@
else {
// no other job to estimate from, use global curve to approximate any other job
val (threads1, facet1) = facet.by_threads.head
- facet1.mean_time.scale(global_threads_factor(threads1, threads))
+ facet1.median_time.scale(global_threads_factor(threads1, threads))
}
}
@@ -291,7 +293,7 @@
factor = hostname_factor(hostname1, hostname)
} yield estimate.scale(factor)
- if (approximated.nonEmpty)
+ if (approximated.nonEmpty)
Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE)
else {
// no single machine where config can be approximated, unify data points
@@ -310,10 +312,10 @@
}
case Some(facet) => // time for job/thread exists, interpolate machine if necessary
- facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
- Timing_Data.median_time(
+ facet.by_hostname.get(hostname).map(_.median_time).getOrElse {
+ Timing_Data.mean_time(
facet.by_hostname.toList.map((hostname1, facet) =>
- facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+ facet.median_time.scale(hostname_factor(hostname1, hostname)))).scale(
FACTOR_THREADS_OTHER_MACHINE)
}
}
@@ -489,6 +491,55 @@
type Graph = isabelle.Graph[String, Node]
def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty)
+
+
+ /* file representation */
+
+ def write(value: Schedule, file: Path): Unit = {
+ import XML.Encode._
+
+ def time: T[Time] = (time => long(time.ms))
+ def date: T[Date] = (date => time(date.time))
+ def node_info: T[Node_Info] =
+ (node_info => triple(string, option(int), list(int))(
+ (node_info.hostname, node_info.numa_node, node_info.rel_cpus)))
+ def node: T[Node] =
+ (node => pair(string, pair(node_info, pair(date, time)))(
+ (node.job_name, (node.node_info, (node.start, node.duration)))))
+ def schedule: T[Schedule] =
+ (schedule =>
+ pair(string, pair(string, pair(date, pair(Graph.encode(string, node), long))))((
+ schedule.build_uuid, (schedule.generator, (schedule.start, (schedule.graph,
+ schedule.serial))))))
+
+ File.write(file, YXML.string_of_body(schedule(value)))
+ }
+
+ def read(file: Path): Schedule = {
+ import XML.Decode._
+
+ def time: T[Time] = { body => Time.ms(long(body)) }
+ def date: T[Date] = { body => Date(time(body)) }
+ def node_info: T[Node_Info] =
+ { body =>
+ val (hostname, numa_node, rel_cpus) = triple(string, option(int), list(int))(body)
+ Node_Info(hostname, numa_node, rel_cpus)
+ }
+ val node: T[Schedule.Node] =
+ { body =>
+ val (job_name, (info, (start, duration))) =
+ pair(string, pair(node_info, pair(date, time)))(body)
+ Node(job_name, info, start, duration)
+ }
+ def schedule: T[Schedule] =
+ { body =>
+ val (build_uuid, (generator, (start, (graph, serial)))) =
+ pair(string, pair(string, (pair(date, pair(Graph.decode(string, node), long)))))(body)
+ Schedule(build_uuid, generator, start, graph, serial)
+ }
+
+ schedule(YXML.parse_body(File.read(file)))
+ }
}
case class Schedule(
@@ -505,6 +556,7 @@
else graph.maximals.map(graph.get_node).map(_.end).max(Date.Ordering)
def duration: Time = end - start
+ def durations: List[Time] = graph.keys.map(graph.get_node(_).end - start)
def message: String = "Estimated " + duration.message_hms + " build time with " + generator
def deviation(other: Schedule): Time = Time.ms((end - other.end).ms.abs)
@@ -514,10 +566,7 @@
def is_empty: Boolean = graph.is_empty
def is_outdated(options: Options, state: Build_Process.State): Boolean =
if (is_empty) true
- else {
- num_built(state) > options.int("build_schedule_outdated_limit") &&
- elapsed() > options.seconds("build_schedule_outdated_delay")
- }
+ else elapsed() > options.seconds("build_schedule_outdated_delay")
def next(hostname: String, state: Build_Process.State): List[String] =
for {
@@ -527,9 +576,9 @@
if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
} yield task.name
- def exists_next(hostname: String, state: Build_Process.State): Boolean =
+ def exists_next(hostname: String, state: Build_Process.State): Boolean =
next(hostname, state).nonEmpty
-
+
def update(state: Build_Process.State): Schedule = {
val start1 = Date.now()
@@ -632,7 +681,7 @@
def schedule(state: Build_Process.State): Schedule = {
def main(scheduler: Scheduler): Schedule = scheduler.schedule(state)
- Par_List.map(main, schedulers).minBy(_.duration.ms)
+ Par_List.map(main, schedulers).minBy(_.durations.map(_.ms).sorted.reverse)
}
}
@@ -1063,7 +1112,7 @@
fresh_build = build_context.fresh_build,
store_heap = store_heap)._1
case _ => false
- }
+ }
override def next_jobs(state: Build_Process.State): List[String] =
if (progress.stopped) state.next_ready.map(_.name)
@@ -1115,7 +1164,7 @@
if _schedule.exists_next(host.name, _state)
} build_send(Build_Schedule.private_data.channel_ready(host.name))
}
- while(!build_action()) {}
+ while (!build_action()) {}
}
}
finally {
@@ -1282,7 +1331,7 @@
val schedule1 =
if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule
if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
-
+
schedule1
}
@@ -1352,8 +1401,22 @@
server: SSH.Server
): Build_Process =
if (!context.master) new Scheduled_Build_Process(context, progress, server)
- else new Scheduler_Build_Process(context, progress, server) {
- def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context)
+ else {
+ val schedule_file = context.build_options.string("build_schedule")
+ if (schedule_file.isEmpty) {
+ new Scheduler_Build_Process(context, progress, server) {
+ def init_scheduler(timing_data: Timing_Data): Scheduler =
+ scheduler(timing_data, context)
+ }
+ }
+ else {
+ val finished_schedule =
+ Schedule.read(Path.explode(schedule_file)).copy(build_uuid = context.build_uuid)
+ new Scheduler_Build_Process(context, progress, server) {
+ def init_scheduler(timing_data: Timing_Data): Scheduler =
+ (build_state: Build_Process.State) => finished_schedule
+ }
+ }
}
}
object Build_Engine extends Build_Engine
@@ -1375,6 +1438,8 @@
session_setup: (String, Session) => Unit = (_, _) => (),
cache: Term.Cache = Term.Cache.make()
): Schedule = {
+ Build.build_process(options, build_cluster = true, remove_builds = true)
+
val store =
Build_Engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache)
val log_store = Build_Log.store(options, cache = cache)
@@ -1389,7 +1454,6 @@
val full_sessions =
Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs,
select_dirs = select_dirs, infos = infos, augment_options = augment_options)
- val full_sessions_selection = full_sessions.imports_selection(selection)
val build_deps =
Sessions.deps(full_sessions.selection(selection), progress = progress,
@@ -1624,7 +1688,7 @@
-H HOSTS additional cluster host specifications of the form
NAMES:PARAMETERS (separated by commas)
-N cyclic shuffling of NUMA CPU nodes (performance tuning)
- -O FILE output file
+ -O FILE output file (pdf or png for image, else yxml)
-R refer to requirements of selected sessions
-X NAME exclude sessions from group NAME and all descendants
-a select all sessions
@@ -1634,7 +1698,7 @@
-v verbose
-x NAME exclude session NAME and all descendants
- Generate build graph for scheduling.
+ Generate build schedule, but do not run actual build.
""",
"A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
"B:" -> (arg => base_sessions += arg),
@@ -1672,7 +1736,12 @@
numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
build_hosts = build_hosts.toList)
- if (!schedule.is_empty && output_file.nonEmpty)
- write_schedule_graphic(schedule, output_file.get)
+ output_file match {
+ case Some(output_file) if !schedule.is_empty =>
+ if (File.is_pdf(output_file.file_name) || File.is_png(output_file.file_name))
+ write_schedule_graphic(schedule, output_file)
+ else Schedule.write(schedule, output_file)
+ case _ =>
+ }
})
}