merged
authorpaulson
Tue, 13 Feb 2024 17:18:57 +0000
changeset 79598 66cbd1ef0db1
parent 79594 f933e9153624 (diff)
parent 79597 76a1c0ea6777 (current diff)
child 79599 2c18ac57e92e
merged
--- a/Admin/components/PLATFORMS	Tue Feb 13 17:18:50 2024 +0000
+++ b/Admin/components/PLATFORMS	Tue Feb 13 17:18:57 2024 +0000
@@ -50,7 +50,7 @@
 
 Experimental platforms:
 
-  arm64-linux       Ubuntu 18.04 LTS (e.g. via "docker run -it ubuntu:20.04 bash")
+  arm64-linux       Ubuntu 18.04 LTS (e.g. via "docker run -it ubuntu:18.04 bash")
 
 
 64 bit vs. 32 bit platform personality
--- a/src/Pure/Build/build_schedule.scala	Tue Feb 13 17:18:50 2024 +0000
+++ b/src/Pure/Build/build_schedule.scala	Tue Feb 13 17:18:57 2024 +0000
@@ -14,25 +14,11 @@
 object Build_Schedule {
   /* organized historic timing information (extracted from build logs) */
 
-  case class Timing_Entry(job_name: String, hostname: String, threads: Int, timing: Timing) {
+  case class Result(job_name: String, hostname: String, threads: Int, timing: Timing) {
     def elapsed: Time = timing.elapsed
     def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
   }
 
-  case class Timing_Entries(entries: List[Timing_Entry]) {
-    require(entries.nonEmpty)
-
-    def is_empty = entries.isEmpty
-    def size = entries.length
-
-    lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap
-    lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap
-    lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap
-
-    def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed))
-    def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
-  }
-
   object Timing_Data {
     def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
 
@@ -44,8 +30,8 @@
       val baseline = Time.minutes(5).scale(host_factor)
       val gc = Time.seconds(10).scale(host_factor)
       List(
-        Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
-        Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
+        Result("dummy", host.name, 1, Timing(baseline, baseline, gc)),
+        Result("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
     }
 
     def make(
@@ -73,10 +59,10 @@
         else
           measurements.groupMap(_._1)(_._2).toList.map {
             case ((job_name, hostname, threads), timings) =>
-              Timing_Entry(job_name, hostname, threads, median_timing(timings))
+              Result(job_name, hostname, threads, median_timing(timings))
           }
 
-      new Timing_Data(Timing_Entries(entries), host_infos)
+      new Timing_Data(new Facet(entries), host_infos)
     }
 
     def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
@@ -91,18 +77,41 @@
 
       make(host_infos, build_history)
     }
+
+
+    /* data facets */
+
+    object Facet {
+      def unapply(facet: Facet): Option[List[Result]] = Some(facet.results)
+    }
+
+    class Facet private[Timing_Data](val results: List[Result]) {
+      require(results.nonEmpty)
+
+      def is_empty = results.isEmpty
+
+      def size = results.length
+
+      lazy val by_job = results.groupBy(_.job_name).view.mapValues(new Facet(_)).toMap
+      lazy val by_threads = results.groupBy(_.threads).view.mapValues(new Facet(_)).toMap
+      lazy val by_hostname = results.groupBy(_.hostname).view.mapValues(new Facet(_)).toMap
+
+      def mean_time: Time = Timing_Data.mean_time(results.map(_.elapsed))
+
+      def best_result: Result = results.minBy(_.elapsed.ms)
+    }
   }
 
-  class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
+  class Timing_Data private(facet: Timing_Data.Facet, val host_infos: Host_Infos) {
     private def inflection_point(last_mono: Int, next: Int): Int =
       last_mono + ((next - last_mono) / 2)
 
     def best_threads(job_name: String, max_threads: Int): Int = {
       val worse_threads =
-        data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
-          case (hostname, data) =>
-            val best_threads = data.best_entry.threads
-            data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+        facet.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
+          case (hostname, facet) =>
+            val best_threads = facet.best_result.threads
+           facet.by_threads.keys.toList.sorted.find(_ > best_threads).map(
               inflection_point(best_threads, _))
         }
       (max_threads :: worse_threads).min
@@ -170,31 +179,31 @@
     }
 
     private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
