554 } |
554 } |
555 |
555 |
556 def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty |
556 def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty |
557 } |
557 } |
558 |
558 |
559 trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule } |
559 trait Scheduler { def schedule(build_state: Build_Process.State): Schedule } |
560 |
560 |
561 abstract class Heuristic(timing_data: Timing_Data, build_uuid: String) |
561 trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] } |
562 extends Scheduler { |
562 |
563 val host_infos = timing_data.host_infos |
563 case class Generation_Scheme( |
564 val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds) |
564 priority_rule: Priority_Rule, |
565 |
565 timing_data: Timing_Data, |
566 def next(state: Build_Process.State): List[Config] |
566 build_uuid: String |
567 |
567 ) extends Scheduler { |
568 def build_schedule(build_state: Build_Process.State): Schedule = { |
568 def schedule(build_state: Build_Process.State): Schedule = { |
569 @tailrec |
569 @tailrec |
570 def simulate(state: State): State = |
570 def simulate(state: State): State = |
571 if (state.is_finished) state |
571 if (state.is_finished) state |
572 else { |
572 else { |
573 val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data) |
573 val state1 = |
|
574 priority_rule |
|
575 .select_next(state.build_state) |
|
576 .foldLeft(state)(_.start(_)) |
|
577 .step(timing_data) |
574 simulate(state1) |
578 simulate(state1) |
575 } |
579 } |
576 |
580 |
577 val start = Date.now() |
581 val start = Date.now() |
|
582 val name = "generation scheme (" + priority_rule + ")" |
578 val end_state = |
583 val end_state = |
579 simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty))) |
584 simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty))) |
580 |
585 |
581 end_state.finished |
586 end_state.finished |
582 } |
587 } |
583 } |
588 } |
584 |
589 |
585 class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String) |
590 case class Optimizer(schedulers: List[Scheduler]) extends Scheduler { |
586 extends Heuristic(timing_data, build_uuid) { |
591 require(schedulers.nonEmpty) |
587 override def toString: String = "default build heuristic" |
592 |
|
593 def schedule(state: Build_Process.State): Schedule = |
|
594 schedulers.map(_.schedule(state)).minBy(_.duration.ms) |
|
595 } |
|
596 |
|
597 |
|
598 /* priority rules */ |
|
599 |
|
600 class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule { |
|
601 override def toString: String = "default heuristic" |
588 |
602 |
589 def host_threads(host: Host): Int = { |
603 def host_threads(host: Host): Int = { |
590 val m = (options ++ host.build.options).int("threads") |
604 val m = (options ++ host.build.options).int("threads") |
591 if (m > 0) m else (host.num_cpus max 1) min 8 |
605 if (m > 0) m else (host.num_cpus max 1) min 8 |
592 } |
606 } |
593 |
607 |
594 def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] = |
608 def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] = |
595 sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _)) |
609 sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _)) |
596 |
610 |
597 def next(state: Build_Process.State): List[Config] = { |
611 def select_next(state: Build_Process.State): List[Config] = { |
598 val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name) |
612 val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name) |
599 val resources = host_infos.available(state) |
613 val resources = host_infos.available(state) |
600 |
614 |
601 host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) { |
615 host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) { |
602 case ((jobs, res), host) => |
616 case ((jobs, res), host) => |
605 (jobs.filterNot(config_jobs.contains), configs ::: res) |
619 (jobs.filterNot(config_jobs.contains), configs ::: res) |
606 }._2 |
620 }._2 |
607 } |
621 } |
608 } |
622 } |
609 |
623 |
610 class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler { |
|
611 require(heuristics.nonEmpty) |
|
612 |
|
613 def best_result(state: Build_Process.State): (Heuristic, Schedule) = |
|
614 heuristics.map(heuristic => |
|
615 heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms) |
|
616 |
|
617 def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state) |
|
618 |
|
619 def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2 |
|
620 } |
|
621 |
|
622 |
|
623 /* heuristics */ |
|
624 |
|
625 abstract class Path_Heuristic( |
|
626 timing_data: Timing_Data, |
|
627 sessions_structure: Sessions.Structure, |
|
628 max_threads_limit: Int, |
|
629 build_uuid: String |
|
630 ) extends Heuristic(timing_data, build_uuid) { |
|
631 /* pre-computed properties for efficient heuristic */ |
|
632 |
|
633 val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit |
|
634 |
|
635 type Node = String |
|
636 val build_graph = sessions_structure.build_graph |
|
637 |
|
638 val minimals = build_graph.minimals |
|
639 val maximals = build_graph.maximals |
|
640 |
|
641 def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet |
|
642 val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap |
|
643 |
|
644 def best_time(node: Node): Time = { |
|
645 val host = ordered_hosts.last |
|
646 val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus |
|
647 timing_data.estimate(node, host.name, threads) |
|
648 } |
|
649 val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap |
|
650 |
|
651 val succs_max_time_ms = build_graph.node_height(best_times(_).ms) |
|
652 def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) |
|
653 def max_time(task: Build_Process.Task): Time = max_time(task.name) |
|
654 |
|
655 def path_times(minimals: List[Node]): Map[Node, Time] = { |
|
656 def time_ms(node: Node): Long = best_times(node).ms |
|
657 val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) |
|
658 path_times_ms.view.mapValues(Time.ms).toMap |
|
659 } |
|
660 |
|
661 def path_max_times(minimals: List[Node]): Map[Node, Time] = |
|
662 path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap |
|
663 |
|
664 def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = { |
|
665 def start(node: Node): (Node, Time) = node -> best_times(node) |
|
666 |
|
667 def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = |
|
668 node -> (time - elapsed) |
|
669 |
|
670 def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = |
|
671 if (running.isEmpty) (0, running) |
|
672 else { |
|
673 def get_next(node: Node): List[Node] = |
|
674 build_graph.imm_succs(node).filter(pred).filter( |
|
675 build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList |
|
676 |
|
677 val (next, elapsed) = running.minBy(_._2.ms) |
|
678 val (remaining, finished) = |
|
679 running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) |
|
680 |
|
681 val running1 = |
|
682 remaining.map(pass_time(elapsed)).toMap ++ |
|
683 finished.map(_._1).flatMap(get_next).map(start) |
|
684 val (res, running2) = parallel_paths(running1) |
|
685 (res max running.size, running2) |
|
686 } |
|
687 |
|
688 parallel_paths(running.toMap)._1 |
|
689 } |
|
690 } |
|
691 |
|
692 |
|
693 object Path_Time_Heuristic { |
624 object Path_Time_Heuristic { |
694 sealed trait Critical_Criterion |
625 sealed trait Critical_Criterion |
695 case class Absolute_Time(time: Time) extends Critical_Criterion { |
626 case class Absolute_Time(time: Time) extends Critical_Criterion { |
696 override def toString: String = "absolute time (" + time.message_hms + ")" |
627 override def toString: String = "absolute time (" + time.message_hms + ")" |
697 } |
628 } |
723 is_critical: Path_Time_Heuristic.Critical_Criterion, |
654 is_critical: Path_Time_Heuristic.Critical_Criterion, |
724 parallel_threads: Path_Time_Heuristic.Parallel_Strategy, |
655 parallel_threads: Path_Time_Heuristic.Parallel_Strategy, |
725 host_criterion: Path_Time_Heuristic.Host_Criterion, |
656 host_criterion: Path_Time_Heuristic.Host_Criterion, |
726 timing_data: Timing_Data, |
657 timing_data: Timing_Data, |
727 sessions_structure: Sessions.Structure, |
658 sessions_structure: Sessions.Structure, |
728 build_uuid: String, |
|
729 max_threads_limit: Int = 8 |
659 max_threads_limit: Int = 8 |
730 ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) { |
660 ) extends Priority_Rule { |
731 import Path_Time_Heuristic.* |
661 import Path_Time_Heuristic.* |
732 |
662 |
733 override def toString: Node = { |
663 override def toString: Node = { |
734 val params = |
664 val params = |
735 List( |
665 List( |
737 "parallel: " + parallel_threads, |
667 "parallel: " + parallel_threads, |
738 "fast hosts: " + host_criterion) |
668 "fast hosts: " + host_criterion) |
739 "path time heuristic (" + params.mkString(", ") + ")" |
669 "path time heuristic (" + params.mkString(", ") + ")" |
740 } |
670 } |
741 |
671 |
742 def next(state: Build_Process.State): List[Config] = { |
672 /* pre-computed properties for efficient heuristic */ |
|
673 val host_infos = timing_data.host_infos |
|
674 val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds) |
|
675 |
|
676 val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit |
|
677 |
|
678 type Node = String |
|
679 val build_graph = sessions_structure.build_graph |
|
680 |
|
681 val minimals = build_graph.minimals |
|
682 val maximals = build_graph.maximals |
|
683 |
|
684 def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet |
|
685 val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap |
|
686 |
|
687 def best_time(node: Node): Time = { |
|
688 val host = ordered_hosts.last |
|
689 val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus |
|
690 timing_data.estimate(node, host.name, threads) |
|
691 } |
|
692 val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap |
|
693 |
|
694 val succs_max_time_ms = build_graph.node_height(best_times(_).ms) |
|
695 def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node) |
|
696 def max_time(task: Build_Process.Task): Time = max_time(task.name) |
|
697 |
|
698 def path_times(minimals: List[Node]): Map[Node, Time] = { |
|
699 def time_ms(node: Node): Long = best_times(node).ms |
|
700 val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals) |
|
701 path_times_ms.view.mapValues(Time.ms).toMap |
|
702 } |
|
703 |
|
704 def path_max_times(minimals: List[Node]): Map[Node, Time] = |
|
705 path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap |
|
706 |
|
707 def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = { |
|
708 def start(node: Node): (Node, Time) = node -> best_times(node) |
|
709 |
|
710 def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) = |
|
711 node -> (time - elapsed) |
|
712 |
|
713 def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) = |
|
714 if (running.isEmpty) (0, running) |
|
715 else { |
|
716 def get_next(node: Node): List[Node] = |
|
717 build_graph.imm_succs(node).filter(pred).filter( |
|
718 build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList |
|
719 |
|
720 val (next, elapsed) = running.minBy(_._2.ms) |
|
721 val (remaining, finished) = |
|
722 running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero) |
|
723 |
|
724 val running1 = |
|
725 remaining.map(pass_time(elapsed)).toMap ++ |
|
726 finished.map(_._1).flatMap(get_next).map(start) |
|
727 val (res, running2) = parallel_paths(running1) |
|
728 (res max running.size, running2) |
|
729 } |
|
730 |
|
731 parallel_paths(running.toMap)._1 |
|
732 } |
|
733 |
|
734 def select_next(state: Build_Process.State): List[Config] = { |
743 val resources = host_infos.available(state) |
735 val resources = host_infos.available(state) |
744 |
736 |
745 def best_threads(task: Build_Process.Task): Int = |
737 def best_threads(task: Build_Process.Task): Int = |
746 timing_data.best_threads(task.name, max_threads) |
738 timing_data.best_threads(task.name, max_threads) |
747 |
739 |
1000 val current = state.next_ready.filter(task => is_current(state, task.name)) |
992 val current = state.next_ready.filter(task => is_current(state, task.name)) |
1001 if (current.nonEmpty) current.map(_.name) |
993 if (current.nonEmpty) current.map(_.name) |
1002 else { |
994 else { |
1003 val start = Time.now() |
995 val start = Time.now() |
1004 |
996 |
1005 val new_schedule = scheduler.build_schedule(state).update(state) |
997 val new_schedule = scheduler.schedule(state).update(state) |
1006 val schedule = |
998 val schedule = |
1007 if (_schedule.is_empty) new_schedule |
999 if (_schedule.is_empty) new_schedule |
1008 else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering) |
1000 else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering) |
1009 |
1001 |
1010 val elapsed = Time.now() - start |
1002 val elapsed = Time.now() - start |
1228 for { |
1220 for { |
1229 is_critical <- is_criticals |
1221 is_critical <- is_criticals |
1230 parallel <- parallel_threads |
1222 parallel <- parallel_threads |
1231 machine_split <- machine_splits |
1223 machine_split <- machine_splits |
1232 } yield |
1224 } yield |
1233 Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure, |
1225 Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) |
1234 context.build_uuid) |
1226 val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options) |
1235 val default_heuristic = |
1227 val heuristics = default_heuristic :: path_time_heuristics |
1236 Default_Heuristic(timing_data, context.build_options, context.build_uuid) |
1228 Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid))) |
1237 new Meta_Heuristic(default_heuristic :: path_time_heuristics) |
|
1238 } |
1229 } |
1239 |
1230 |
1240 override def open_build_process( |
1231 override def open_build_process( |
1241 context: Build.Context, |
1232 context: Build.Context, |
1242 progress: Progress, |
1233 progress: Progress, |
1316 |
1307 |
1317 val scheduler = build_engine.scheduler(timing_data, build_context) |
1308 val scheduler = build_engine.scheduler(timing_data, build_context) |
1318 def schedule_msg(res: Exn.Result[Schedule]): String = |
1309 def schedule_msg(res: Exn.Result[Schedule]): String = |
1319 res match { case Exn.Res(schedule) => schedule.message case _ => "" } |
1310 res match { case Exn.Res(schedule) => schedule.message case _ => "" } |
1320 |
1311 |
1321 Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_)) |
1312 Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_)) |
1322 } |
1313 } |
1323 |
1314 |
1324 using(store.open_server()) { server => |
1315 using(store.open_server()) { server => |
1325 using_optional(store.maybe_open_database_server(server = server)) { database_server => |
1316 using_optional(store.maybe_open_database_server(server = server)) { database_server => |
1326 using(log_store.open_database(server = server)) { log_database => |
1317 using(log_store.open_database(server = server)) { log_database => |