proper parallel paths: factor in elapsed time;
authorFabian Huch <huch@in.tum.de>
Wed, 06 Dec 2023 17:44:51 +0100
changeset 79179 7ed43417770f
parent 79178 96e5d12c82fd
child 79180 229f49204603
proper parallel paths: factor in elapsed time;
src/Pure/Tools/build_schedule.scala
--- a/src/Pure/Tools/build_schedule.scala	Wed Dec 06 17:42:04 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Wed Dec 06 17:44:51 2023 +0100
@@ -581,7 +581,7 @@
     def path_max_times(minimals: List[Node]): Map[Node, Time] =
       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
 
-    def parallel_paths(minimals: List[Node], pred: Node => Boolean = _ => true): Int = {
+    def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
       def start(node: Node): (Node, Time) = node -> best_times(node)
 
       def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
@@ -605,7 +605,7 @@
           (res max running.size, running2)
         }
 
-      parallel_paths(minimals.map(start).toMap)._1
+      parallel_paths(running.toMap)._1
     }
   }
 
@@ -649,7 +649,18 @@
       val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
 
       val resources0 = host_infos.available(state.copy(running = Map.empty))
-      val max_parallel = parallel_paths(state.ready.map(_.name))
+
+      def remaining_time(node: Node): (Node, Time) =
+        state.running.get(node) match {
+          case None => node -> best_time(node)
+          case Some(job) =>
+            val estimate =
+              timing_data.estimate(job.name, job.node_info.hostname,
+                host_infos.num_threads(job.node_info))
+            node -> Time.ms((Time.now() - job.start_date.time + estimate).ms max 0)
+        }
+
+      val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
       val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length
 
       val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
@@ -681,7 +692,8 @@
 
         val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
 
-        val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains)
+        val max_critical_parallel =
+          parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
         val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel)
 
         val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)