merged
author desharna
Sun, 17 Mar 2024 07:45:12 +0100
changeset 79918 87a04ce7e3c3
parent 79917 d0205dde00bb (current diff)
parent 79916 cfeb3a8f241d (diff)
child 79919 65e0682cca63
merged
--- a/etc/options	Sat Mar 16 09:05:17 2024 +0100
+++ b/etc/options	Sun Mar 17 07:45:12 2024 +0100
@@ -219,12 +219,12 @@
 option build_cluster_identifier : string = "build_cluster"
   -- "ISABELLE_IDENTIFIER for remote build cluster sessions"
 
+option build_schedule : string = ""
+  -- "path to pre-computed schedule"
+
 option build_schedule_outdated_delay : real = 300.0
   -- "delay schedule generation loop"
 
-option build_schedule_outdated_limit : int = 20
-  -- "maximum number of sessions for which schedule stays valid"
-
 
 section "Editor Session"
 
--- a/src/Pure/Build/build_schedule.scala	Sat Mar 16 09:05:17 2024 +0100
+++ b/src/Pure/Build/build_schedule.scala	Sun Mar 17 07:45:12 2024 +0100
@@ -7,8 +7,10 @@
 
 
 import Host.Node_Info
+
 import scala.annotation.tailrec
 import scala.collection.mutable
+import scala.Ordering.Implicits.seqOrdering
 
 
 object Build_Schedule {
@@ -101,7 +103,7 @@
       lazy val by_threads: Map[Int, Facet] = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
       lazy val by_hostname: Map[String, Facet] = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
 
-      def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+      def median_time: Time = Timing_Data.median_time(results.map(_.elapsed))
 
       def best_result: Result = results.minBy(_.elapsed.ms)
     }
@@ -120,7 +122,7 @@
         facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
           case (hostname, facet) =>
             val best_threads = facet.best_result.threads
-           facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+            facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
               inflection_point(best_threads, _))
         }
       (max_threads :: worse_threads).min
@@ -189,13 +191,13 @@
 
     private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
       def unify(hostname: String, facet: Timing_Data.Facet) =
-        facet.mean_time.scale(hostname_factor(hostname, on_host))
+        facet.median_time.scale(hostname_factor(hostname, on_host))
 
       for {
         facet <- facet.by_job.get(job_name).toList
         (threads, facet) <- facet.by_threads
         entries = facet.by_hostname.toList.map(unify)
-      } yield threads -> Timing_Data.median_time(entries)
+      } yield threads -> Timing_Data.mean_time(entries)
     }
 
     def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
@@ -203,8 +205,8 @@
         val entries =
           facet.by_threads.toList match {
             case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
-              (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
-            case entries => entries.map((threads, facet) => threads -> facet.mean_time)
+              (i, facet.median_time) :: result.proper_cpu.map(1 -> _).toList
+            case entries => entries.map((threads, facet) => threads -> facet.median_time)
           }
         if (entries.size < 2) None else Some(approximate_threads(entries, threads))
       }
@@ -212,7 +214,7 @@
       for {
         facet <- facet.by_job.get(job_name)
         facet <- facet.by_hostname.get(hostname)
-        time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
+        time <- facet.by_threads.get(threads).map(_.median_time).orElse(try_approximate(facet))
       } yield time
     }
 
@@ -245,10 +247,10 @@
     }
 
     private var cache: Map[(String, String, Int), Time] = Map.empty
-    
-    
+
+
     /* approximation factors -- penalize estimations with less information */
-    
+
     val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5
     val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7
     val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5
@@ -274,7 +276,7 @@
               else {
                 // no other job to estimate from, use global curve to approximate any other job
                 val (threads1, facet1) = facet.by_threads.head
-                facet1.mean_time.scale(global_threads_factor(threads1, threads))
+                facet1.median_time.scale(global_threads_factor(threads1, threads))
               }
             }
 
@@ -291,7 +293,7 @@
                       factor = hostname_factor(hostname1, hostname)
                     } yield estimate.scale(factor)
 
-                  if (approximated.nonEmpty) 
+                  if (approximated.nonEmpty)
                     Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE)
                   else {
                     // no single machine where config can be approximated, unify data points
@@ -310,10 +312,10 @@
                 }
 
               case Some(facet) => // time for job/thread exists, interpolate machine if necessary
