src/Pure/Tools/build_schedule.scala
changeset 79028 6bada416ba55
parent 79027 d08fb157e300
child 79029 49e8b031e0cb
--- a/src/Pure/Tools/build_schedule.scala	Thu Nov 23 20:30:45 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Thu Nov 23 20:53:58 2023 +0100
@@ -46,11 +46,16 @@
     private def inflection_point(last_mono: Int, next: Int): Int =
       last_mono + ((next - last_mono) / 2)
 
-    def best_threads(job_name: String): Option[Int] = by_job.get(job_name).map(_.best_entry.threads)
-
-    def best_time(job_name: String): Time =
-      by_job.get(job_name).map(_.best_entry.elapsed).getOrElse(
-        estimate(job_name, best_entry.hostname, best_entry.threads))
+    def best_threads(job_name: String, max_threads: Int): Int = {
+      val worse_threads =
+        by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
+          case (hostname, data) =>
+            val best_threads = data.best_entry.threads
+            data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
+              inflection_point(best_threads, _))
+        }
+      (max_threads :: worse_threads).min
+    }
 
     private def hostname_factor(from: String, to: String): Double =
       host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))
@@ -371,8 +376,15 @@
 
   abstract class Heuristic(timing_data: Timing_Data, max_threads_limit: Int) extends Scheduler {
     val host_infos = timing_data.host_infos
+    val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
     val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
 
+    def best_time(job_name: String): Time = {
+      val host = ordered_hosts.last
+      val threads = timing_data.best_threads(job_name, max_threads) min host.info.num_cpus
+      timing_data.estimate(job_name, host.info.hostname, threads)
+    }
+
     def build_duration(build_state: Build_Process.State): Time = {
       @tailrec
       def simulate(state: State): State =
@@ -415,7 +427,7 @@
     val maximals_preds =
       all_maximals.map(node => node -> build_graph.all_preds(List(node)).toSet).toMap
 
-    val best_times = build_graph.keys.map(node => node -> timing_data.best_time(node)).toMap
+    val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
     val remaining_time_ms = build_graph.node_height(best_times(_).ms)
 
     def elapsed_times(node: Node): Map[Node, Time] =
@@ -473,10 +485,9 @@
       val resources = host_infos.available(state)
 
       def best_threads(task: Build_Process.Task): Int =
-        timing_data.best_threads(task.name).getOrElse(max_threads)
+        timing_data.best_threads(task.name, max_threads)
 
-      val ordered_hosts =
-        host_infos.hosts.sorted(host_infos.host_speeds).reverse.map(_ -> max_threads)
+      val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
 
       val resources0 = host_infos.available(state.copy(running = Map.empty))
       val max_parallel = parallel_paths(state.ready.map(_.name).toSet)
@@ -484,7 +495,7 @@
 
       if (fully_parallelizable) {
         val all_tasks = state.next_ready.map(task => (task, best_threads(task), best_threads(task)))
-        resources.try_allocate_tasks(ordered_hosts, all_tasks)._1
+        resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
       }
       else {
         val critical_nodes = state.ready.toSet.flatMap(task => critical_path_nodes(task.name))
@@ -498,7 +509,7 @@
 
         val critical_minimals = critical_nodes.intersect(state.ready.map(_.name).toSet)
         val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains)
-        val (critical_hosts, other_hosts) = ordered_hosts.splitAt(max_critical_parallel)
+        val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel)
 
         val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
         val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)