merged
author wenzelm
Fri, 08 Dec 2023 20:56:21 +0100
changeset 79218 8857975b99a9
parent 79194 a0e8efbcf18d (diff)
parent 79217 5073bbdfa2b8 (current diff)
child 79219 8b371e684acf
--- a/src/Pure/System/isabelle_tool.scala	Fri Dec 08 20:47:03 2023 +0100
+++ b/src/Pure/System/isabelle_tool.scala	Fri Dec 08 20:56:21 2023 +0100
@@ -125,6 +125,7 @@
   Build.isabelle_tool2,
   Build.isabelle_tool3,
   Build.isabelle_tool4,
+  Build_Schedule.isabelle_tool,
   CI_Build.isabelle_tool,
   Doc.isabelle_tool,
   Docker_Build.isabelle_tool,
--- a/src/Pure/Tools/build_schedule.scala	Fri Dec 08 20:47:03 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Fri Dec 08 20:56:21 2023 +0100
@@ -8,6 +8,7 @@
 
 import Host.Node_Info
 import scala.annotation.tailrec
+import scala.collection.mutable
 
 
 object Build_Schedule {
@@ -17,6 +18,7 @@
     def elapsed: Time = timing.elapsed
     def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
   }
+
   case class Timing_Entries(entries: List[Timing_Entry]) {
     require(entries.nonEmpty)
 
@@ -31,6 +33,66 @@
     def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
   }
 
+  object Timing_Data {
+    def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
+
+    def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
+
+    def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
+
+    private def dummy_entries(host: Host, host_factor: Double) = {
+      val baseline = Time.minutes(5).scale(host_factor)
+      val gc = Time.seconds(10).scale(host_factor)
+      List(
+        Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
+        Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
+    }
+
+    def make(
+      host_infos: Host_Infos,
+      build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
+    ): Timing_Data = {
+      val hosts = host_infos.hosts
+      val measurements =
+        for {
+          (meta_info, build_info) <- build_history
+          build_host = meta_info.get(Build_Log.Prop.build_host)
+          (job_name, session_info) <- build_info.sessions.toList
+          if build_info.finished_sessions.contains(job_name)
+          hostname <- session_info.hostname.orElse(build_host).toList
+          host <- hosts.find(_.info.hostname == hostname).toList
+          threads = session_info.threads.getOrElse(host.info.num_cpus)
+        } yield (job_name, hostname, threads) -> session_info.timing
+
+      val entries =
+        if (measurements.isEmpty) {
+          val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
+          host_infos.hosts.flatMap(host =>
+            dummy_entries(host, host_infos.host_factor(default_host, host)))
+        }
+        else
+          measurements.groupMap(_._1)(_._2).toList.map {
+            case ((job_name, hostname, threads), timings) =>
+              Timing_Entry(job_name, hostname, threads, median_timing(timings))
+          }
+
+      new Timing_Data(Timing_Entries(entries), host_infos)
+    }
+
+    def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
+      val build_history =
+        for {
+          log_name <- log_database.execute_query_statement(
+            Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)),
+            List.from[String], res => res.string(Build_Log.Column.log_name))
+          meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name)
+          build_info = Build_Log.private_data.read_build_info(log_database, log_name)
+        } yield (meta_info, build_info)
+
+      make(host_infos, build_history)
+    }
+  }
+
   class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
     private def inflection_point(last_mono: Int, next: Int): Int =
       last_mono + ((next - last_mono) / 2)
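
The relocated Timing_Data companion reduces repeated measurements for the same (job, host, threads) key to their median via median_timing: sort and index, which picks the upper middle element for even-sized samples. A minimal standalone sketch of that grouping-and-median scheme, with timings modeled as plain millisecond Longs rather than the actual Timing type (all names here are illustrative):

object MedianAggregation {
  type Key = (String, String, Int)  // (job_name, hostname, threads)

  // Same scheme as Timing_Data.median_timing: sort, take the upper middle.
  // Requires non-empty input, as Timing_Entries does.
  def median(obs: List[Long]): Long = obs.sorted.apply(obs.length / 2)

  def aggregate(measurements: List[(Key, Long)]): Map[Key, Long] =
    measurements.groupMap(_._1)(_._2).view.mapValues(median).toMap

  def main(args: Array[String]): Unit = {
    val key = ("HOL", "host1", 8)
    val obs = List(90L, 100L, 120L, 300L).map(t => key -> t)
    println(aggregate(obs))  // Map((HOL,host1,8) -> 120): the 300 outlier is ignored
  }
}
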
@@ -62,7 +124,7 @@
       def linear(p0: (Int, Time), p1: (Int, Time)): Time = {
         val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1))
         val b = p0._2 - a.scale(p0._1)
-        Time.ms((a.scale(threads) + b).ms max 0)
+        (a.scale(threads) + b) max Time.zero
       }
 
       val mono_prefix = sorted_prefix(entries, e => -e._2.ms)
@@ -82,7 +144,7 @@
         val t_c =
           Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n))
 
-        def model(threads: Int): Time = Time.ms((t_c + t_p.scale(1.0 / threads)).ms max 0)
+        def model(threads: Int): Time = (t_c + t_p.scale(1.0 / threads)) max Time.zero
 
         if (is_mono || in_prefix) model(threads)
         else {
@@ -164,109 +226,62 @@
       }
     }
 
-    def estimate(job_name: String, hostname: String, threads: Int): Time =
-      data.by_job.get(job_name) match {
-        case None =>
-          // no data for job, take average of other jobs for given threads
-          val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
-          if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
-          else {
-            // no other job to estimate from, use global curve to approximate any other job
-            val (threads1, data1) = data.by_threads.head
-            data1.mean_time.scale(global_threads_factor(threads1, threads))
-          }
+    private var cache: Map[(String, String, Int), Time] = Map.empty
+    def estimate(job_name: String, hostname: String, threads: Int): Time = {
+      def estimate: Time =
+        data.by_job.get(job_name) match {
+          case None =>
+            // no data for job, take average of other jobs for given threads
+            val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+            if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
+            else {
+              // no other job to estimate from, use global curve to approximate any other job
+              val (threads1, data1) = data.by_threads.head
+              data1.mean_time.scale(global_threads_factor(threads1, threads))
+            }
 
-        case Some(data) =>
-          data.by_threads.get(threads) match {
-            case None => // interpolate threads
-              estimate_threads(job_name, hostname, threads).getOrElse {
-                // per machine, try to approximate config for threads
-                val approximated =
-                  for {
-                    hostname1 <- data.by_hostname.keys
-                    estimate <- estimate_threads(job_name, hostname1, threads)
-                    factor = hostname_factor(hostname1, hostname)
-                  } yield estimate.scale(factor)
+          case Some(data) =>
+            data.by_threads.get(threads) match {
+              case None => // interpolate threads
+                estimate_threads(job_name, hostname, threads).getOrElse {
+                  // per machine, try to approximate config for threads
+                  val approximated =
+                    for {
+                      hostname1 <- data.by_hostname.keys
+                      estimate <- estimate_threads(job_name, hostname1, threads)
+                      factor = hostname_factor(hostname1, hostname)
+                    } yield estimate.scale(factor)
 
-                if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
-                else {
-                  // no single machine where config can be approximated, unify data points
-                  val unified_entries = unify_hosts(job_name, hostname)
+                  if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+                  else {
+                    // no single machine where config can be approximated, unify data points
+                    val unified_entries = unify_hosts(job_name, hostname)
 
-                  if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
-                  else {
-                    // only single data point, use global curve to approximate
-                    val (job_threads, job_time) = unified_entries.head
-                    job_time.scale(global_threads_factor(job_threads, threads))
+                    if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
+                    else {
+                      // only single data point, use global curve to approximate
+                      val (job_threads, job_time) = unified_entries.head
+                      job_time.scale(global_threads_factor(job_threads, threads))
+                    }
                   }
                 }
-              }
-
-            case Some(data) => // time for job/thread exists, interpolate machine
-              data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
-                Timing_Data.median_time(
-                  data.by_hostname.toList.map((hostname1, data) =>
-                    data.mean_time.scale(hostname_factor(hostname1, hostname))))
-              }
-          }
-      }
-  }
-
-  object Timing_Data {
-    def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
-    def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
-    def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
-
-    private def dummy_entries(host: Host, host_factor: Double) = {
-      val baseline = Time.minutes(5).scale(host_factor)
-      val gc = Time.seconds(10).scale(host_factor)
-      List(
-        Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
-        Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
-    }
 
-    def make(
-      host_infos: Host_Infos,
-      build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
-    ): Timing_Data = {
-      val hosts = host_infos.hosts
-      val measurements =
-        for {
-          (meta_info, build_info) <- build_history
-          build_host = meta_info.get(Build_Log.Prop.build_host)
-          (job_name, session_info) <- build_info.sessions.toList
-          if build_info.finished_sessions.contains(job_name)
-          hostname <- session_info.hostname.orElse(build_host).toList
-          host <- hosts.find(_.info.hostname == hostname).toList
-          threads = session_info.threads.getOrElse(host.info.num_cpus)
-        } yield (job_name, hostname, threads) -> session_info.timing
+              case Some(data) => // time for job/thread exists, interpolate machine
+                data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+                  Timing_Data.median_time(
+                    data.by_hostname.toList.map((hostname1, data) =>
+                      data.mean_time.scale(hostname_factor(hostname1, hostname))))
+                }
+            }
+        }
 
-      val entries =
-        if (measurements.isEmpty) {
-          val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
-          host_infos.hosts.flatMap(host =>
-            dummy_entries(host, host_infos.host_factor(default_host, host)))
-        }
-        else
-          measurements.groupMap(_._1)(_._2).toList.map {
-            case ((job_name, hostname, threads), timings) =>
-              Timing_Entry(job_name, hostname, threads, median_timing(timings))
-          }
-
-      new Timing_Data(Timing_Entries(entries), host_infos)
-    }
-
-    def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
-      val build_history =
-        for {
-          log_name <- log_database.execute_query_statement(
-            Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)),
-            List.from[String], res => res.string(Build_Log.Column.log_name))
-          meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name)
-          build_info = Build_Log.private_data.read_build_info(log_database, log_name)
-        } yield (meta_info, build_info)
-
-      make(host_infos, build_history)
+      cache.get(job_name, hostname, threads) match {
+        case Some(time) => time
+        case None =>
+          val time = estimate
+          cache = cache + ((job_name, hostname, threads) -> time)
+          time
+      }
     }
   }
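
The restructured estimate wraps the old case analysis in simple memoization: results are cached in an immutable Map held in a var, keyed by (job_name, hostname, threads), so repeated schedule simulations do not redo the interpolation. A generic sketch of that idiom with a stand-in compute function, not the actual estimator (a concurrent lost update would merely recompute, since the computation is pure):

object Memoized {
  final class Memo[K, V](compute: K => V) {
    private var cache: Map[K, V] = Map.empty  // immutable map in a var, as above

    def apply(key: K): V =
      cache.get(key) match {
        case Some(value) => value
        case None =>
          val value = compute(key)
          cache = cache + (key -> value)
          value
      }
  }

  def main(args: Array[String]): Unit = {
    val estimate = new Memo[(String, String, Int), Long]({ case (job, host, threads) =>
      println(s"computing $job on $host with $threads threads")  // printed once per key
      job.length.toLong * threads
    })
    println(estimate(("HOL", "host1", 8)))  // computes: 24
    println(estimate(("HOL", "host1", 8)))  // served from the cache: 24
  }
}
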
 
@@ -324,10 +339,6 @@
 
   /* offline tracking of job configurations and resource allocations */
 
-  object Config {
-    def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
-  }
-
   case class Config(job_name: String, node_info: Node_Info) {
     def job_of(start_time: Time): Build_Process.Job =
       Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
@@ -420,15 +431,66 @@
     }
 
     type Graph = isabelle.Graph[String, Node]
+
+    def init(build_uuid: String): Schedule = Schedule(build_uuid, "none", Date.now(), Graph.empty)
   }
 
-  case class Schedule(generator: String, start: Date, graph: Schedule.Graph) {
+  case class Schedule(
+    build_uuid: String,
+    generator: String,
+    start: Date,
+    graph: Schedule.Graph,
+    serial: Long = 0,
+  ) {
+    require(serial >= 0, "serial underflow")
+    def inc_serial: Schedule = {
+      require(serial < Long.MaxValue, "serial overflow")
+      copy(serial = serial + 1)
+    }
+
     def end: Date =
       if (graph.is_empty) start
       else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch)
 
     def duration: Time = end.time - start.time
     def message: String = "Estimated " + duration.message_hms + " build time with " + generator
+
+    def deviation(other: Schedule): Time = Time.ms((end.time - other.end.time).ms.abs)
+
+    def num_built(state: Build_Process.State): Int = graph.keys.count(state.results.contains)
+    def elapsed(): Time = Time.now() - start.time
+    def is_outdated(state: Build_Process.State, time_limit: Time, built_limit: Int): Boolean =
+      graph.is_empty || (elapsed() > time_limit && num_built(state) > built_limit)
+
+    def next(hostname: String, state: Build_Process.State): List[String] =
+      for {
+        task <- state.next_ready
+        node = graph.get_node(task.name)
+        if hostname == node.node_info.hostname
+        if graph.imm_preds(node.job_name).subsetOf(state.results.keySet)
+      } yield task.name
+
+    def update(state: Build_Process.State): Schedule = {
+      val start1 = Date.now()
+      val pending = state.pending.map(_.name).toSet
+
+      def shift_elapsed(graph: Schedule.Graph, name: String): Schedule.Graph =
+        graph.map_node(name, { node =>
+          val elapsed = start1.time - state.running(name).start_date.time
+          node.copy(duration = node.duration - elapsed)
+        })
+
+      def shift_starts(graph: Schedule.Graph, name: String): Schedule.Graph =
+        graph.map_node(name, { node =>
+          val starts = start1 :: graph.imm_preds(node.job_name).toList.map(graph.get_node(_).end)
+          node.copy(start = starts.max(Date.Ordering))
+        })
+
+      val graph0 = state.running.keys.foldLeft(graph.restrict(pending.contains))(shift_elapsed)
+      val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts)
+
+      copy(start = start1, graph = graph1)
+    }
   }
 
   case class State(build_state: Build_Process.State, current_time: Time, finished: Schedule) {
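
Schedule.update above rebases a schedule on the present: restrict the graph to still-pending jobs, shorten running nodes by the time they have already spent, then recompute every start in topological order as the maximum of "now" and all predecessor end times. A self-contained sketch of that final fold over a plain predecessor map, with times as millisecond Longs instead of Date/Time (names illustrative):

object ShiftStarts {
  final case class Node(start: Long, duration: Long) { def end: Long = start + duration }

  // preds: immediate predecessors per job; order: any topological order of the jobs.
  def shift(
    now: Long,
    order: List[String],
    preds: Map[String, List[String]],
    nodes: Map[String, Node]
  ): Map[String, Node] =
    order.foldLeft(nodes) { (acc, name) =>
      val earliest = (now :: preds(name).map(acc(_).end)).max
      acc.updated(name, acc(name).copy(start = earliest))
    }

  def main(args: Array[String]): Unit = {
    val nodes = Map("a" -> Node(0L, 10L), "b" -> Node(5L, 20L))
    val preds = Map("a" -> Nil, "b" -> List("a"))
    // "b" was planned at 5 but cannot start before "a" ends at now + 10.
    println(shift(100L, List("a", "b"), preds, nodes))
    // Map(a -> Node(100,10), b -> Node(110,20))
  }
}
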
@@ -453,10 +515,16 @@
         val now = current_time + elapsed
         val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)
 
-        val preds =
+        val host_preds =
+          for {
+            (name, (node, _)) <- finished.graph.iterator.toSet
+            if node.node_info.hostname == job.node_info.hostname
+          } yield name
+        val build_preds =
           build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined)
-        val graph =
-          preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))
+        val preds = build_preds ++ host_preds
+
+        val graph = preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))
 
         val build_state1 = build_state.remove_running(job.name).remove_pending(job.name)
         State(build_state1, now, finished.copy(graph = graph))
@@ -466,15 +534,15 @@
     def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
   }
 
-  trait Scheduler {
-    def next(state: Build_Process.State): List[Config]
-    def build_schedule(build_state: Build_Process.State): Schedule
-  }
+  trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule }
 
-  abstract class Heuristic(timing_data: Timing_Data) extends Scheduler {
+  abstract class Heuristic(timing_data: Timing_Data, build_uuid: String)
+    extends Scheduler {
     val host_infos = timing_data.host_infos
     val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
 
+    def next(state: Build_Process.State): List[Config]
+
     def build_schedule(build_state: Build_Process.State): Schedule = {
       @tailrec
       def simulate(state: State): State =
@@ -486,21 +554,21 @@
 
       val start = Date.now()
       val end_state =
-        simulate(State(build_state, start.time, Schedule(toString, start, Graph.empty)))
+        simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty)))
 
       end_state.finished
     }
   }
 
-  class Default_Heuristic(timing_data: Timing_Data, options: Options)
-    extends Heuristic(timing_data) {
+  class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String)
+    extends Heuristic(timing_data, build_uuid) {
     override def toString: String = "default build heuristic"
 
     def host_threads(host: Host): Int = {
       val m = (options ++ host.build.options).int("threads")
       if (m > 0) m else (host.num_cpus max 1) min 8
     }
-    
+
     def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
       sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
 
@@ -535,8 +603,9 @@
   abstract class Path_Heuristic(
     timing_data: Timing_Data,
     sessions_structure: Sessions.Structure,
-    max_threads_limit: Int
-  ) extends Heuristic(timing_data) {
+    max_threads_limit: Int,
+    build_uuid: String
+  ) extends Heuristic(timing_data, build_uuid) {
     /* pre-computed properties for efficient heuristic */
 
     val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
@@ -570,7 +639,7 @@
     def path_max_times(minimals: List[Node]): Map[Node, Time] =
       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
 
-    def parallel_paths(minimals: List[Node], pred: Node => Boolean = _ => true): Int = {
+    def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
       def start(node: Node): (Node, Time) = node -> best_times(node)
 
       def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
@@ -594,40 +663,59 @@
           (res max running.size, running2)
         }
 
-      parallel_paths(minimals.map(start).toMap)._1
+      parallel_paths(running.toMap)._1
     }
   }
 
 
   object Path_Time_Heuristic {
-    sealed trait Criterion
-    case class Absolute_Time(time: Time) extends Criterion {
+    sealed trait Critical_Criterion
+    case class Absolute_Time(time: Time) extends Critical_Criterion {
       override def toString: String = "absolute time (" + time.message_hms + ")"
     }
-    case class Relative_Time(factor: Double) extends Criterion {
+    case class Relative_Time(factor: Double) extends Critical_Criterion {
       override def toString: String = "relative time (" + factor + ")"
     }
 
-    sealed trait Strategy
-    case class Fixed_Thread(threads: Int) extends Strategy {
+    sealed trait Parallel_Strategy
+    case class Fixed_Thread(threads: Int) extends Parallel_Strategy {
       override def toString: String = "fixed threads (" + threads + ")"
     }
-    case class Time_Based_Threads(f: Time => Int) extends Strategy {
+    case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy {
       override def toString: String = "time based threads"
     }
+
+    sealed trait Host_Criterion
+    case object Critical_Nodes extends Host_Criterion {
+      override def toString: String = "per critical node"
+    }
+    case class Fixed_Fraction(fraction: Double) extends Host_Criterion {
+      override def toString: String = "fixed fraction (" + fraction + ")"
+    }
+    case class Host_Speed(min_factor: Double) extends Host_Criterion {
+      override def toString: String = "host speed (" + min_factor + ")"
+    }
   }
 
   class Path_Time_Heuristic(
-    is_critical: Path_Time_Heuristic.Criterion,
-    parallel_threads: Path_Time_Heuristic.Strategy,
+    is_critical: Path_Time_Heuristic.Critical_Criterion,
+    parallel_threads: Path_Time_Heuristic.Parallel_Strategy,
+    host_criterion: Path_Time_Heuristic.Host_Criterion,
     timing_data: Timing_Data,
     sessions_structure: Sessions.Structure,
+    build_uuid: String,
     max_threads_limit: Int = 8
-  ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) {
+  ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) {
     import Path_Time_Heuristic.*
 
-    override def toString: Node =
-      "path time heuristic (critical: " + is_critical + ", parallel: " + parallel_threads + ")"
+    override def toString: Node = {
+      val params =
+        List(
+          "critical: " + is_critical,
+          "parallel: " + parallel_threads,
+          "fast hosts: " + host_criterion)
+      "path time heuristic (" + params.mkString(", ") + ")"
+    }
 
     def next(state: Build_Process.State): List[Config] = {
       val resources = host_infos.available(state)
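
The new Host_Criterion variants above decide how many of the fastest hosts are reserved for critical-path sessions: enough hosts to cover the critical paths (Critical_Nodes), a fixed fraction of the cluster, or every host whose benchmark score is within a factor of the best (Host_Speed). A small sketch of the Host_Speed count on a fastest-first host list; the 0.9 factor matches the default wired into Engine.scheduler further below, while the host data here is illustrative:

object HostSpeedDemo {
  final case class Host(name: String, benchmark_score: Option[Double])

  // Count hosts whose score is within min_factor of the best one,
  // assuming the list is already sorted fastest-first.
  def num_fast(sorted: List[Host], min_factor: Double): Int = {
    val best = sorted.head.benchmark_score.get
    sorted.count(_.benchmark_score.exists(_ >= best * min_factor))
  }

  def main(args: Array[String]): Unit = {
    val hosts = List(
      Host("h1", Some(100.0)), Host("h2", Some(95.0)), Host("h3", Some(60.0)))
    println(num_fast(hosts, 0.9))  // 2: h1 and h2 are within 90% of the best score
  }
}
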
@@ -637,13 +725,25 @@
 
       val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
 
-      val resources0 = host_infos.available(state.copy(running = Map.empty))
-      val max_parallel = parallel_paths(state.ready.map(_.name))
-      val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length
+      val available_nodes =
+        host_infos.available(state.copy(running = Map.empty))
+          .unused_nodes(max_threads)
+          .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse
 
+      def remaining_time(node: Node): (Node, Time) =
+        state.running.get(node) match {
+          case None => node -> best_time(node)
+          case Some(job) =>
+            val estimate =
+              timing_data.estimate(job.name, job.node_info.hostname,
+                host_infos.num_threads(job.node_info))
+            node -> ((estimate - (Time.now() - job.start_date.time)) max Time.zero)
+        }
+
+      val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
       val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
 
-      if (fully_parallelizable) {
+      if (max_parallel <= available_nodes.length) {
         val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
         resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
       }
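
With running jobs now feeding parallel_paths, remaining_time is the linchpin: for a waiting node it is the best estimate, for a running job the estimate minus the time already spent, clamped at zero once a job is overdue. The same clamp in plain millisecond arithmetic (a standalone sketch, not the actual Time type):

object RemainingTime {
  // Estimate minus elapsed, never negative: overdue jobs count as "done any moment".
  def remaining(now_ms: Long, start_ms: Long, estimate_ms: Long): Long =
    (estimate_ms - (now_ms - start_ms)) max 0L

  def main(args: Array[String]): Unit = {
    println(remaining(now_ms = 1000L, start_ms = 800L, estimate_ms = 600L))  // 400
    println(remaining(now_ms = 1000L, start_ms = 0L, estimate_ms = 600L))    // 0 (overdue)
  }
}
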
@@ -670,8 +770,24 @@
 
         val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
 
-        val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains)
-        val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel)
+        val max_critical_parallel =
+          parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
+        val max_critical_hosts =
+          available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length
+
+        val split =
+          this.host_criterion match {
+            case Critical_Nodes => max_critical_hosts
+            case Fixed_Fraction(fraction) =>
+              ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
+            case Host_Speed(min_factor) =>
+              val best = rev_ordered_hosts.head._1.info.benchmark_score.get
+              val num_fast =
+                rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor))
+              num_fast min max_critical_hosts
+          }
+
+        val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)
 
         val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
         val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)
@@ -704,12 +820,49 @@
       }
       catch { case exn: Throwable => close(); throw exn }
 
+    private val _build_database: Option[SQL.Database] =
+      try {
+        for (db <- store.maybe_open_build_database(server = server)) yield {
+          if (build_context.master) {
+            Build_Schedule.private_data.transaction_lock(
+              db,
+              create = true,
+              label = "Build_Schedule.build_database"
+            ) { Build_Schedule.private_data.clean_build_schedules(db) }
+            db.vacuum(Build_Schedule.private_data.tables.list)
+          }
+          db
+        }
+      }
+      catch { case exn: Throwable => close(); throw exn }
+
     override def close(): Unit = {
       super.close()
       Option(_log_database).foreach(_.close())
+      Option(_build_database).flatten.foreach(_.close())
     }
 
 
+    /* global state: internal var vs. external database */
+
+    private var _schedule = Schedule.init(build_uuid)
+
+    override protected def synchronized_database[A](label: String)(body: => A): A =
+      super.synchronized_database(label) {
+        _build_database match {
+          case None => body
+          case Some(db) =>
+            Build_Schedule.private_data.transaction_lock(db, label = label) {
+              val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule)
+              _schedule = old_schedule
+              val res = body
+              _schedule = Build_Schedule.private_data.update_schedule(db, _schedule, old_schedule)
+              res
+            }
+        }
+      }
+
+
     /* previous results via build log */
 
     override def open_build_cluster(): Build_Cluster = {
@@ -790,21 +943,8 @@
 
     /* build process */
 
-    case class Cache(state: Build_Process.State, configs: List[Config], estimate: Date) {
-      def is_current(state: Build_Process.State): Boolean =
-        this.state.pending.nonEmpty && this.state.results == state.results
-      def is_current_estimate(estimate: Date): Boolean =
-        Math.abs((this.estimate.time - estimate.time).ms) < Time.minutes(1).ms
-    }
-
-    private var cache = Cache(Build_Process.State(), Nil, Date.now())
-
-    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = {
-      val configs =
-        if (cache.is_current(state)) cache.configs
-        else scheduler.next(state)
-      configs.find(_.job_name == session_name).get.node_info
-    }
+    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info =
+      _schedule.graph.get_node(session_name).node_info
 
     def is_current(state: Build_Process.State, session_name: String): Boolean =
       state.ancestor_results(session_name) match {
@@ -829,30 +969,31 @@
         case _ => false
     }
 
-    override def next_jobs(state: Build_Process.State): List[String] = {
-      val finalize_limit = if (build_context.master) Int.MaxValue else 0
-
-      if (progress.stopped) state.next_ready.map(_.name).take(finalize_limit)
-      else if (cache.is_current(state)) cache.configs.map(_.job_name).filterNot(state.is_running)
+    override def next_jobs(state: Build_Process.State): List[String] =
+      if (!progress.stopped && !_schedule.is_outdated(state, Time.minutes(3), 10))
+        _schedule.next(hostname, state)
+      else if (!build_context.master) Nil
+      else if (progress.stopped) state.next_ready.map(_.name)
       else {
         val current = state.next_ready.filter(task => is_current(state, task.name))
-        if (current.nonEmpty) current.map(_.name).take(finalize_limit)
+        if (current.nonEmpty) current.map(_.name)
         else {
           val start = Time.now()
-          val next = scheduler.next(state)
-          val schedule = scheduler.build_schedule(state)
+
+          val new_schedule = scheduler.build_schedule(state).update(state)
+          val schedule =
+            if (_schedule.graph.is_empty) new_schedule
+            else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
+
           val elapsed = Time.now() - start
 
           val timing_msg = if (elapsed.is_relevant) " (took " + elapsed.message + ")" else ""
-          progress.echo_if(build_context.master && !cache.is_current_estimate(schedule.end),
-            schedule.message + timing_msg)
+          progress.echo_if(_schedule.deviation(schedule).minutes > 1, schedule.message + timing_msg)
 
-          val configs = next.filter(_.node_info.hostname == hostname)
-          cache = Cache(state, configs, schedule.end)
-          configs.map(_.job_name)
+          _schedule = schedule
+          _schedule.next(hostname, state)
         }
       }
-    }
 
     override def run(): Build.Results = {
       val results = super.run()
@@ -861,6 +1002,167 @@
     }
   }
 
+
+  /** SQL data model of build schedule, extending isabelle_build database */
+
+  object private_data extends SQL.Data("isabelle_build") {
+    import Build_Process.private_data.{Base, Generic}
+
+
+    /* schedule */
+
+    object Schedules {
+      val build_uuid = Generic.build_uuid.make_primary_key
+      val generator = SQL.Column.string("generator")
+      val start = SQL.Column.date("start")
+      val serial = SQL.Column.long("serial")
+
+      val table = make_table(List(build_uuid, generator, start, serial), name = "schedules")
+    }
+
+    def read_serial(db: SQL.Database, build_uuid: String = ""): Long =
+      db.execute_query_statementO[Long](
+        Schedules.table.select(List(Schedules.serial.max), sql =
+          SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+        _.long(Schedules.serial)).getOrElse(0L)
+
+    def read_scheduled_builds_domain(db: SQL.Database): List[String] =
+      db.execute_query_statement(
+        Schedules.table.select(List(Schedules.build_uuid)),
+        List.from[String], res => res.string(Schedules.build_uuid))
+
+    def read_schedules(db: SQL.Database, build_uuid: String = ""): List[Schedule] = {
+      val schedules =
+        db.execute_query_statement(Schedules.table.select(sql =
+          SQL.where(if_proper(build_uuid, Schedules.build_uuid.equal(build_uuid)))),
+          List.from[Schedule],
+          { res =>
+            val build_uuid = res.string(Schedules.build_uuid)
+            val generator = res.string(Schedules.generator)
+            val start = res.date(Schedules.start)
+            val serial = res.long(Schedules.serial)
+            Schedule(build_uuid, generator, start, Graph.empty, serial = serial)
+          })
+
+      for (schedule <- schedules.sortBy(_.start)(Date.Ordering)) yield {
+        val nodes = private_data.read_nodes(db, build_uuid = schedule.build_uuid)
+        schedule.copy(graph = Graph.make(nodes))
+      }
+    }
+
+    def write_schedule(db: SQL.Database, schedule: Schedule): Unit = {
+      db.execute_statement(
+        Schedules.table.delete(Schedules.build_uuid.where_equal(schedule.build_uuid)))
+      db.execute_statement(Schedules.table.insert(), { stmt =>
+        stmt.string(1) = schedule.build_uuid
+        stmt.string(2) = schedule.generator
+        stmt.date(3) = schedule.start
+        stmt.long(4) = schedule.serial
+      })
+      update_nodes(db, schedule.build_uuid, schedule.graph.dest)
+    }
+
+
+    /* nodes */
+
+    object Nodes {
+      val build_uuid = Generic.build_uuid.make_primary_key
+      val name = Generic.name.make_primary_key
+      val succs = SQL.Column.string("succs")
+      val hostname = SQL.Column.string("hostname")
+      val numa_node = SQL.Column.int("numa_node")
+      val rel_cpus = SQL.Column.string("rel_cpus")
+      val start = SQL.Column.date("start")
+      val duration = SQL.Column.long("duration")
+
+      val table =
+        make_table(
+          List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration),
+          name = "schedule_nodes")
+    }
+
+    type Nodes = List[((String, Schedule.Node), List[String])]
+
+    def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = {
+      db.execute_query_statement(
+        Nodes.table.select(sql =
+          SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))),
+        List.from[((String, Schedule.Node), List[String])],
+        { res =>
+          val name = res.string(Nodes.name)
+          val succs = split_lines(res.string(Nodes.succs))
+          val hostname = res.string(Nodes.hostname)
+          val numa_node = res.get_int(Nodes.numa_node)
+          val rel_cpus = res.string(Nodes.rel_cpus)
+          val start = res.date(Nodes.start)
+          val duration = Time.ms(res.long(Nodes.duration))
+
+          val node_info = Node_Info(hostname, numa_node, isabelle.Host.Range.from(rel_cpus))
+          ((name, Schedule.Node(name, node_info, start, duration)), succs)
+        }
+      )
+    }
+
+    def update_nodes(db: SQL.Database, build_uuid: String, nodes: Nodes): Unit = {
+      db.execute_statement(Nodes.table.delete(Nodes.build_uuid.where_equal(build_uuid)))
+      db.execute_batch_statement(Nodes.table.insert(), batch =
+        for (((name, node), succs) <- nodes) yield { (stmt: SQL.Statement) =>
+          stmt.string(1) = build_uuid
+          stmt.string(2) = name
+          stmt.string(3) = cat_lines(succs)
+          stmt.string(4) = node.node_info.hostname
+          stmt.int(5) = node.node_info.numa_node
+          stmt.string(6) = isabelle.Host.Range(node.node_info.rel_cpus)
+          stmt.date(7) = node.start
+          stmt.long(8) = node.duration.ms
+        })
+    }
+
+    def pull_schedule(db: SQL.Database, old_schedule: Schedule): Build_Schedule.Schedule = {
+      val serial_db = read_serial(db)
+      if (serial_db == old_schedule.serial) old_schedule
+      else {
+        read_schedules(db, old_schedule.build_uuid) match {
+          case Nil => old_schedule
+          case schedules => Library.the_single(schedules)
+        }
+      }
+    }
+
+    def update_schedule(db: SQL.Database, schedule: Schedule, old_schedule: Schedule): Schedule = {
+      val changed =
+        schedule.generator != old_schedule.generator ||
+        schedule.start != old_schedule.start ||
+        schedule.graph != old_schedule.graph
+
+      val schedule1 =
+        if (changed) schedule.copy(serial = old_schedule.serial).inc_serial else schedule
+      if (schedule1.serial != schedule.serial) write_schedule(db, schedule1)
+
+      schedule1
+    }
+
+    def remove_schedules(db: SQL.Database, remove: List[String]): Unit =
+      if (remove.nonEmpty) {
+        val sql = Generic.build_uuid.where_member(remove)
+        db.execute_statement(SQL.MULTI(tables.map(_.delete(sql = sql))))
+      }
+
+    def clean_build_schedules(db: SQL.Database): Unit = {
+      val running_builds_domain =
+        db.execute_query_statement(
+          Base.table.select(List(Base.build_uuid), sql = SQL.where(Base.stop.undefined)),
+          List.from[String], res => res.string(Base.build_uuid))
+
+      val (remove, _) =
+        Library.symmetric_difference(read_scheduled_builds_domain(db), running_builds_domain)
+
+      remove_schedules(db, remove)
+    }
+
+    override val tables = SQL.Tables(Schedules.table, Nodes.table)
+  }
+
+
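
pull_schedule and update_schedule above implement the serial handshake used by synchronized_database: inside one transaction the in-memory schedule is replaced by the database copy whenever the stored serial has moved on, the body runs, and a changed result is written back under an incremented serial. A minimal sketch of that cycle against an in-memory stand-in for the database (not the actual SQL layer):

object SerialSync {
  final case class Versioned[A](serial: Long, value: A)

  final class Store[A](init: Versioned[A]) {
    private var current = init
    def read: Versioned[A] = synchronized(current)
    def write(v: Versioned[A]): Unit = synchronized { current = v }
  }

  // Mirror of the pull/body/update cycle in synchronized_database.
  def transaction[A](store: Store[A], local: Versioned[A])(body: A => A): Versioned[A] = {
    val pulled = {
      val db = store.read
      if (db.serial == local.serial) local else db  // pull newer external state
    }
    val result = body(pulled.value)
    if (result == pulled.value) pulled               // unchanged: nothing to write
    else {
      val updated = Versioned(pulled.serial + 1, result)  // changed: bump serial
      store.write(updated)
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new Store(Versioned(0L, "init"))
    val v1 = transaction(store, Versioned(0L, "init"))(_ => "updated")
    println(v1)          // Versioned(1,updated)
    println(store.read)  // Versioned(1,updated)
  }
}
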
   class Engine extends Build.Engine("build_schedule") {
 
     def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {
@@ -880,15 +1182,23 @@
             case time if time < Time.minutes(5) => 4
             case _ => 8
           }))
+      val machine_splits =
+        List(
+          Path_Time_Heuristic.Critical_Nodes,
+          Path_Time_Heuristic.Fixed_Fraction(0.3),
+          Path_Time_Heuristic.Host_Speed(0.9))
 
       val path_time_heuristics =
         for {
           is_critical <- is_criticals
           parallel <- parallel_threads
-        } yield Path_Time_Heuristic(is_critical, parallel, timing_data, sessions_structure)
-      val heuristics = Default_Heuristic(timing_data, context.build_options) :: path_time_heuristics
-
-      new Meta_Heuristic(heuristics)
+          machine_split <- machine_splits
+        } yield
+          Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure,
+            context.build_uuid)
+      val default_heuristic =
+        Default_Heuristic(timing_data, context.build_options, context.build_uuid)
+      new Meta_Heuristic(default_heuristic :: path_time_heuristics)
     }
 
     override def open_build_process(
@@ -985,4 +1295,239 @@
       }
     }
   }
+
+  def write_schedule_graphic(schedule: Schedule, output: Path): Unit = {
+    import java.awt.geom.{GeneralPath, Rectangle2D}
+    import java.awt.{BasicStroke, Color, Graphics2D}
+
+    val line_height = isabelle.graphview.Metrics.default.height
+    val char_width = isabelle.graphview.Metrics.default.char_width
+    val padding = isabelle.graphview.Metrics.default.space_width
+    val gap = isabelle.graphview.Metrics.default.gap
+
+    val graph = schedule.graph
+
+    def text_width(text: String): Double = text.length * char_width
+
+    val generator_height = line_height + padding
+    val hostname_height = generator_height + line_height + padding
+    def time_height(time: Time): Double = time.seconds
+    def date_height(date: Date): Double = time_height(date.time - schedule.start.time)
+
+    val hosts = graph.iterator.map(_._2._1).toList.groupBy(_.node_info.hostname)
+
+    def node_width(node: Schedule.Node): Double = 2 * padding + text_width(node.job_name)
+
+    case class Range(start: Double, stop: Double) {
+      def proper: List[Range] = if (start < stop) List(this) else Nil
+      def width: Double = stop - start
+    }
+
+    val rel_node_ranges =
+      hosts.toList.flatMap { (hostname, nodes) =>
+        val sorted = nodes.sortBy(node => (node.start.time.ms, node.end.time.ms, node.job_name))
+        sorted.foldLeft((List.empty[Schedule.Node], Map.empty[Schedule.Node, Range])) {
+          case ((nodes, allocated), node) =>
+            val width = node_width(node) + padding
+            val parallel = nodes.filter(_.end.time > node.start.time)
+            val (last, slots) =
+              parallel.sortBy(allocated(_).start).foldLeft((0D, List.empty[Range])) {
+                case ((start, ranges), node1) =>
+                  val node_range = allocated(node1)
+                  (node_range.stop, ranges ::: Range(start, node_range.start).proper)
+              }
+            val start =
+              (Range(last, Double.MaxValue) :: slots.filter(_.width >= width)).minBy(_.width).start
+            (node :: parallel, allocated + (node -> Range(start, start + width)))
+        }._2
+      }.toMap
+
+    def host_width(hostname: String) =
+      2 * padding + (hosts(hostname).map(rel_node_ranges(_).stop).max max text_width(hostname))
+
+    def graph_height(graph: Graph[String, Schedule.Node]): Double =
+      date_height(graph.maximals.map(graph.get_node(_).end).maxBy(_.unix_epoch))
+
+    val height = (hostname_height + 2 * padding + graph_height(graph)).ceil.toInt
+    val (last, host_starts) =
+      hosts.keys.foldLeft((0D, Map.empty[String, Double])) {
+        case ((previous, starts), hostname) =>
+          (previous + gap + host_width(hostname), starts + (hostname -> previous))
+      }
+    val width = (last - gap).ceil.toInt
+
+    def node_start(node: Schedule.Node): Double =
+      host_starts(node.node_info.hostname) + padding + rel_node_ranges(node).start
+
+    def paint(gfx: Graphics2D): Unit = {
+      gfx.setColor(Color.LIGHT_GRAY)
+      gfx.fillRect(0, 0, width, height)
+      gfx.setRenderingHints(isabelle.graphview.Metrics.rendering_hints)
+      gfx.setFont(isabelle.graphview.Metrics.default.font)
+      gfx.setStroke(new BasicStroke(1, BasicStroke.CAP_BUTT, BasicStroke.JOIN_ROUND))
+
+      draw_string(schedule.generator + ", build time: " + schedule.duration.message_hms, padding, 0)
+
+      def draw_host(x: Double, hostname: String): Double = {
+        val nodes = hosts(hostname).map(_.job_name).toSet
+        val width = host_width(hostname)
+        val height = 2 * padding + graph_height(graph.restrict(nodes.contains))
+        val padding1 = ((width - text_width(hostname)) / 2) max 0
+        val rect = new Rectangle2D.Double(x, hostname_height, width, height)
+        gfx.setColor(Color.GRAY)
+        gfx.fill(rect)
+        gfx.setColor(Color.BLACK)
+        gfx.draw(rect)
+        draw_string(hostname, x + padding1, generator_height)
+        x + gap + width
+      }
+
+      def draw_string(str: String, x: Double, y: Double): Unit = {
+        gfx.setColor(Color.BLACK)
+        gfx.drawString(str, x.toInt, (y + line_height).toInt)
+      }
+
+      def node_rect(node: Schedule.Node): Rectangle2D.Double = {
+        val x = node_start(node)
+        val y = hostname_height + padding + date_height(node.start)
+        val width = node_width(node)
+        val height = time_height(node.duration)
+        new Rectangle2D.Double(x, y, width, height)
+      }
+
+      def draw_node(node: Schedule.Node): Rectangle2D.Double = {
+        val rect = node_rect(node)
+        gfx.setColor(Color.WHITE)
+        gfx.fill(rect)
+        gfx.setColor(Color.BLACK)
+        gfx.draw(rect)
+
+        def add_text(y: Double, text: String): Double =
+          if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y
+          else {
+            val padding1 = padding min ((rect.height - (y + line_height)) / 2)
+            draw_string(text, rect.x + padding, rect.y + y + padding1)
+            y + padding1 + line_height
+          }
+
+        val node_info = node.node_info
+
+        val duration_str = "(" + node.duration.message_hms + ")"
+        val node_str =
+          "on " + proper_string(node_info.toString.stripPrefix(node_info.hostname)).getOrElse("all")
+        val start_str = "Start: " + (node.start.time - schedule.start.time).message_hms
+
+        List(node.job_name, duration_str, node_str, start_str).foldLeft(0D)(add_text)
+
+        rect
+      }
+
+      def draw_arrow(from: Schedule.Node, to: Rectangle2D.Double, curve: Double = 10): Unit = {
+        val from_rect = node_rect(from)
+
+        val path = new GeneralPath()
+        path.moveTo(from_rect.getCenterX, from_rect.getMaxY)
+        path.lineTo(to.getCenterX, to.getMinY)
+
+        gfx.setColor(Color.BLUE)
+        gfx.draw(path)
+      }
+
+      hosts.keys.foldLeft(0D)(draw_host)
+
+      graph.topological_order.foreach { job_name =>
+        val node = graph.get_node(job_name)
+        val rect = draw_node(node)
+
+        graph.imm_preds(job_name).foreach(pred => draw_arrow(graph.get_node(pred), rect))
+      }
+    }
+
+    val name = output.file_name
+    if (File.is_png(name)) Graphics_File.write_png(output.file, paint, width, height)
+    else if (File.is_pdf(name)) Graphics_File.write_pdf(output.file, paint, width, height)
+    else error("Bad type of file: " + quote(name) + " (.png or .pdf expected)")
+  }
+
+
+  /* command-line wrapper */
+
+  val isabelle_tool = Isabelle_Tool("build_schedule", "generate build schedule", Scala_Project.here,
+    { args =>
+      var afp_root: Option[Path] = None
+      val base_sessions = new mutable.ListBuffer[String]
+      val select_dirs = new mutable.ListBuffer[Path]
+      val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
+      var numa_shuffling = false
+      var output_file: Option[Path] = None
+      var requirements = false
+      val exclude_session_groups = new mutable.ListBuffer[String]
+      var all_sessions = false
+      val dirs = new mutable.ListBuffer[Path]
+      val session_groups = new mutable.ListBuffer[String]
+      var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS)
+      var verbose = false
+      val exclude_sessions = new mutable.ListBuffer[String]
+
+      val getopts = Getopts("""
+Usage: isabelle build_schedule [OPTIONS] [SESSIONS ...]
+
+  Options are:
+    -A ROOT      include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
+    -B NAME      include session NAME and all descendants
+    -D DIR       include session directory and select its sessions
+    -H HOSTS     additional build cluster host specifications, of the form
+                 "NAMES:PARAMETERS" (separated by commas)
+    -N           cyclic shuffling of NUMA CPU nodes (performance tuning)
+    -O FILE      output file
+    -R           refer to requirements of selected sessions
+    -X NAME      exclude sessions from group NAME and all descendants
+    -a           select all sessions
+    -d DIR       include session directory
+    -g NAME      select session group NAME
+    -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
+    -v           verbose
+    -x NAME      exclude session NAME and all descendants
+
+  Generate build schedule.
+""",
+        "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
+        "B:" -> (arg => base_sessions += arg),
+        "D:" -> (arg => select_dirs += Path.explode(arg)),
+        "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)),
+        "N" -> (_ => numa_shuffling = true),
+        "O:" -> (arg => output_file = Some(Path.explode(arg))),
+        "R" -> (_ => requirements = true),
+        "X:" -> (arg => exclude_session_groups += arg),
+        "a" -> (_ => all_sessions = true),
+        "d:" -> (arg => dirs += Path.explode(arg)),
+        "g:" -> (arg => session_groups += arg),
+        "o:" -> (arg => options = options + arg),
+        "v" -> (_ => verbose = true),
+        "x:" -> (arg => exclude_sessions += arg))
+
+      val sessions = getopts(args)
+
+      val progress = new Console_Progress(verbose = verbose)
+
+      val schedule =
+        build_schedule(options,
+          selection = Sessions.Selection(
+            requirements = requirements,
+            all_sessions = all_sessions,
+            base_sessions = base_sessions.toList,
+            exclude_session_groups = exclude_session_groups.toList,
+            exclude_sessions = exclude_sessions.toList,
+            session_groups = session_groups.toList,
+            sessions = sessions),
+          progress = progress,
+          afp_root = afp_root,
+          dirs = dirs.toList,
+          select_dirs = select_dirs.toList,
+          numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
+          build_hosts = build_hosts.toList)
+
+      if (!schedule.graph.is_empty && output_file.nonEmpty)
+        write_schedule_graphic(schedule, output_file.get)
+    })
 }
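
A typical invocation of the new command-line wrapper, using only options documented in its usage text (the session name, threads value, and output path are illustrative):

  isabelle build_schedule -O schedule.pdf -o threads=8 HOL

The -O file must end in .png or .pdf, matching the file-type check at the end of write_schedule_graphic.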