src/Pure/Build/build_schedule.scala
changeset 79592 7db599be70cc
parent 79534 1dcc97227442
child 79593 587a7dfeb03c
equal deleted inserted replaced
79591:6e5f40cfa877 79592:7db599be70cc
   554     }
   554     }
   555 
   555 
   556     def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
   556     def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
   557   }
   557   }
   558 
   558 
   559   trait Scheduler { def build_schedule(build_state: Build_Process.State): Schedule }
   559   trait Scheduler { def schedule(build_state: Build_Process.State): Schedule }
   560 
   560 
   561   abstract class Heuristic(timing_data: Timing_Data, build_uuid: String)
   561   trait Priority_Rule { def select_next(state: Build_Process.State): List[Config] }
   562     extends Scheduler {
   562 
   563     val host_infos = timing_data.host_infos
   563   case class Generation_Scheme(
   564     val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
   564     priority_rule: Priority_Rule,
   565 
   565     timing_data: Timing_Data,
   566     def next(state: Build_Process.State): List[Config]
   566     build_uuid: String
   567 
   567   ) extends Scheduler {
   568     def build_schedule(build_state: Build_Process.State): Schedule = {
   568     def schedule(build_state: Build_Process.State): Schedule = {
   569       @tailrec
   569       @tailrec
   570       def simulate(state: State): State =
   570       def simulate(state: State): State =
   571         if (state.is_finished) state
   571         if (state.is_finished) state
   572         else {
   572         else {
   573           val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
   573           val state1 =
       
   574             priority_rule
       
   575               .select_next(state.build_state)
       
   576               .foldLeft(state)(_.start(_))
       
   577               .step(timing_data)
   574           simulate(state1)
   578           simulate(state1)
   575         }
   579         }
   576 
   580 
   577       val start = Date.now()
   581       val start = Date.now()
       
   582       val name = "generation scheme (" + priority_rule + ")"
   578       val end_state =
   583       val end_state =
   579         simulate(State(build_state, start.time, Schedule(build_uuid, toString, start, Graph.empty)))
   584         simulate(State(build_state, start.time, Schedule(build_uuid, name, start, Graph.empty)))
   580 
   585 
   581       end_state.finished
   586       end_state.finished
   582     }
   587     }
   583   }
   588   }
   584 
   589 
   585   class Default_Heuristic(timing_data: Timing_Data, options: Options, build_uuid: String)
   590   case class Optimizer(schedulers: List[Scheduler]) extends Scheduler {
   586     extends Heuristic(timing_data, build_uuid) {
   591     require(schedulers.nonEmpty)
   587     override def toString: String = "default build heuristic"
   592 
       
   593     def schedule(state: Build_Process.State): Schedule =
       
   594       schedulers.map(_.schedule(state)).minBy(_.duration.ms)
       
   595   }
       
   596 
       
   597 
       
   598   /* priority rules */
       
   599 
       
   600   class Default_Heuristic(host_infos: Host_Infos, options: Options) extends Priority_Rule {
       
   601     override def toString: String = "default heuristic"
   588 
   602 
   589     def host_threads(host: Host): Int = {
   603     def host_threads(host: Host): Int = {
   590       val m = (options ++ host.build.options).int("threads")
   604       val m = (options ++ host.build.options).int("threads")
   591       if (m > 0) m else (host.num_cpus max 1) min 8
   605       if (m > 0) m else (host.num_cpus max 1) min 8
   592     }
   606     }
   593 
   607 
   594     def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
   608     def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
   595       sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
   609       sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))
   596 
   610 
   597     def next(state: Build_Process.State): List[Config] = {
   611     def select_next(state: Build_Process.State): List[Config] = {
   598       val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name)
   612       val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name)
   599       val resources = host_infos.available(state)
   613       val resources = host_infos.available(state)
   600 
   614 
   601       host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) {
   615       host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) {
   602         case ((jobs, res), host) =>
   616         case ((jobs, res), host) =>
   605           (jobs.filterNot(config_jobs.contains), configs ::: res)
   619           (jobs.filterNot(config_jobs.contains), configs ::: res)
   606       }._2
   620       }._2
   607     }
   621     }
   608   }
   622   }
   609 
   623 
   610   class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler {
       
   611     require(heuristics.nonEmpty)
       
   612 
       
   613     def best_result(state: Build_Process.State): (Heuristic, Schedule) =
       
   614       heuristics.map(heuristic =>
       
   615         heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms)
       
   616 
       
   617     def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state)
       
   618 
       
   619     def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2
       
   620   }
       
   621 
       
   622 
       
   623   /* heuristics */
       
   624 
       
   625   abstract class Path_Heuristic(
       
   626     timing_data: Timing_Data,
       
   627     sessions_structure: Sessions.Structure,
       
   628     max_threads_limit: Int,
       
   629     build_uuid: String
       
   630   ) extends Heuristic(timing_data, build_uuid) {
       
   631     /* pre-computed properties for efficient heuristic */
       
   632 
       
   633     val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
       
   634 
       
   635     type Node = String
       
   636     val build_graph = sessions_structure.build_graph
       
   637 
       
   638     val minimals = build_graph.minimals
       
   639     val maximals = build_graph.maximals
       
   640 
       
   641     def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
       
   642     val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
       
   643 
       
   644     def best_time(node: Node): Time = {
       
   645       val host = ordered_hosts.last
       
   646       val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
       
   647       timing_data.estimate(node, host.name, threads)
       
   648     }
       
   649     val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
       
   650 
       
   651     val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
       
   652     def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
       
   653     def max_time(task: Build_Process.Task): Time = max_time(task.name)
       
   654 
       
   655     def path_times(minimals: List[Node]): Map[Node, Time] = {
       
   656       def time_ms(node: Node): Long = best_times(node).ms
       
   657       val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
       
   658       path_times_ms.view.mapValues(Time.ms).toMap
       
   659     }
       
   660 
       
   661     def path_max_times(minimals: List[Node]): Map[Node, Time] =
       
   662       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
       
   663 
       
   664     def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
       
   665       def start(node: Node): (Node, Time) = node -> best_times(node)
       
   666 
       
   667       def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
       
   668         node -> (time - elapsed)
       
   669 
       
   670       def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
       
   671         if (running.isEmpty) (0, running)
       
   672         else {
       
   673           def get_next(node: Node): List[Node] =
       
   674             build_graph.imm_succs(node).filter(pred).filter(
       
   675               build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
       
   676 
       
   677           val (next, elapsed) = running.minBy(_._2.ms)
       
   678           val (remaining, finished) =
       
   679             running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
       
   680 
       
   681           val running1 =
       
   682             remaining.map(pass_time(elapsed)).toMap ++
       
   683               finished.map(_._1).flatMap(get_next).map(start)
       
   684           val (res, running2) = parallel_paths(running1)
       
   685           (res max running.size, running2)
       
   686         }
       
   687 
       
   688       parallel_paths(running.toMap)._1
       
   689     }
       
   690   }
       
   691 
       
   692 
       
   693   object Path_Time_Heuristic {
   624   object Path_Time_Heuristic {
   694     sealed trait Critical_Criterion
   625     sealed trait Critical_Criterion
   695     case class Absolute_Time(time: Time) extends Critical_Criterion {
   626     case class Absolute_Time(time: Time) extends Critical_Criterion {
   696       override def toString: String = "absolute time (" + time.message_hms + ")"
   627       override def toString: String = "absolute time (" + time.message_hms + ")"
   697     }
   628     }
   723     is_critical: Path_Time_Heuristic.Critical_Criterion,
   654     is_critical: Path_Time_Heuristic.Critical_Criterion,
   724     parallel_threads: Path_Time_Heuristic.Parallel_Strategy,
   655     parallel_threads: Path_Time_Heuristic.Parallel_Strategy,
   725     host_criterion: Path_Time_Heuristic.Host_Criterion,
   656     host_criterion: Path_Time_Heuristic.Host_Criterion,
   726     timing_data: Timing_Data,
   657     timing_data: Timing_Data,
   727     sessions_structure: Sessions.Structure,
   658     sessions_structure: Sessions.Structure,
   728     build_uuid: String,
       
   729     max_threads_limit: Int = 8
   659     max_threads_limit: Int = 8
   730   ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit, build_uuid) {
   660   ) extends Priority_Rule {
   731     import Path_Time_Heuristic.*
   661     import Path_Time_Heuristic.*
   732 
   662 
   733     override def toString: Node = {
   663     override def toString: Node = {
   734       val params =
   664       val params =
   735         List(
   665         List(
   737           "parallel: " + parallel_threads,
   667           "parallel: " + parallel_threads,
   738           "fast hosts: " + host_criterion)
   668           "fast hosts: " + host_criterion)
   739       "path time heuristic (" + params.mkString(", ") + ")"
   669       "path time heuristic (" + params.mkString(", ") + ")"
   740     }
   670     }
   741 
   671 
   742     def next(state: Build_Process.State): List[Config] = {
   672     /* pre-computed properties for efficient heuristic */
       
   673     val host_infos = timing_data.host_infos
       
   674     val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)
       
   675 
       
   676     val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
       
   677 
       
   678     type Node = String
       
   679     val build_graph = sessions_structure.build_graph
       
   680 
       
   681     val minimals = build_graph.minimals
       
   682     val maximals = build_graph.maximals
       
   683 
       
   684     def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
       
   685     val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
       
   686 
       
   687     def best_time(node: Node): Time = {
       
   688       val host = ordered_hosts.last
       
   689       val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
       
   690       timing_data.estimate(node, host.name, threads)
       
   691     }
       
   692     val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
       
   693 
       
   694     val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
       
   695     def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
       
   696     def max_time(task: Build_Process.Task): Time = max_time(task.name)
       
   697 
       
   698     def path_times(minimals: List[Node]): Map[Node, Time] = {
       
   699       def time_ms(node: Node): Long = best_times(node).ms
       
   700       val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
       
   701       path_times_ms.view.mapValues(Time.ms).toMap
       
   702     }
       
   703 
       
   704     def path_max_times(minimals: List[Node]): Map[Node, Time] =
       
   705       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
       
   706 
       
   707     def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
       
   708       def start(node: Node): (Node, Time) = node -> best_times(node)
       
   709 
       
   710       def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
       
   711         node -> (time - elapsed)
       
   712 
       
   713       def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
       
   714         if (running.isEmpty) (0, running)
       
   715         else {
       
   716           def get_next(node: Node): List[Node] =
       
   717             build_graph.imm_succs(node).filter(pred).filter(
       
   718               build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
       
   719 
       
   720           val (next, elapsed) = running.minBy(_._2.ms)
       
   721           val (remaining, finished) =
       
   722             running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
       
   723 
       
   724           val running1 =
       
   725             remaining.map(pass_time(elapsed)).toMap ++
       
   726               finished.map(_._1).flatMap(get_next).map(start)
       
   727           val (res, running2) = parallel_paths(running1)
       
   728           (res max running.size, running2)
       
   729         }
       
   730 
       
   731       parallel_paths(running.toMap)._1
       
   732     }
       
   733 
       
   734     def select_next(state: Build_Process.State): List[Config] = {
   743       val resources = host_infos.available(state)
   735       val resources = host_infos.available(state)
   744 
   736 
   745       def best_threads(task: Build_Process.Task): Int =
   737       def best_threads(task: Build_Process.Task): Int =
   746         timing_data.best_threads(task.name, max_threads)
   738         timing_data.best_threads(task.name, max_threads)
   747 
   739 
  1000         val current = state.next_ready.filter(task => is_current(state, task.name))
   992         val current = state.next_ready.filter(task => is_current(state, task.name))
  1001         if (current.nonEmpty) current.map(_.name)
   993         if (current.nonEmpty) current.map(_.name)
  1002         else {
   994         else {
  1003           val start = Time.now()
   995           val start = Time.now()
  1004 
   996 
  1005           val new_schedule = scheduler.build_schedule(state).update(state)
   997           val new_schedule = scheduler.schedule(state).update(state)
  1006           val schedule =
   998           val schedule =
  1007             if (_schedule.is_empty) new_schedule
   999             if (_schedule.is_empty) new_schedule
  1008             else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
  1000             else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
  1009 
  1001 
  1010           val elapsed = Time.now() - start
  1002           val elapsed = Time.now() - start
  1228         for {
  1220         for {
  1229           is_critical <- is_criticals
  1221           is_critical <- is_criticals
  1230           parallel <- parallel_threads
  1222           parallel <- parallel_threads
  1231           machine_split <- machine_splits
  1223           machine_split <- machine_splits
  1232         } yield
  1224         } yield
  1233           Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure,
  1225           Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure)
  1234             context.build_uuid)
  1226       val default_heuristic = Default_Heuristic(timing_data.host_infos, context.build_options)
  1235       val default_heuristic =
  1227       val heuristics = default_heuristic :: path_time_heuristics
  1236         Default_Heuristic(timing_data, context.build_options, context.build_uuid)
  1228       Optimizer(heuristics.map(Generation_Scheme(_, timing_data, context.build_uuid)))
  1237       new Meta_Heuristic(default_heuristic :: path_time_heuristics)
       
  1238     }
  1229     }
  1239 
  1230 
  1240     override def open_build_process(
  1231     override def open_build_process(
  1241       context: Build.Context,
  1232       context: Build.Context,
  1242       progress: Progress,
  1233       progress: Progress,
  1316 
  1307 
  1317       val scheduler = build_engine.scheduler(timing_data, build_context)
  1308       val scheduler = build_engine.scheduler(timing_data, build_context)
  1318       def schedule_msg(res: Exn.Result[Schedule]): String =
  1309       def schedule_msg(res: Exn.Result[Schedule]): String =
  1319         res match { case Exn.Res(schedule) => schedule.message case _ => "" }
  1310         res match { case Exn.Res(schedule) => schedule.message case _ => "" }
  1320 
  1311 
  1321       Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_))
  1312       Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_))
  1322     }
  1313     }
  1323 
  1314 
  1324     using(store.open_server()) { server =>
  1315     using(store.open_server()) { server =>
  1325       using_optional(store.maybe_open_database_server(server = server)) { database_server =>
  1316       using_optional(store.maybe_open_database_server(server = server)) { database_server =>
  1326         using(log_store.open_database(server = server)) { log_database =>
  1317         using(log_store.open_database(server = server)) { log_database =>