-      def unify(hostname: String, data: Timing_Entries) =
-        data.mean_time.scale(hostname_factor(hostname, on_host))
+      def unify(hostname: String, facet: Timing_Data.Facet) =
+        facet.mean_time.scale(hostname_factor(hostname, on_host))
 
       for {
-        data <- data.by_job.get(job_name).toList
-        (threads, data) <- data.by_threads
-        entries = data.by_hostname.toList.map(unify)
+        facet <- facet.by_job.get(job_name).toList
+        (threads, facet) <- facet.by_threads
+        entries = facet.by_hostname.toList.map(unify)
       } yield threads -> Timing_Data.median_time(entries)
     }
 
     def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
-      def try_approximate(data: Timing_Entries): Option[Time] = {
+      def try_approximate(facet: Timing_Data.Facet): Option[Time] = {
         val entries =
-          data.by_threads.toList match {
-            case List((i, Timing_Entries(List(entry)))) if i != 1 =>
-              (i, data.mean_time) :: entry.proper_cpu.map(1 -> _).toList
-            case entries => entries.map((threads, data) => threads -> data.mean_time)
+          facet.by_threads.toList match {
+            case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
+              (i, facet.mean_time) :: result.proper_cpu.map(1 -> _).toList
+            case entries => entries.map((threads, facet) => threads -> facet.mean_time)
           }
         if (entries.size < 2) None else Some(approximate_threads(entries, threads))
       }
 
       for {
-        data <- data.by_job.get(job_name)
-        data <- data.by_hostname.get(hostname)
-        time <- data.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(data))
+        facet <- facet.by_job.get(job_name)
+        facet <- facet.by_hostname.get(hostname)
+        time <- facet.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(facet))
       } yield time
     }
 
@@ -203,8 +212,8 @@
 
       val estimates =
         for {
-          (hostname, data) <- data.by_hostname
-          job_name <- data.by_job.keys
+          (hostname, facet) <- facet.by_hostname
+          job_name <- facet.by_job.keys
           from_time <- estimate_threads(job_name, hostname, from)
           to_time <- estimate_threads(job_name, hostname, to)
         } yield from_time.ms.toDouble / to_time.ms
@@ -214,8 +223,8 @@
         // unify hosts
         val estimates =
           for {
-            (job_name, data) <- data.by_job
-            hostname = data.by_hostname.keys.head
+            (job_name, facet) <- facet.by_job
+            hostname = facet.by_hostname.keys.head
             entries = unify_hosts(job_name, hostname)
             if entries.length > 1
           } yield
