src/Pure/Build/build_schedule.scala
changeset 79880 a3d53f2bc41d
parent 79879 c00181ecf869
child 79891 d8b4bfe82bb5
equal deleted inserted replaced
79879:c00181ecf869 79880:a3d53f2bc41d
    45           (meta_info, build_info) <- build_history
    45           (meta_info, build_info) <- build_history
    46           build_host = meta_info.get_build_host
    46           build_host = meta_info.get_build_host
    47           (job_name, session_info) <- build_info.sessions.toList
    47           (job_name, session_info) <- build_info.sessions.toList
    48           if build_info.finished_sessions.contains(job_name)
    48           if build_info.finished_sessions.contains(job_name)
    49           hostname <- session_info.hostname.orElse(build_host).toList
    49           hostname <- session_info.hostname.orElse(build_host).toList
    50           host <- hosts.find(_.info.hostname == hostname).toList
    50           host <- hosts.find(_.name == hostname).toList
    51           threads = session_info.threads.getOrElse(host.info.num_cpus)
    51           threads = session_info.threads.getOrElse(host.num_cpus)
    52         } yield (job_name, hostname, threads) -> session_info.timing
    52         } yield (job_name, hostname, threads) -> session_info.timing
    53 
    53 
    54       val entries =
    54       val entries =
    55         if (measurements.isEmpty) {
    55         if (measurements.isEmpty) {
    56           val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
    56           val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
   330   }
   330   }
   331 
   331 
   332 
   332 
   333   /* host information */
   333   /* host information */
   334 
   334 
   335   case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host) {
   335   case class Host(
   336     def name: String = info.hostname
   336     name: String,
   337     def num_cpus: Int = info.num_cpus
   337     num_cpus: Int,
   338     def max_threads(options: Options): Int = (options ++ build.options).threads(default = num_cpus)
   338     max_jobs: Int,
       
   339     benchmark_score: Double,
       
   340     numa: Boolean = false,
       
   341     numa_nodes: List[Int] = Nil,
       
   342     options: List[Options.Spec] = Nil
       
   343   ) {
       
   344     def max_threads(options: Options): Int = (options ++ this.options).threads(default = num_cpus)
   339   }
   345   }
   340 
   346 
   341   object Host_Infos {
   347   object Host_Infos {
   342     def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = {
   348     def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = {
   343       def get_host(build_host: Build_Cluster.Host): Host = {
   349       def get_host(build_host: Build_Cluster.Host): Host = {
       
   350         val name = build_host.name
   344         val info =
   351         val info =
   345           isabelle.Host.read_info(db, build_host.name).getOrElse(
   352           isabelle.Host.read_info(db, name).getOrElse(error("No info for host " + quote(name)))
   346             error("No benchmark for " + quote(build_host.name)))
   353         val score = info.benchmark_score.getOrElse(error("No benchmark for " + quote(name)))
   347         Host(info, build_host)
   354 
       
   355         Host(
       
   356           name = name,
       
   357           num_cpus = info.num_cpus,
       
   358           max_jobs = build_host.jobs,
       
   359           numa = build_host.numa,
       
   360           numa_nodes = info.numa_nodes,
       
   361           benchmark_score = score,
       
   362           options = build_host.options)
   348       }
   363       }
   349 
   364 
   350       new Host_Infos(build_hosts.map(get_host))
   365       new Host_Infos(build_hosts.map(get_host))
   351     }
   366     }
   352   }
   367   }
   355     require(hosts.nonEmpty)
   370     require(hosts.nonEmpty)
   356 
   371 
   357     private val by_hostname = hosts.map(host => host.name -> host).toMap
   372     private val by_hostname = hosts.map(host => host.name -> host).toMap
   358 
   373 
   359     def host_factor(from: Host, to: Host): Double =
   374     def host_factor(from: Host, to: Host): Double =
   360       from.info.benchmark_score.get / to.info.benchmark_score.get
   375       from.benchmark_score / to.benchmark_score
   361 
   376 
   362     val host_speeds: Ordering[Host] =
   377     val host_speeds: Ordering[Host] =
   363       Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1)
   378       Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1)
   364 
   379 
   365     def the_host(hostname: String): Host =
   380     def the_host(hostname: String): Host =
   366       by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname)))
   381       by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname)))
   367     def the_host(node_info: Node_Info): Host = the_host(node_info.hostname)
   382     def the_host(node_info: Node_Info): Host = the_host(node_info.hostname)
   368 
   383 
   369     def num_threads(node_info: Node_Info): Int =
   384     def num_threads(node_info: Node_Info): Int =
   370       if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
   385       if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
   371       else the_host(node_info).info.num_cpus
   386       else the_host(node_info).num_cpus
   372 
   387 
   373     def available(state: Build_Process.State): Resources = {
   388     def available(state: Build_Process.State): Resources = {
   374       val allocated =
   389       val allocated =
   375         state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _)
   390         state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _)
   376       new Resources(this, allocated)
   391       new Resources(this, allocated)
   414         case Nil => (Nil, this)
   429         case Nil => (Nil, this)
   415         case (task, min_threads, max_threads) :: tasks =>
   430         case (task, min_threads, max_threads) :: tasks =>
   416           val (config, resources) =
   431           val (config, resources) =
   417             hosts.find((host, _) => available(host, min_threads)) match {
   432             hosts.find((host, _) => available(host, min_threads)) match {
   418               case Some((host, host_max_threads)) =>
   433               case Some((host, host_max_threads)) =>
   419                 val free_threads = host.info.num_cpus - ((host.build.jobs - 1) * host_max_threads)
   434                 val free_threads = host.num_cpus - ((host.max_jobs - 1) * host_max_threads)
   420                 val node_info = next_node(host, (min_threads max free_threads) min max_threads)
   435                 val node_info = next_node(host, (min_threads max free_threads) min max_threads)
   421                 (Some(Config(task.name, node_info)), allocate(node_info))
   436                 (Some(Config(task.name, node_info)), allocate(node_info))
   422               case None => (None, this)
   437               case None => (None, this)
   423             }
   438             }
   424           val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
   439           val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
   425           (configs ++ config, resources1)
   440           (configs ++ config, resources1)
   426       }
   441       }
   427 
   442 
   428     def next_node(host: Host, threads: Int): Node_Info = {
   443     def next_node(host: Host, threads: Int): Node_Info = {
   429       val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1)
   444       val numa_node_num_cpus = host.num_cpus / (host.numa_nodes.length max 1)
   430       def explicit_cpus(node_info: Node_Info): List[Int] =
   445       def explicit_cpus(node_info: Node_Info): List[Int] =
   431         if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList
   446         if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList
   432 
   447 
   433       val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)
   448       val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)
   434 
   449 
   435       val available_nodes = host.info.numa_nodes
   450       val available_nodes = host.numa_nodes
   436       val numa_node =
   451       val numa_node =
   437         if (!host.build.numa) None
   452         if (!host.numa) None
   438         else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption
   453         else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption
   439 
   454 
   440       val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
   455       val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
   441       val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList
   456       val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList
   442 
   457 
   446     }
   461     }
   447 
   462 
   448     def available(host: Host, threads: Int): Boolean = {
   463     def available(host: Host, threads: Int): Boolean = {
   449       val used = allocated(host)
   464       val used = allocated(host)
   450 
   465 
   451       if (used.length >= host.build.jobs) false
   466       if (used.length >= host.max_jobs) false
   452       else {
   467       else {
   453         if (host.info.numa_nodes.length <= 1)
   468         if (host.numa_nodes.length <= 1)
   454           used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus
   469           used.map(host_infos.num_threads).sum + threads <= host.num_cpus
   455         else {
   470         else {
   456           def node_threads(n: Int): Int =
   471           def node_threads(n: Int): Int =
   457             used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum
   472             used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum
   458 
   473 
   459           host.info.numa_nodes.exists(
   474           host.numa_nodes.exists(
   460             node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length)
   475             node_threads(_) + threads <= host.num_cpus / host.numa_nodes.length)
   461         }
   476         }
   462       }
   477       }
   463     }
   478     }
   464   }
   479   }
   465 
   480 
   690 
   705 
   691     /* pre-computed properties for efficient heuristic */
   706     /* pre-computed properties for efficient heuristic */
   692     val host_infos: Host_Infos = timing_data.host_infos
   707     val host_infos: Host_Infos = timing_data.host_infos
   693     val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds)
   708     val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds)
   694 
   709 
   695     val max_threads: Int = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit
   710     val max_threads: Int = host_infos.hosts.map(_.num_cpus).max min max_threads_limit
   696 
   711 
   697     type Node = String
   712     type Node = String
   698     val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph
   713     val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph
   699 
   714 
   700     val minimals: List[Node] = build_graph.minimals
   715     val minimals: List[Node] = build_graph.minimals
   707     val best_threads: Map[Node, Int] =
   722     val best_threads: Map[Node, Int] =
   708       build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap
   723       build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap
   709 
   724 
   710     def best_time(node: Node): Time = {
   725     def best_time(node: Node): Time = {
   711       val host = ordered_hosts.last
   726       val host = ordered_hosts.last
   712       val threads = best_threads(node) min host.info.num_cpus
   727       val threads = best_threads(node) min host.num_cpus
   713       timing_data.estimate(node, host.name, threads)
   728       timing_data.estimate(node, host.name, threads)
   714     }
   729     }
   715     val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap
   730     val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap
   716 
   731 
   717     val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms)
   732     val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms)
   828           this.host_criterion match {
   843           this.host_criterion match {
   829             case Critical_Nodes => max_critical_hosts
   844             case Critical_Nodes => max_critical_hosts
   830             case Fixed_Fraction(fraction) =>
   845             case Fixed_Fraction(fraction) =>
   831               ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
   846               ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
   832             case Host_Speed(min_factor) =>
   847             case Host_Speed(min_factor) =>
   833               val best = rev_ordered_hosts.head._1.info.benchmark_score.get
   848               val best = rev_ordered_hosts.head._1.benchmark_score
   834               val num_fast =
   849               val num_fast =
   835                 rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor))
   850                 rev_ordered_hosts.count(_._1.benchmark_score >= best * min_factor)
   836               num_fast min max_critical_hosts
   851               num_fast min max_critical_hosts
   837           }
   852           }
   838 
   853 
   839         val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)
   854         val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)
   840 
   855