-                facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
-                  Timing_Data.median_time(
+                facet.by_hostname.get(hostname).map(_.median_time).getOrElse {
+                  Timing_Data.mean_time(
                     facet.by_hostname.toList.map((hostname1, facet) =>
-                      facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+                      facet.median_time.scale(hostname_factor(hostname1, hostname)))).scale(
                     FACTOR_THREADS_OTHER_MACHINE)
                 }
             }
@@ -489,6 +491,55 @@
     type Graph = isabelle.Graph[String, Node]
 
     def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty)
+
+
+    /* file representation */
+
+    def write(value: Schedule, file: Path): Unit = {  // store schedule `value` in `file` as YXML text
+      import XML.Encode._
+
+      def time: T[Time] = (time => long(time.ms))  // Time is encoded as its millisecond count
+      def date: T[Date] = (date => time(date.time))  // Date is encoded through its `time` field
+      def node_info: T[Node_Info] =  // triple: hostname, optional numa_node, rel_cpus list
+        (node_info => triple(string, option(int), list(int))(
+          (node_info.hostname, node_info.numa_node, node_info.rel_cpus)))
+      def node: T[Node] =  // nested pairs: job_name, node_info, start, duration
+        (node => pair(string, pair(node_info, pair(date, time)))(
+          (node.job_name, (node.node_info, (node.start, node.duration)))))
+      def schedule: T[Schedule] =  // nested pairs: build_uuid, generator, start, graph, serial
+        (schedule =>
+          pair(string, pair(string, pair(date, pair(Graph.encode(string, node), long))))((
+            schedule.build_uuid, (schedule.generator, (schedule.start, (schedule.graph,
+            schedule.serial))))))
+
+      File.write(file, YXML.string_of_body(schedule(value)))  // encode to XML body, render as YXML
+    }
+
+    def read(file: Path): Schedule = {  // load a schedule from the YXML text stored in `file`
+      import XML.Decode._
+
+      def time: T[Time] = { body => Time.ms(long(body)) }  // decoded long is taken as milliseconds
+      def date: T[Date] = { body => Date(time(body)) }  // Date is rebuilt from a decoded Time
+      def node_info: T[Node_Info] =  // triple: hostname, optional numa_node, rel_cpus list
+        { body =>
+          val (hostname, numa_node, rel_cpus) = triple(string, option(int), list(int))(body)
+          Node_Info(hostname, numa_node, rel_cpus)
+        }
+      val node: T[Schedule.Node] =  // nested pairs: job_name, node_info, start, duration
+        { body =>
+          val (job_name, (info, (start, duration))) =
+            pair(string, pair(node_info, pair(date, time)))(body)
+          Node(job_name, info, start, duration)
+        }
+      def schedule: T[Schedule] =  // nested pairs: build_uuid, generator, start, graph, serial
+        { body =>
+          val (build_uuid, (generator, (start, (graph, serial)))) =
+            pair(string, pair(string, (pair(date, pair(Graph.decode(string, node), long)))))(body)
+          Schedule(build_uuid, generator, start, graph, serial)
+        }
+
+      schedule(YXML.parse_body(File.read(file)))  // read text, parse YXML body, then decode it
+    }
   }
 
   case class Schedule(
@@ -505,6 +556,7 @@
       else graph.maximals.map(graph.get_node).map(_.end).max(Date.Ordering)
 
     def duration: Time = end - start
+    def durations: List[Time] = graph.keys.map(graph.get_node(_).end - start)
     def message: String = "Estimated " + duration.message_hms + " build time with " + generator
 
     def deviation(other: Schedule): Time = Time.ms((end - other.end).ms.abs)
@@ -514,10 +566,7 @@
     def is_empty: Boolean = graph.is_empty
     def is_outdated(options: Options, state: Build_Process.State): Boolean =
       if (is_empty) true
-      else {
-        num_built(state) > options.int("build_schedule_outdated_limit") &&
-          elapsed() > options.seconds("build_schedule_outdated_delay")
-      }
+      else elapsed() > options.seconds("build_schedule_outdated_delay")
 
     def next(hostname: String, state: Build_Process.State): List[String] =
       for {
@@ -527,9 +576,9 @@
         if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
       } yield task.name
 
-    def exists_next(hostname: String, state: Build_Process.State): Boolean = 
+    def exists_next(hostname: String, state: Build_Process.State): Boolean =
       next(hostname, state).nonEmpty
-    
+
     def update(state: Build_Process.State): Schedule = {
       val start1 = Date.now()
 
@@ -632,7 +681,7 @@
 
     def schedule(state: Build_Process.State): Schedule = {
       def main(scheduler: Scheduler): Schedule = scheduler.schedule(state)
-      Par_List.map(main, schedulers).minBy(_.duration.ms)
+      Par_List.map(main, schedulers).minBy(_.durations.map(_.ms).sorted.reverse)
     }
   }
 
@@ -1063,7 +1112,7 @@
             fresh_build = build_context.fresh_build,
             store_heap = store_heap)._1
         case _ => false
