# HG changeset patch
# User desharna
# Date 1710657912 -3600
# Node ID 87a04ce7e3c39e0658540b230c12efea9fdde9bd
# Parent  d0205dde00bb7c7d4071506776ed50a0df3d4d63
# Parent  cfeb3a8f241d72b300b61362313f5c6e4cb9dbfa
merged

diff -r d0205dde00bb -r 87a04ce7e3c3 etc/options
--- a/etc/options	Sat Mar 16 09:05:17 2024 +0100
+++ b/etc/options	Sun Mar 17 07:45:12 2024 +0100
@@ -219,12 +219,12 @@
 option build_cluster_identifier : string = "build_cluster"
   -- "ISABELLE_IDENTIFIER for remote build cluster sessions"
 
+option build_schedule : string = ""
+  -- "path to pre-computed schedule"
+
 option build_schedule_outdated_delay : real = 300.0
   -- "delay schedule generation loop"
 
-option build_schedule_outdated_limit : int = 20
-  -- "maximum number of sessions for which schedule stays valid"
-
 
 
 section "Editor Session"
diff -r d0205dde00bb -r 87a04ce7e3c3 src/Pure/Build/build_schedule.scala
--- a/src/Pure/Build/build_schedule.scala	Sat Mar 16 09:05:17 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala	Sun Mar 17 07:45:12 2024 +0100
@@ -7,8 +7,10 @@
 
 
 import Host.Node_Info
+
 import scala.annotation.tailrec
 import scala.collection.mutable
+import scala.Ordering.Implicits.seqOrdering
 
 
 object Build_Schedule {
@@ -101,7 +103,7 @@
       lazy val by_threads: Map[Int, Facet] = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
       lazy val by_hostname: Map[String, Facet] = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
 
-      def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+      def median_time: Time = Timing_Data.median_time(results.map(_.elapsed))
       def best_result: Result = results.minBy(_.elapsed.ms)
     }
 
@@ -120,7 +122,7 @@
         facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
           case (hostname, facet) =>
             val best_threads = facet.best_result.threads
-            facet.by_threads.keys.toList.sorted.find(_ > best_threads).map( 
+            facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
              inflection_point(best_threads, _))
        }
      (max_threads :: worse_threads).min
@@ -189,13 +191,13 @@
 
    private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
      def unify(hostname: String, facet: Timing_Data.Facet) =
-        facet.mean_time.scale(hostname_factor(hostname, on_host))
+        facet.median_time.scale(hostname_factor(hostname, on_host))
 
      for {
        facet <- facet.by_job.get(job_name).toList
        (threads, facet) <- facet.by_threads
        entries = facet.by_hostname.toList.map(unify)
-      } yield threads -> Timing_Data.median_time(entries)
+      } yield threads -> Timing_Data.mean_time(entries)
    }
 
    def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
@@ -203,8 +205,8 @@
        val entries =
          facet.by_threads.toList match {
            case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
-              (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
-            case entries => entries.map((threads, facet) => threads -> facet.mean_time)
+              (i, facet.median_time) :: result.proper_cpu.map(1 -> _).toList
+            case entries => entries.map((threads, facet) => threads -> facet.median_time)
          }
        if (entries.size < 2) None else Some(approximate_threads(entries, threads))
      }
@@ -212,7 +214,7 @@
 
      for {
        facet <- facet.by_job.get(job_name)
        facet <- facet.by_hostname.get(hostname)
-        time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
+        time <- facet.by_threads.get(threads).map(_.median_time).orElse(try_approximate(facet))
      } yield time
    }
@@ -245,10 +247,10 @@
    }
 
    private var cache: Map[(String, String, Int), Time] = Map.empty
-
-
+
+
    /* approximation factors -- penalize estimations with less information */
-
+
    val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5
    val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7
    val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5
@@ -274,7 +276,7 @@
          else {
            // no other job to estimate from, use global curve to approximate any other job
            val (threads1, facet1) = facet.by_threads.head
-            facet1.mean_time.scale(global_threads_factor(threads1, threads))
+            facet1.median_time.scale(global_threads_factor(threads1, threads))
          }
        }
 
@@ -291,7 +293,7 @@
              factor = hostname_factor(hostname1, hostname)
            } yield estimate.scale(factor)
 
-          if (approximated.nonEmpty) 
+          if (approximated.nonEmpty)
            Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE)
          else {
            // no single machine where config can be approximated, unify data points
@@ -310,10 +312,10 @@
            }
        case Some(facet) =>
          // time for job/thread exists, interpolate machine if necessary
-          facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
-            Timing_Data.median_time(
+          facet.by_hostname.get(hostname).map(_.median_time).getOrElse {
+            Timing_Data.mean_time(
              facet.by_hostname.toList.map((hostname1, facet) =>
-                facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+                facet.median_time.scale(hostname_factor(hostname1, hostname)))).scale(
                  FACTOR_THREADS_OTHER_MACHINE)
          }
      }
@@ -489,6 +491,55 @@
    type Graph = isabelle.Graph[String, Node]
 
    def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty)
+
+
+    /* file representation */
+
+    def write(value: Schedule, file: Path): Unit = {
+      import XML.Encode._
+
+      def time: T[Time] = (time => long(time.ms))
+      def date: T[Date] = (date => time(date.time))
+      def node_info: T[Node_Info] =
+        (node_info => triple(string, option(int), list(int))(
+          (node_info.hostname, node_info.numa_node, node_info.rel_cpus)))
+      def node: T[Node] =
+        (node => pair(string, pair(node_info, pair(date, time)))(
+          (node.job_name, (node.node_info, (node.start, node.duration)))))
+      def schedule: T[Schedule] =
+        (schedule =>
+          pair(string, pair(string, pair(date, pair(Graph.encode(string, node), long))))((
+            schedule.build_uuid, (schedule.generator, (schedule.start, (schedule.graph,
+            schedule.serial))))))
+
+      File.write(file, YXML.string_of_body(schedule(value)))
+    }
+
+    def read(file: Path): Schedule = {
+      import XML.Decode._
+
+      def time: T[Time] = { body => Time.ms(long(body)) }
+      def date: T[Date] = { body => Date(time(body)) }
+      def node_info: T[Node_Info] =
+        { body =>
+          val (hostname, numa_node, rel_cpus) = triple(string, option(int), list(int))(body)
+          Node_Info(hostname, numa_node, rel_cpus)
+        }
+      val node: T[Schedule.Node] =
+        { body =>
+          val (job_name, (info, (start, duration))) =
+            pair(string, pair(node_info, pair(date, time)))(body)
+          Node(job_name, info, start, duration)
+        }
+      def schedule: T[Schedule] =
+        { body =>
+          val (build_uuid, (generator, (start, (graph, serial)))) =
+            pair(string, pair(string, (pair(date, pair(Graph.decode(string, node), long)))))(body)
+          Schedule(build_uuid, generator, start, graph, serial)
+        }
+
+      schedule(YXML.parse_body(File.read(file)))
+    }
  }
 
  case class Schedule(
@@ -505,6 +556,7 @@
      else graph.maximals.map(graph.get_node).map(_.end).max(Date.Ordering)
 
    def duration: Time = end - start
+    def durations: List[Time] = graph.keys.map(graph.get_node(_).end - start)
    def message: String = "Estimated " + duration.message_hms + " build time with " + generator
 
    def deviation(other: Schedule): Time = Time.ms((end - other.end).ms.abs)
@@ -514,10 +566,7 @@
    def is_empty: Boolean = graph.is_empty
 
    def is_outdated(options: Options, state: Build_Process.State): Boolean =
      if (is_empty) true
-      else {
-        num_built(state) > options.int("build_schedule_outdated_limit") &&
-          elapsed() > options.seconds("build_schedule_outdated_delay")
-      }
+      else elapsed() > options.seconds("build_schedule_outdated_delay")
 
    def next(hostname: String, state: Build_Process.State): List[String] =
      for {
@@ -527,9 +576,9 @@
        if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
      } yield task.name
 
-    def exists_next(hostname: String, state: Build_Process.State): Boolean = 
+    def exists_next(hostname: String, state: Build_Process.State): Boolean =
      next(hostname, state).nonEmpty
-    
+
    def update(state: Build_Process.State): Schedule = {
      val start1 = Date.now()
 
@@ -632,7 +681,7 @@
 
    def schedule(state: Build_Process.State): Schedule = {
      def main(scheduler: Scheduler): Schedule = scheduler.schedule(state)
-      Par_List.map(main, schedulers).minBy(_.duration.ms)
+      Par_List.map(main, schedulers).minBy(_.durations.map(_.ms).sorted.reverse)
    }
  }
 
@@ -1063,7 +1112,7 @@
            fresh_build = build_context.fresh_build,
            store_heap = store_heap)._1
        case _ => false
-      } 
+      }
 
  override def next_jobs(state: Build_Process.State): List[String] =
    if (progress.stopped) state.next_ready.map(_.name)
@@ -1115,7 +1164,7 @@
            if _schedule.exists_next(host.name, _state)
          } build_send(Build_Schedule.private_data.channel_ready(host.name))
        }
-        while(!build_action()) {}
+        while (!build_action()) {}
      }
    }
    finally {
@@ -1282,7 +1331,7 @@
      val schedule1 =
        if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule
 
      if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
-      
+
      schedule1
    }
 
@@ -1352,8 +1401,22 @@
    server: SSH.Server
  ): Build_Process =
    if (!context.master) new Scheduled_Build_Process(context, progress, server)
-    else new Scheduler_Build_Process(context, progress, server) {
-      def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context)
+    else {
+      val schedule_file = context.build_options.string("build_schedule")
+      if (schedule_file.isEmpty) {
+        new Scheduler_Build_Process(context, progress, server) {
+          def init_scheduler(timing_data: Timing_Data): Scheduler =
+            scheduler(timing_data, context)
+        }
+      }
+      else {
+        val finished_schedule =
+          Schedule.read(Path.explode(schedule_file)).copy(build_uuid = context.build_uuid)
+        new Scheduler_Build_Process(context, progress, server) {
+          def init_scheduler(timing_data: Timing_Data): Scheduler =
+            (build_state: Build_Process.State) => finished_schedule
+        }
+      }
    }
 }
 
 object Build_Engine extends Build_Engine
@@ -1375,6 +1438,8 @@
    session_setup: (String, Session) => Unit = (_, _) => (),
    cache: Term.Cache = Term.Cache.make()
  ): Schedule = {
+    Build.build_process(options, build_cluster = true, remove_builds = true)
+
    val store = Build_Engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache)
    val log_store = Build_Log.store(options, cache = cache)
 
@@ -1389,7 +1454,6 @@
    val full_sessions =
      Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs,
        select_dirs = select_dirs, infos = infos, augment_options = augment_options)
-    val full_sessions_selection = full_sessions.imports_selection(selection)
 
    val build_deps =
      Sessions.deps(full_sessions.selection(selection), progress = progress,
@@ -1624,7 +1688,7 @@
    -H HOSTS     additional cluster host specifications of the form
                 NAMES:PARAMETERS (separated by commas)
    -N           cyclic shuffling of NUMA CPU nodes (performance tuning)
-    -O FILE      output file
+    -O FILE      output file (pdf or png for image, else yxml)
    -R           refer to requirements of selected sessions
    -X NAME      exclude sessions from group NAME and all descendants
    -a           select all sessions
@@ -1634,7 +1698,7 @@
    -v           verbose
    -x NAME      exclude session NAME and all descendants
 
-  Generate build graph for scheduling.
+  Generate build schedule, but do not run actual build.
 """,
          "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
          "B:" -> (arg => base_sessions += arg),
@@ -1672,7 +1736,12 @@
            numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
            build_hosts = build_hosts.toList)
 
-        if (!schedule.is_empty && output_file.nonEmpty)
-          write_schedule_graphic(schedule, output_file.get)
+        output_file match {
+          case Some(output_file) if !schedule.is_empty =>
+            if (File.is_pdf(output_file.file_name) || File.is_png(output_file.file_name))
+              write_schedule_graphic(schedule, output_file)
+            else Schedule.write(schedule, output_file)
+          case _ =>
+        }
      })
 }
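
Editorial usage sketch (not part of the changeset): the hunks above add a YXML file representation for schedules (Schedule.write / Schedule.read) and a "build_schedule" option that lets the master build process replay a pre-computed schedule instead of generating one. The Isabelle/Scala snippet below is a hypothetical illustration of the new round-trip; the object name Schedule_File_Example, the file name schedule.yxml, and the UUID string are invented for the example, and it assumes the Isabelle classpath is available (e.g. when run via "isabelle scala").

import isabelle._

object Schedule_File_Example {
  def main(args: Array[String]): Unit = {
    val file = Path.explode("schedule.yxml")

    // An empty schedule as produced by Schedule.init; a real one would come from
    // the scheduler or from the schedule-generating tool whose usage text is
    // updated in the changeset above.
    val schedule0 = Build_Schedule.Schedule.init(build_uuid = "example-uuid")

    // Write and re-read the schedule via the new YXML file representation; the
    // master build process reads such a file when the "build_schedule" option
    // points at it.
    Build_Schedule.Schedule.write(schedule0, file)
    val schedule1 = Build_Schedule.Schedule.read(file)

    // The round-trip preserves the schedule contents, e.g. its build_uuid.
    assert(schedule1.build_uuid == schedule0.build_uuid)
  }
}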