367 case class Config(job_name: String, node_info: Node_Info) { |
367 case class Config(job_name: String, node_info: Node_Info) { |
368 def job_of(start_time: Time): Build_Process.Job = |
368 def job_of(start_time: Time): Build_Process.Job = |
369 Build_Process.Job(job_name, "", "", node_info, Date(start_time), None) |
369 Build_Process.Job(job_name, "", "", node_info, Date(start_time), None) |
370 } |
370 } |
371 |
371 |
372 case class Resources( |
372 class Resources( |
373 host_infos: Host_Infos, |
373 val host_infos: Host_Infos, |
374 allocated_nodes: Map[Host, List[Node_Info]] |
374 allocated_nodes: Map[String, List[Node_Info]] |
375 ) { |
375 ) { |
376 def unused_nodes(host: Host, threads: Int): List[Node_Info] = |
376 def unused_nodes(host: Host, threads: Int): List[Node_Info] = |
377 if (!available(host, threads)) Nil |
377 if (!available(host, threads)) Nil |
378 else { |
378 else { |
379 val node = next_node(host, threads) |
379 val node = next_node(host, threads) |
381 } |
381 } |
382 |
382 |
383 def unused_nodes(threads: Int): List[Node_Info] = |
383 def unused_nodes(threads: Int): List[Node_Info] = |
384 host_infos.hosts.flatMap(unused_nodes(_, threads)) |
384 host_infos.hosts.flatMap(unused_nodes(_, threads)) |
385 |
385 |
386 def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil) |
386 def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil) |
387 |
387 |
388 def allocate(node_info: Node_Info): Resources = { |
388 def allocate(node_info: Node_Info): Resources = { |
389 val host = host_infos.the_host(node_info) |
389 val host = host_infos.the_host(node_info) |
390 copy(allocated_nodes = allocated_nodes + (host -> (node_info :: allocated(host)))) |
390 new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host)))) |
391 } |
391 } |
392 |
392 |
393 def try_allocate_tasks( |
393 def try_allocate_tasks( |
394 hosts: List[(Host, Int)], |
394 hosts: List[(Host, Int)], |
395 tasks: List[(Build_Process.Task, Int, Int)], |
395 tasks: List[(Build_Process.Task, Int, Int)], |
691 val maximals = build_graph.maximals |
692 val maximals = build_graph.maximals |
692 |
693 |
693 def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet |
694 def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet |
694 val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap |
695 val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap |
695 |
696 |
|
697 val best_threads = |
|
698 build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap |
|
699 |
696 def best_time(node: Node): Time = { |
700 def best_time(node: Node): Time = { |
697 val host = ordered_hosts.last |
701 val host = ordered_hosts.last |
698 val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus |
702 val threads = best_threads(node) min host.info.num_cpus |
699 timing_data.estimate(node, host.name, threads) |
703 timing_data.estimate(node, host.name, threads) |
700 } |
704 } |
701 val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap |
705 val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap |
702 |
706 |
703 val succs_max_time_ms = build_graph.node_height(best_times(_).ms) |
707 val succs_max_time_ms = build_graph.node_height(best_times(_).ms) |
711 } |
715 } |
712 |
716 |
713 def path_max_times(minimals: List[Node]): Map[Node, Time] = |
717 def path_max_times(minimals: List[Node]): Map[Node, Time] = |
714 path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap |
718 path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap |
715 |
719 |
716 def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = { |
720 val node_degrees = |
717 def start(node: Node): (Node, Time) = node -> best_times(node) |
721 build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap |
718 |
722 |
719 def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = |
723 def parallel_paths( |
720 node -> (time - elapsed) |
724 running: List[(Node, Time)], |
721 |
725 nodes: Set[Node] = build_graph.keys.toSet, |
722 def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = |
726 max: Int = Int.MaxValue |
723 if (running.isEmpty) (0, running) |
727 ): Int = |
724 else { |
728 if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max |
725 def get_next(node: Node): List[Node] = |
729 else { |
726 build_graph.imm_succs(node).filter(pred).filter( |
730 def start(node: Node): (Node, Time) = node -> best_times(node) |
727 build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList |
731 |
728 |
732 def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = |
729 val (next, elapsed) = running.minBy(_._2.ms) |
733 node -> (time - elapsed) |
730 val (remaining, finished) = |
734 |
731 running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) |
735 def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = |
732 |
736 if (running.size >= max) (max, running) |
733 val running1 = |
737 else if (running.isEmpty) (0, running) |
734 remaining.map(pass_time(elapsed)).toMap ++ |
738 else { |
735 finished.map(_._1).flatMap(get_next).map(start) |
739 def get_next(node: Node): List[Node] = |
736 val (res, running2) = parallel_paths(running1) |
740 build_graph.imm_succs(node).intersect(nodes).filter( |
737 (res max running.size, running2) |
741 build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList |
738 } |
742 |
739 |
743 val (next, elapsed) = running.minBy(_._2.ms) |
740 parallel_paths(running.toMap)._1 |
744 val (remaining, finished) = |
741 } |
745 running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) |
|
746 |
|
747 val running1 = |
|
748 remaining.map(pass_time(elapsed)).toMap ++ |
|
749 finished.map(_._1).flatMap(get_next).map(start) |
|
750 val (res, running2) = parallel_paths(running1) |
|
751 (res max running.size, running2) |
|
752 } |
|
753 |
|
754 parallel_paths(running.toMap)._1 |
|
755 } |
742 |
756 |
743 def select_next(state: Build_Process.State): List[Config] = { |
757 def select_next(state: Build_Process.State): List[Config] = { |
744 val resources = host_infos.available(state) |
758 val resources = host_infos.available(state) |
745 |
759 |
746 def best_threads(task: Build_Process.Task): Int = |
760 def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name) |
747 timing_data.best_threads(task.name, max_threads) |
|
748 |
761 |
749 val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) |
762 val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) |
750 |
763 |
751 val available_nodes = |
764 val available_nodes = |
752 host_infos.available(state.copy(running = Map.empty)) |
765 host_infos.available(state.copy(running = Map.empty)) |
753 .unused_nodes(max_threads) |
766 .unused_nodes(max_threads) |
754 .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse |
767 .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse |
755 |
768 |
756 def remaining_time(node: Node): (Node, Time) = |
769 def remaining_time(node: Node): (Node, Time) = |
757 state.running.get(node) match { |
770 state.running.get(node) match { |
758 case None => node -> best_time(node) |
771 case None => node -> best_times(node) |
759 case Some(job) => |
772 case Some(job) => |
760 val estimate = |
773 val estimate = |
761 timing_data.estimate(job.name, job.node_info.hostname, |
774 timing_data.estimate(job.name, job.node_info.hostname, |
762 host_infos.num_threads(job.node_info)) |
775 host_infos.num_threads(job.node_info)) |
763 node -> ((Time.now() - job.start_date.time + estimate) max Time.zero) |
776 node -> ((Time.now() - job.start_date.time + estimate) max Time.zero) |
764 } |
777 } |
765 |
778 |
766 val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time)) |
|
767 val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse |
779 val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse |
768 |
780 val is_parallelizable = |
769 if (max_parallel <= available_nodes.length) { |
781 available_nodes.length >= parallel_paths( |
|
782 state.ready.map(_.name).map(remaining_time), |
|
783 max = available_nodes.length + 1) |
|
784 |
|
785 if (is_parallelizable) { |
770 val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) |
786 val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) |
771 resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 |
787 resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 |
772 } |
788 } |
773 else { |
789 else { |
774 def is_critical(time: Time): Boolean = |
790 def is_critical(time: Time): Boolean = |
786 val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) |
802 val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task))) |
787 |
803 |
788 def parallel_threads(task: Build_Process.Task): Int = |
804 def parallel_threads(task: Build_Process.Task): Int = |
789 this.parallel_threads match { |
805 this.parallel_threads match { |
790 case Fixed_Thread(threads) => threads |
806 case Fixed_Thread(threads) => threads |
791 case Time_Based_Threads(f) => f(best_time(task.name)) |
807 case Time_Based_Threads(f) => f(best_times(task.name)) |
792 } |
808 } |
793 |
809 |
794 val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) |
810 val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) |
795 |
811 |
796 val max_critical_parallel = |
812 val max_critical_parallel = |
797 parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains) |
813 parallel_paths(critical_minimals.map(remaining_time), critical_nodes) |
798 val max_critical_hosts = |
814 val max_critical_hosts = |
799 available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length |
815 available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length |
800 |
816 |
801 val split = |
817 val split = |
802 this.host_criterion match { |
818 this.host_criterion match { |