-    }
+      }
 
     override def next_jobs(state: Build_Process.State): List[String] =
       if (progress.stopped) state.next_ready.map(_.name)
@@ -1115,7 +1164,7 @@
                 if _schedule.exists_next(host.name, _state)
               } build_send(Build_Schedule.private_data.channel_ready(host.name))
             }
-            while(!build_action()) {}
+            while (!build_action()) {}
           }
         }
         finally {
@@ -1282,7 +1331,7 @@
       val schedule1 =
         if (changed) schedule.copy(serial = old_schedule.next_serial) else schedule
       if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
-      
+
       schedule1
     }
 
@@ -1352,8 +1401,22 @@
       server: SSH.Server
     ): Build_Process =
       if (!context.master) new Scheduled_Build_Process(context, progress, server)
-      else new Scheduler_Build_Process(context, progress, server) {
-        def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context)
+      else {
+        val schedule_file = context.build_options.string("build_schedule")
+        if (schedule_file.isEmpty) {
+          new Scheduler_Build_Process(context, progress, server) {
+            def init_scheduler(timing_data: Timing_Data): Scheduler =
+              scheduler(timing_data, context)
+          }
+        }
+        else {
+          val finished_schedule =
+            Schedule.read(Path.explode(schedule_file)).copy(build_uuid = context.build_uuid)
+          new Scheduler_Build_Process(context, progress, server) {
+            def init_scheduler(timing_data: Timing_Data): Scheduler =
+              (build_state: Build_Process.State) => finished_schedule
+          }
+        }
       }
   }
   object Build_Engine extends Build_Engine
@@ -1375,6 +1438,8 @@
     session_setup: (String, Session) => Unit = (_, _) => (),
     cache: Term.Cache = Term.Cache.make()
   ): Schedule = {
+    Build.build_process(options, build_cluster = true, remove_builds = true)
+
     val store =
       Build_Engine.build_store(options, build_cluster = build_hosts.nonEmpty, cache = cache)
     val log_store = Build_Log.store(options, cache = cache)
@@ -1389,7 +1454,6 @@
       val full_sessions =
         Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs,
           select_dirs = select_dirs, infos = infos, augment_options = augment_options)
-      val full_sessions_selection = full_sessions.imports_selection(selection)
 
       val build_deps =
         Sessions.deps(full_sessions.selection(selection), progress = progress,
@@ -1624,7 +1688,7 @@
     -H HOSTS     additional cluster host specifications of the form
                  NAMES:PARAMETERS (separated by commas)
     -N           cyclic shuffling of NUMA CPU nodes (performance tuning)
-    -O FILE      output file
+    -O FILE      output file (pdf or png for image, else yxml)
     -R           refer to requirements of selected sessions
     -X NAME      exclude sessions from group NAME and all descendants
     -a           select all sessions
@@ -1634,7 +1698,7 @@
     -v           verbose
     -x NAME      exclude session NAME and all descendants
 
-  Generate build graph for scheduling.
+  Generate build schedule, but do not run actual build.
 """,
         "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
         "B:" -> (arg => base_sessions += arg),
@@ -1672,7 +1736,12 @@
           numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
           build_hosts = build_hosts.toList)
 
-      if (!schedule.is_empty && output_file.nonEmpty)
-        write_schedule_graphic(schedule, output_file.get)
+      output_file match {
+        case Some(output_file) if !schedule.is_empty =>
+          if (File.is_pdf(output_file.file_name) || File.is_png(output_file.file_name))
+            write_schedule_graphic(schedule, output_file)
+          else Schedule.write(schedule, output_file)
+        case _ =>
+      }
     })
 }