@@ -239,26 +248,26 @@
 
     def estimate(job_name: String, hostname: String, threads: Int): Time = {
       def estimate: Time =
-        data.by_job.get(job_name) match {
+        facet.by_job.get(job_name) match {
           case None =>
             // no data for job, take average of other jobs for given threads
-            val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
+            val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
             if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
             else {
               // no other job to estimate from, use global curve to approximate any other job
-              val (threads1, data1) = data.by_threads.head
-              data1.mean_time.scale(global_threads_factor(threads1, threads))
+              val (threads1, facet1) = facet.by_threads.head
+              facet1.mean_time.scale(global_threads_factor(threads1, threads))
             }
 
-          case Some(data) =>
-            data.by_threads.get(threads) match {
+          case Some(facet) =>
+            facet.by_threads.get(threads) match {
               case None => // interpolate threads
                 estimate_threads(job_name, hostname, threads).map(_.scale(
                   FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse {
                   // per machine, try to approximate config for threads
                   val approximated =
                     for {
-                      hostname1 <- data.by_hostname.keys
+                      hostname1 <- facet.by_hostname.keys
                       estimate <- estimate_threads(job_name, hostname1, threads)
                       factor = hostname_factor(hostname1, hostname)
                     } yield estimate.scale(factor)
@@ -281,11 +290,11 @@
                   }
                 }
 
-              case Some(data) => // time for job/thread exists, interpolate machine if necessary
-                data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+              case Some(facet) => // time for job/thread exists, interpolate machine if necessary
+                facet.by_hostname.get(hostname).map(_.mean_time).getOrElse {
                   Timing_Data.median_time(
-                    data.by_hostname.toList.map((hostname1, data) =>
-                      data.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
+                    facet.by_hostname.toList.map((hostname1, facet) =>
+                      facet.mean_time.scale(hostname_factor(hostname1, hostname)))).scale(
                     FACTOR_THREADS_OTHER_MACHINE)
                 }
             }
@@ -347,8 +356,8 @@
 
     def available(state: Build_Process.State): Resources = {
       val allocated =
-        state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
-      Resources(this, allocated)
+        state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _)
+      new Resources(this, allocated)
     }
   }
 
@@ -360,9 +369,9 @@
       Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
   }
 
-  case class Resources(
-    host_infos: Host_Infos,
-    allocated_nodes: Map[Host, List[Node_Info]]
+  class Resources(
+    val host_infos: Host_Infos,
+    allocated_nodes: Map[String, List[Node_Info]]
   ) {
     def unused_nodes(host: Host, threads: Int): List[Node_Info] =
       if (!available(host, threads)) Nil
@@ -374,11 +383,11 @@
     def unused_nodes(threads: Int): List[Node_Info] =
       host_infos.hosts.flatMap(unused_nodes(_, threads))
 
-    def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
+    def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil)
 
-    def allocate(node: Node_Info): Resources = {
-      val host = host_infos.the_host(node)
-      copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
+    def allocate(node_info: Node_Info): Resources = {
+      val host = host_infos.the_host(node_info)
+      new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host))))
     }
 
     def try_allocate_tasks(
@@ -538,7 +547,8 @@
 
         val host_preds =
           for {
-            (name, (pred_node, _)) <- finished.graph.iterator.toSet
+            name <- finished.graph.keys
+            pred_node = finished.graph.get_node(name)
             if pred_node.node_info.hostname == job.node_info.hostname
             if pred_node.end.time <= node.start.time
           } yield name
@@ -556,35 +566,49 @@
     def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
   }
 
-  trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule }
+  trait Scheduler { def schedule(build_state: Build_Process.State): Schedule }
+
+  trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] }
 
-  abstract class Heuristic(timing_data: Timing_Data, build_uuid: String)
-    extends Scheduler {
-    val host_infos = timing_data.host_infos
-    val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
-
-    def next(state: Build_Process.State): List[Config]
-
-    def build_schedule(build_state: Build_Process.State): Schedule = {
+  case class Generation_Scheme(
+    priority_rule: Priority_Rule,
+    timing_data: Timing_Data,
+    build_uuid: String
+  ) extends Scheduler {
+    def schedule(build_state: Build_Process.State): Schedule = {
       @tailrec
       def simulate(state: State): State =
         if (state.is_finished) state
         else {
-          val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
+          val state1 =
+            priority_rule
+              .select_next(state.build_state)
+              .foldLeft(state)(_.start(_))
+              .step(timing_data)
           simulate(state1)
         }
 
       val start = Date.now()
+      val name = "generation scheme (" + priority_rule + ")"
       val end_state =
-        simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty)))
+        simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty)))
 
       end_state.finished
     }
   }
 
