# HG changeset patch # User Fabian Huch # Date 1701881091 -3600 # Node ID 7ed43417770ffc9893d540d896377ca7c758aa82 # Parent 96e5d12c82fdf2b7a50c6b6072bcf9a9b5cbdc22 proper parallel paths: factor in elapsed time; diff -r 96e5d12c82fd -r 7ed43417770f src/Pure/Tools/build_schedule.scala --- a/src/Pure/Tools/build_schedule.scala Wed Dec 06 17:42:04 2023 +0100 +++ b/src/Pure/Tools/build_schedule.scala Wed Dec 06 17:44:51 2023 +0100 @@ -581,7 +581,7 @@ def path_max_times(minimals: List[Node]): Map[Node, Time] = path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap - def parallel_paths(minimals: List[Node], pred: Node => Boolean = _ => true): Int = { + def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = { def start(node: Node): (Node, Time) = node -> best_times(node) def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = @@ -605,7 +605,7 @@ (res max running.size, running2) } - parallel_paths(minimals.map(start).toMap)._1 + parallel_paths(running.toMap)._1 } } @@ -649,7 +649,18 @@ val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) val resources0 = host_infos.available(state.copy(running = Map.empty)) - val max_parallel = parallel_paths(state.ready.map(_.name)) + + def remaining_time(node: Node): (Node, Time) = + state.running.get(node) match { + case None => node -> best_time(node) + case Some(job) => + val estimate = + timing_data.estimate(job.name, job.node_info.hostname, + host_infos.num_threads(job.node_info)) + node -> Time.ms((Time.now() - job.start_date.time + estimate).ms max 0) + } + + val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time)) val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse @@ -681,7 +692,8 @@ val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) - val max_critical_parallel = parallel_paths(critical_minimals, critical_nodes.contains) + val max_critical_parallel = + parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains) val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel) val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)