equal
deleted
inserted
replaced
214 |
214 |
215 case class Resources( |
215 case class Resources( |
216 host_infos: Host_Infos, |
216 host_infos: Host_Infos, |
217 allocated_nodes: Map[Host, List[Node_Info]] |
217 allocated_nodes: Map[Host, List[Node_Info]] |
218 ) { |
218 ) { |
219 val unused_hosts: List[Host] = host_infos.hosts.filter(allocated(_).isEmpty) |
219 def unused_nodes(threads: Int): List[Node_Info] = { |
|
220 val fully_allocated = |
|
221 host_infos.hosts.foldLeft(this) { case (resources, host) => |
|
222 if (!resources.available(host, threads)) resources |
|
223 else resources.allocate(resources.next_node(host, threads)) |
|
224 } |
|
225 val used_nodes = allocated_nodes.values.flatten.toSet |
|
226 fully_allocated.allocated_nodes.values.flatten.toList.filterNot(used_nodes.contains) |
|
227 } |
220 |
228 |
221 def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil) |
229 def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil) |
222 |
230 |
223 def allocate(node: Node_Info): Resources = { |
231 def allocate(node: Node_Info): Resources = { |
224 val host = host_infos.the_host(node) |
232 val host = host_infos.the_host(node) |
417 |
425 |
418 def best_threads(task: Build_Process.Task): Int = |
426 def best_threads(task: Build_Process.Task): Int = |
419 timing_data.best_threads(task.name).getOrElse( |
427 timing_data.best_threads(task.name).getOrElse( |
420 host_infos.hosts.map(_.info.num_cpus).max min max_threads) |
428 host_infos.hosts.map(_.info.num_cpus).max min max_threads) |
421 |
429 |
422 val free = resources.unused_hosts.map(_ -> max_threads) |
430 val ordered_hosts = |
423 |
431 host_infos.hosts.sorted(host_infos.host_speeds).reverse.map(_ -> max_threads) |
424 if (state.ready.length <= free.length) { |
432 |
|
433 val fully_parallelizable = |
|
434 parallel_paths(state.ready.map(_.name).toSet) <= resources.unused_nodes(max_threads).length |
|
435 |
|
436 if (fully_parallelizable) { |
425 val all_tasks = state.ready.map(task => (task, best_threads(task), best_threads(task))) |
437 val all_tasks = state.ready.map(task => (task, best_threads(task), best_threads(task))) |
426 resources.try_allocate_tasks(free, all_tasks)._1 |
438 resources.try_allocate_tasks(ordered_hosts, all_tasks)._1 |
427 } |
439 } |
428 else { |
440 else { |
429 val pending_tasks = state.pending.map(_.name).toSet |
441 val pending_tasks = state.pending.map(_.name).toSet |
430 |
442 |
431 val critical_nodes = state.ready.toSet.flatMap(task => critical_path_nodes(task.name)) |
443 val critical_nodes = state.ready.toSet.flatMap(task => critical_path_nodes(task.name)) |
437 |
449 |
438 val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) |
450 val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) |
439 val other_tasks = other.map(task => (task, 1, best_threads(task))) |
451 val other_tasks = other.map(task => (task, 1, best_threads(task))) |
440 |
452 |
441 val (critical_hosts, other_hosts) = |
453 val (critical_hosts, other_hosts) = |
442 host_infos.hosts.sorted(host_infos.host_speeds).reverse.map(_ -> max_threads).splitAt( |
454 ordered_hosts.splitAt(parallel_paths(critical.map(_.name).toSet, is_critical)) |
443 parallel_paths(critical.map(_.name).toSet, is_critical)) |
|
444 |
455 |
445 val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) |
456 val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) |
446 val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) |
457 val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) |
447 |
458 |
448 configs1 ::: configs2 |
459 configs1 ::: configs2 |