-  class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String)
-    extends Heuristic(timing_data, build_uuid) {
-    override def toString: String = "default build heuristic"
+  case class Optimizer(schedulers: List[Scheduler]) extends Scheduler {
+    require(schedulers.nonEmpty)
+
+    def schedule(state: Build_Process.State): Schedule =
+      schedulers.map(_.schedule(state)).minBy(_.duration.ms)
+  }
+
+
+  /* priority rules */
+
+  class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule {
+    override def toString: String = "default heuristic"
 
     def host_threads(host: Host): Int = {
       val m = (options ++ host.build.options).int("threads")
@@ -594,7 +618,7 @@
     def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
       sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
 
-    def next(state: Build_Process.State): List[Config] = {
+    def select_next(state: Build_Process.State): List[Config] = {
       val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name)
       val resources = host_infos.available(state)
 
@@ -607,89 +631,6 @@
     }
   }
 
-  class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler {
-    require(heuristics.nonEmpty)
-
-    def best_result(state: Build_Process.State): (Heuristic, Schedule) =
-      heuristics.map(heuristic =>
-        heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms)
-
-    def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state)
-
-    def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2
-  }
-
-
-  /* heuristics */
-
-  abstract class Path_Heuristic(
-    timing_data: Timing_Data,
-    sessions_structure: Sessions.Structure,
-    max_threads_limit: Int,
-    build_uuid: String
-  ) extends Heuristic(timing_data, build_uuid) {
-    /* pre-computed properties for efficient heuristic */
-
-    val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
-
-    type Node = String
-    val build_graph = sessions_structure.build_graph
-
-    val minimals = build_graph.minimals
-    val maximals = build_graph.maximals
-
-    def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
-    val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
-
-    def best_time(node: Node): Time = {
-      val host = ordered_hosts.last
-      val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
-      timing_data.estimate(node, host.name, threads)
-    }
-    val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
-
-    val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
-    def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
-    def max_time(task: Build_Process.Task): Time = max_time(task.name)
-
-    def path_times(minimals: List[Node]): Map[Node, Time] = {
-      def time_ms(node: Node): Long = best_times(node).ms
-      val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
-      path_times_ms.view.mapValues(Time.ms).toMap
-    }
-
-    def path_max_times(minimals: List[Node]): Map[Node, Time] =
-      path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
-
-    def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
-      def start(node: Node): (Node, Time) = node -> best_times(node)
-
-      def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
-        node -> (time - elapsed)
-
-      def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
-        if (running.isEmpty) (0, running)
-        else {
-          def get_next(node: Node): List[Node] =
-            build_graph.imm_succs(node).filter(pred).filter(
-              build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
-
-          val (next, elapsed) = running.minBy(_._2.ms)
-          val (remaining, finished) =
-            running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
-
-          val running1 =
-            remaining.map(pass_time(elapsed)).toMap ++
-              finished.map(_._1).flatMap(get_next).map(start)
-          val (res, running2) = parallel_paths(running1)
-          (res max running.size, running2)
-        }
-
-      parallel_paths(running.toMap)._1
-    }
-  }
-
-
   object Path_Time_Heuristic {
     sealed trait Critical_Criterion
     case class Absolute_Time(time: Time) extends Critical_Criterion {
@@ -725,9 +666,8 @@
     host_criterion: Path_Time_Heuristic.Host_Criterion,
     timing_data: Timing_Data,
     sessions_structure: Sessions.Structure,
-    build_uuid: String,
     max_threads_limit: Int = 8
-  ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) {
+  ) extends Priority_Rule {
     import Path_Time_Heuristic.*
 
     override def toString: Node = {
@@ -739,11 +679,85 @@
       "path time heuristic (" + params.mkString(", ") + ")"
     }
 
-    def next(state: Build_Process.State): List[Config] = {
+    /* pre-computed properties for efficient heuristic */
+    val host_infos = timing_data.host_infos
+    val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
+
+    val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
+
+    type Node = String
+    val build_graph = sessions_structure.build_graph
+
+    val minimals = build_graph.minimals
+    val maximals = build_graph.maximals
+
+    def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
+    val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
+
+    val best_threads =
+      build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap
+
+    def best_time(node: Node): Time = {
+      val host = ordered_hosts.last
+      val threads = best_threads(node) min host.info.num_cpus
+      timing_data.estimate(node, host.name, threads)
+    }
+    val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
+
+    val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
+    def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
+    def max_time(task: Build_Process.Task): Time = max_time(task.name)
+
+    def path_times(minimals: List[Node]): Map[Node, Time] = {
+      def time_ms(node: Node): Long = best_times(node).ms
+      val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
+      path_times_ms.view.mapValues(Time.ms).toMap
+    }
+
+    def path_max_times(minimals: List[Node]): Map[Node, Time] =
+      path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
+
+    val node_degrees =
+      build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap
+
+    def parallel_paths(
+      running: List[(Node, Time)],
+      nodes: Set[Node] = build_graph.keys.toSet,
+      max: Int = Int.MaxValue
+    ): Int =
+      if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max
+      else {
+        def start(node: Node): (Node, Time) = node -> best_times(node)
+
+        def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
+          node -> (time - elapsed)
+
+        def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
+          if (running.size >= max) (max, running)
+          else if (running.isEmpty) (0, running)
+          else {
+            def get_next(node: Node): List[Node] =
+              build_graph.imm_succs(node).intersect(nodes).filter(
+                build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
+
+            val (next, elapsed) = running.minBy(_._2.ms)
+            val (remaining, finished) =
+              running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
+
+            val running1 =
+              remaining.map(pass_time(elapsed)).toMap ++
+                finished.map(_._1).flatMap(get_next).map(start)
+            val (res, running2) = parallel_paths(running1)
+            (res max running.size, running2)
+          }
+
+        parallel_paths(running.toMap)._1
+      }
+
+    def select_next(state: Build_Process.State): List[Config] = {
       val resources = host_infos.available(state)
 
-      def best_threads(task: Build_Process.Task): Int =
-        timing_data.best_threads(task.name, max_threads)
+      def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name)
 
       val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
 
@@ -754,7 +768,7 @@
 
       def remaining_time(node: Node): (Node, Time) =
         state.running.get(node) match {
-          case None => node -> best_time(node)
+          case None => node -> best_times(node)
           case Some(job) =>
             val estimate =
               timing_data.estimate(job.name, job.node_info.hostname,
@@ -762,10 +776,13 @@
             node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
         }
 
-      val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
       val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
+      val is_parallelizable =
+        available_nodes.length >= parallel_paths(
+          state.ready.map(_.name).map(remaining_time),
+          max = available_nodes.length + 1)
 
-      if (max_parallel <= available_nodes.length) {
+      if (is_parallelizable) {
         val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
         resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
       }
@@ -787,13 +804,13 @@
         def parallel_threads(task: Build_Process.Task): Int =
           this.parallel_threads match {
             case Fixed_Thread(threads) => threads
-            case Time_Based_Threads(f) => f(best_time(task.name))
+            case Time_Based_Threads(f) => f(best_times(task.name))
           }
 
         val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
 
         val max_critical_parallel =
-          parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
+          parallel_paths(critical_minimals.map(remaining_time), critical_nodes)
         val max_critical_hosts =
           available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length
 
@@ -1002,7 +1019,7 @@
         else {
           val start = Time.now()
 
-          val new_schedule = scheduler.build_schedule(state).update(state)
+          val new_schedule = scheduler.schedule(state).update(state)
           val schedule =
             if (_schedule.is_empty) new_schedule
             else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
@@ -1230,11 +1247,10 @@
           parallel <- parallel_threads
           machine_split <- machine_splits
         } yield
-          Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure,
-            context.build_uuid)
-      val default_heuristic =
-        Default_Heuristic(timing_data, context.build_options, context.build_uuid)
-      new Meta_Heuristic(default_heuristic :: path_time_heuristics)
+          Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure)
+      val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options)
+      val heuristics = default_heuristic :: path_time_heuristics
+      Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid)))
     }
 
     override def open_build_process(
@@ -1318,7 +1334,8 @@
       def schedule_msg(res: Exn.Result[Schedule]): String =
         res match { case Exn.Res(schedule) => schedule.message case _ => "" }
 
-      Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_))
+      progress.echo("Building schedule...")
+      Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_))
     }
 
     using(store.open_server()) { server =>