609 } |
609 } |
610 } |
610 } |
611 |
611 |
612 |
612 |
613 object Path_Time_Heuristic { |
613 object Path_Time_Heuristic { |
614 sealed trait Criterion |
614 sealed trait Critical_Criterion |
615 case class Absolute_Time(time: Time) extends Criterion { |
615 case class Absolute_Time(time: Time) extends Critical_Criterion { |
616 override def toString: String = "absolute time (" + time.message_hms + ")" |
616 override def toString: String = "absolute time (" + time.message_hms + ")" |
617 } |
617 } |
618 case class Relative_Time(factor: Double) extends Criterion { |
618 case class Relative_Time(factor: Double) extends Critical_Criterion { |
619 override def toString: String = "relative time (" + factor + ")" |
619 override def toString: String = "relative time (" + factor + ")" |
620 } |
620 } |
621 |
621 |
622 sealed trait Strategy |
622 sealed trait Parallel_Strategy |
623 case class Fixed_Thread(threads: Int) extends Strategy { |
623 case class Fixed_Thread(threads: Int) extends Parallel_Strategy { |
624 override def toString: String = "fixed threads (" + threads + ")" |
624 override def toString: String = "fixed threads (" + threads + ")" |
625 } |
625 } |
626 case class Time_Based_Threads(f: Time => Int) extends Strategy { |
626 case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy { |
627 override def toString: String = "time based threads" |
627 override def toString: String = "time based threads" |
628 } |
628 } |
|
629 |
|
630 sealed trait Host_Criterion |
|
631 case object Critical_Nodes extends Host_Criterion { |
|
632 override def toString: String = "per critical node" |
|
633 } |
|
634 case class Fixed_Fraction(fraction: Double) extends Host_Criterion { |
|
635 override def toString: String = "fixed fraction (" + fraction + ")" |
|
636 } |
|
637 case class Host_Speed(min_factor: Double) extends Host_Criterion { |
|
638 override def toString: String = "host speed (" + min_factor + ")" |
|
639 } |
629 } |
640 } |
630 |
641 |
631 class Path_Time_Heuristic( |
642 class Path_Time_Heuristic( |
632 is_critical: Path_Time_Heuristic.Criterion, |
643 is_critical: Path_Time_Heuristic.Critical_Criterion, |
633 parallel_threads: Path_Time_Heuristic.Strategy, |
644 parallel_threads: Path_Time_Heuristic.Parallel_Strategy, |
|
645 host_criterion: Path_Time_Heuristic.Host_Criterion, |
634 timing_data: Timing_Data, |
646 timing_data: Timing_Data, |
635 sessions_structure: Sessions.Structure, |
647 sessions_structure: Sessions.Structure, |
636 max_threads_limit: Int = 8 |
648 max_threads_limit: Int = 8 |
637 ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) { |
649 ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) { |
638 import Path_Time_Heuristic.* |
650 import Path_Time_Heuristic.* |
639 |
651 |
640 override def toString: Node = |
652 override def toString: Node = { |
641 "path time heuristic (critical: " + is_critical + ", parallel: " + parallel_threads + ")" |
653 val params = |
|
654 List( |
|
655 "critical: " + is_critical, |
|
656 "parallel: " + parallel_threads, |
|
657 "fast hosts: " + host_criterion) |
|
658 "path time heuristic (" + params.mkString(", ") + ")" |
|
659 } |
642 |
660 |
643 def next(state: Build_Process.State): List[Config] = { |
661 def next(state: Build_Process.State): List[Config] = { |
644 val resources = host_infos.available(state) |
662 val resources = host_infos.available(state) |
645 |
663 |
646 def best_threads(task: Build_Process.Task): Int = |
664 def best_threads(task: Build_Process.Task): Int = |
647 timing_data.best_threads(task.name, max_threads) |
665 timing_data.best_threads(task.name, max_threads) |
648 |
666 |
649 val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) |
667 val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads) |
650 |
668 |
651 val resources0 = host_infos.available(state.copy(running = Map.empty)) |
669 val available_nodes = |
|
670 host_infos.available(state.copy(running = Map.empty)) |
|
671 .unused_nodes(max_threads) |
|
672 .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse |
652 |
673 |
653 def remaining_time(node: Node): (Node, Time) = |
674 def remaining_time(node: Node): (Node, Time) = |
654 state.running.get(node) match { |
675 state.running.get(node) match { |
655 case None => node -> best_time(node) |
676 case None => node -> best_time(node) |
656 case Some(job) => |
677 case Some(job) => |
659 host_infos.num_threads(job.node_info)) |
680 host_infos.num_threads(job.node_info)) |
660 node -> Time.ms((Time.now() - job.start_date.time + estimate).ms max 0) |
681 node -> Time.ms((Time.now() - job.start_date.time + estimate).ms max 0) |
661 } |
682 } |
662 |
683 |
663 val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time)) |
684 val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time)) |
664 val fully_parallelizable = max_parallel <= resources0.unused_nodes(max_threads).length |
|
665 |
|
666 val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse |
685 val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse |
667 |
686 |
668 if (fully_parallelizable) { |
687 if (max_parallel <= available_nodes.length) { |
669 val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) |
688 val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task))) |
670 resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 |
689 resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1 |
671 } |
690 } |
672 else { |
691 else { |
673 def is_critical(time: Time): Boolean = |
692 def is_critical(time: Time): Boolean = |
692 |
711 |
693 val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) |
712 val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task))) |
694 |
713 |
695 val max_critical_parallel = |
714 val max_critical_parallel = |
696 parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains) |
715 parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains) |
697 val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(max_critical_parallel) |
716 val max_critical_hosts = |
|
717 available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length |
|
718 |
|
719 val split = |
|
720 this.host_criterion match { |
|
721 case Critical_Nodes => max_critical_hosts |
|
722 case Fixed_Fraction(fraction) => |
|
723 ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts |
|
724 case Host_Speed(min_factor) => |
|
725 val best = rev_ordered_hosts.head._1.info.benchmark_score.get |
|
726 val num_fast = |
|
727 rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor)) |
|
728 num_fast min max_critical_hosts |
|
729 } |
|
730 |
|
731 val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split) |
698 |
732 |
699 val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) |
733 val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks) |
700 val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) |
734 val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks) |
701 |
735 |
702 configs1 ::: configs2 |
736 configs1 ::: configs2 |
901 Path_Time_Heuristic.Time_Based_Threads({ |
935 Path_Time_Heuristic.Time_Based_Threads({ |
902 case time if time < Time.minutes(1) => 1 |
936 case time if time < Time.minutes(1) => 1 |
903 case time if time < Time.minutes(5) => 4 |
937 case time if time < Time.minutes(5) => 4 |
904 case _ => 8 |
938 case _ => 8 |
905 })) |
939 })) |
|
940 val machine_splits = |
|
941 List( |
|
942 Path_Time_Heuristic.Critical_Nodes, |
|
943 Path_Time_Heuristic.Fixed_Fraction(0.3), |
|
944 Path_Time_Heuristic.Host_Speed(0.9)) |
906 |
945 |
907 val path_time_heuristics = |
946 val path_time_heuristics = |
908 for { |
947 for { |
909 is_critical <- is_criticals |
948 is_critical <- is_criticals |
910 parallel <- parallel_threads |
949 parallel <- parallel_threads |
911 } yield Path_Time_Heuristic(is_critical, parallel, timing_data, sessions_structure) |
950 machine_split <- machine_splits |
|
951 } yield |
|
952 Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) |
912 val heuristics = Default_Heuristic(timing_data, context.build_options) :: path_time_heuristics |
953 val heuristics = Default_Heuristic(timing_data, context.build_options) :: path_time_heuristics |
913 |
954 |
914 new Meta_Heuristic(heuristics) |
955 new Meta_Heuristic(heuristics) |
915 } |
956 } |
916 |
957 |