src/Pure/Build/build_schedule.scala
changeset 79594 f933e9153624
parent 79593 587a7dfeb03c
child 79614 58c0636e0ef5
equal deleted inserted replaced
79593:587a7dfeb03c 79594:f933e9153624
   354       if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
   354       if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
   355       else the_host(node_info).info.num_cpus
   355       else the_host(node_info).info.num_cpus
   356 
   356 
   357     def available(state: Build_Process.State): Resources = {
   357     def available(state: Build_Process.State): Resources = {
   358       val allocated =
   358       val allocated =
   359         state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
   359         state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _)
   360       Resources(this, allocated)
   360       new Resources(this, allocated)
   361     }
   361     }
   362   }
   362   }
   363 
   363 
   364 
   364 
   365   /* offline tracking of job configurations and resource allocations */
   365   /* offline tracking of job configurations and resource allocations */
   367   case class Config(job_name: String, node_info: Node_Info) {
   367   case class Config(job_name: String, node_info: Node_Info) {
   368     def job_of(start_time: Time): Build_Process.Job =
   368     def job_of(start_time: Time): Build_Process.Job =
   369       Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
   369       Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
   370   }
   370   }
   371 
   371 
   372   case class Resources(
   372   class Resources(
   373     host_infos: Host_Infos,
   373     val host_infos: Host_Infos,
   374     allocated_nodes: Map[Host, List[Node_Info]]
   374     allocated_nodes: Map[String, List[Node_Info]]
   375   ) {
   375   ) {
   376     def unused_nodes(host: Host, threads: Int): List[Node_Info] =
   376     def unused_nodes(host: Host, threads: Int): List[Node_Info] =
   377       if (!available(host, threads)) Nil
   377       if (!available(host, threads)) Nil
   378       else {
   378       else {
   379         val node = next_node(host, threads)
   379         val node = next_node(host, threads)
   381       }
   381       }
   382 
   382 
   383     def unused_nodes(threads: Int): List[Node_Info] =
   383     def unused_nodes(threads: Int): List[Node_Info] =
   384       host_infos.hosts.flatMap(unused_nodes(_, threads))
   384       host_infos.hosts.flatMap(unused_nodes(_, threads))
   385 
   385 
   386     def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
   386     def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host.name, Nil)
   387 
   387 
   388     def allocate(node_info: Node_Info): Resources = {
   388     def allocate(node_info: Node_Info): Resources = {
   389       val host = host_infos.the_host(node_info)
   389       val host = host_infos.the_host(node_info)
   390       copy(allocated_nodes = allocated_nodes + (host -> (node_info :: allocated(host))))
   390       new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host))))
   391     }
   391     }
   392 
   392 
   393     def try_allocate_tasks(
   393     def try_allocate_tasks(
   394       hosts: List[(Host, Int)],
   394       hosts: List[(Host, Int)],
   395       tasks: List[(Build_Process.Task, Int, Int)],
   395       tasks: List[(Build_Process.Task, Int, Int)],
   545         val now = current_time + elapsed
   545         val now = current_time + elapsed
   546         val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)
   546         val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)
   547 
   547 
   548         val host_preds =
   548         val host_preds =
   549           for {
   549           for {
   550             (name, (pred_node, _)) <- finished.graph.iterator.toSet
   550             name <- finished.graph.keys
       
   551             pred_node = finished.graph.get_node(name)
   551             if pred_node.node_info.hostname == job.node_info.hostname
   552             if pred_node.node_info.hostname == job.node_info.hostname
   552             if pred_node.end.time <= node.start.time
   553             if pred_node.end.time <= node.start.time
   553           } yield name
   554           } yield name
   554         val build_preds =
   555         val build_preds =
   555           build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined)
   556           build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined)
   691     val maximals = build_graph.maximals
   692     val maximals = build_graph.maximals
   692 
   693 
   693     def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
   694     def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
   694     val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
   695     val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap
   695 
   696 
       
   697     val best_threads =
       
   698       build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap
       
   699 
   696     def best_time(node: Node): Time = {
   700     def best_time(node: Node): Time = {
   697       val host = ordered_hosts.last
   701       val host = ordered_hosts.last
   698       val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
   702       val threads = best_threads(node) min host.info.num_cpus
   699       timing_data.estimate(node, host.name, threads)
   703       timing_data.estimate(node, host.name, threads)
   700     }
   704     }
   701     val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
   705     val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap
   702 
   706 
   703     val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
   707     val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
   711     }
   715     }
   712 
   716 
   713     def path_max_times(minimals: List[Node]): Map[Node, Time] =
   717     def path_max_times(minimals: List[Node]): Map[Node, Time] =
   714       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
   718       path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap
   715 
   719 
   716     def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
   720     val node_degrees =
   717       def start(node: Node): (Node, Time) = node -> best_times(node)
   721       build_graph.keys.map(node => node -> build_graph.imm_succs(node).size).toMap
   718 
   722 
   719       def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
   723     def parallel_paths(
   720         node -> (time - elapsed)
   724       running: List[(Node, Time)],
   721 
   725       nodes: Set[Node] = build_graph.keys.toSet,
   722       def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
   726       max: Int = Int.MaxValue
   723         if (running.isEmpty) (0, running)
   727     ): Int =
   724         else {
   728       if (nodes.nonEmpty && nodes.map(node_degrees.apply).max > max) max
   725           def get_next(node: Node): List[Node] =
   729       else {
   726             build_graph.imm_succs(node).filter(pred).filter(
   730         def start(node: Node): (Node, Time) = node -> best_times(node)
   727               build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
   731 
   728 
   732         def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
   729           val (next, elapsed) = running.minBy(_._2.ms)
   733           node -> (time - elapsed)
   730           val (remaining, finished) =
   734 
   731             running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
   735         def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
   732 
   736           if (running.size >= max) (max, running)
   733           val running1 =
   737           else if (running.isEmpty) (0, running)
   734             remaining.map(pass_time(elapsed)).toMap ++
   738           else {
   735               finished.map(_._1).flatMap(get_next).map(start)
   739             def get_next(node: Node): List[Node] =
   736           val (res, running2) = parallel_paths(running1)
   740               build_graph.imm_succs(node).intersect(nodes).filter(
   737           (res max running.size, running2)
   741                 build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList
   738         }
   742 
   739 
   743             val (next, elapsed) = running.minBy(_._2.ms)
   740       parallel_paths(running.toMap)._1
   744             val (remaining, finished) =
   741     }
   745               running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
       
   746 
       
   747             val running1 =
       
   748               remaining.map(pass_time(elapsed)).toMap ++
       
   749                 finished.map(_._1).flatMap(get_next).map(start)
       
   750             val (res, running2) = parallel_paths(running1)
       
   751             (res max running.size, running2)
       
   752           }
       
   753 
       
   754         parallel_paths(running.toMap)._1
       
   755       }
   742 
   756 
   743     def select_next(state: Build_Process.State): List[Config] = {
   757     def select_next(state: Build_Process.State): List[Config] = {
   744       val resources = host_infos.available(state)
   758       val resources = host_infos.available(state)
   745 
   759 
   746       def best_threads(task: Build_Process.Task): Int =
   760       def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name)
   747         timing_data.best_threads(task.name, max_threads)
       
   748 
   761 
   749       val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
   762       val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)
   750 
   763 
   751       val available_nodes =
   764       val available_nodes =
   752         host_infos.available(state.copy(running = Map.empty))
   765         host_infos.available(state.copy(running = Map.empty))
   753           .unused_nodes(max_threads)
   766           .unused_nodes(max_threads)
   754           .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse
   767           .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse
   755 
   768 
   756       def remaining_time(node: Node): (Node, Time) =
   769       def remaining_time(node: Node): (Node, Time) =
   757         state.running.get(node) match {
   770         state.running.get(node) match {
   758           case None => node -> best_time(node)
   771           case None => node -> best_times(node)
   759           case Some(job) =>
   772           case Some(job) =>
   760             val estimate =
   773             val estimate =
   761               timing_data.estimate(job.name, job.node_info.hostname,
   774               timing_data.estimate(job.name, job.node_info.hostname,
   762                 host_infos.num_threads(job.node_info))
   775                 host_infos.num_threads(job.node_info))
   763             node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
   776             node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
   764         }
   777         }
   765 
   778 
   766       val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
       
   767       val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
   779       val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse
   768 
   780       val is_parallelizable =
   769       if (max_parallel <= available_nodes.length) {
   781         available_nodes.length >= parallel_paths(
       
   782           state.ready.map(_.name).map(remaining_time),
       
   783           max = available_nodes.length + 1)
       
   784 
       
   785       if (is_parallelizable) {
   770         val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
   786         val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
   771         resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
   787         resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
   772       }
   788       }
   773       else {
   789       else {
   774         def is_critical(time: Time): Boolean =
   790         def is_critical(time: Time): Boolean =
   786         val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task)))
   802         val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task)))
   787 
   803 
   788         def parallel_threads(task: Build_Process.Task): Int =
   804         def parallel_threads(task: Build_Process.Task): Int =
   789           this.parallel_threads match {
   805           this.parallel_threads match {
   790             case Fixed_Thread(threads) => threads
   806             case Fixed_Thread(threads) => threads
   791             case Time_Based_Threads(f) => f(best_time(task.name))
   807             case Time_Based_Threads(f) => f(best_times(task.name))
   792           }
   808           }
   793 
   809 
   794         val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
   810         val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))
   795 
   811 
   796         val max_critical_parallel =
   812         val max_critical_parallel =
   797           parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
   813           parallel_paths(critical_minimals.map(remaining_time), critical_nodes)
   798         val max_critical_hosts =
   814         val max_critical_hosts =
   799           available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length
   815           available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length
   800 
   816 
   801         val split =
   817         val split =
   802           this.host_criterion match {
   818           this.host_criterion match {
  1316 
  1332 
  1317       val scheduler = build_engine.scheduler(timing_data, build_context)
  1333       val scheduler = build_engine.scheduler(timing_data, build_context)
  1318       def schedule_msg(res: Exn.Result[Schedule]): String =
  1334       def schedule_msg(res: Exn.Result[Schedule]): String =
  1319         res match { case Exn.Res(schedule) => schedule.message case _ => "" }
  1335         res match { case Exn.Res(schedule) => schedule.message case _ => "" }
  1320 
  1336 
       
  1337       progress.echo("Building schedule...")
  1321       Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_))
  1338       Timing.timeit(scheduler.schedule(build_state), schedule_msg, output = progress.echo(_))
  1322     }
  1339     }
  1323 
  1340 
  1324     using(store.open_server()) { server =>
  1341     using(store.open_server()) { server =>
  1325       using_optional(store.maybe_open_database_server(server = server)) { database_server =>
  1342       using_optional(store.maybe_open_database_server(server = server)) { database_server =>