45 (meta_info, build_info) <- build_history |
45 (meta_info, build_info) <- build_history |
46 build_host = meta_info.get_build_host |
46 build_host = meta_info.get_build_host |
47 (job_name, session_info) <- build_info.sessions.toList |
47 (job_name, session_info) <- build_info.sessions.toList |
48 if build_info.finished_sessions.contains(job_name) |
48 if build_info.finished_sessions.contains(job_name) |
49 hostname <- session_info.hostname.orElse(build_host).toList |
49 hostname <- session_info.hostname.orElse(build_host).toList |
50 host <- hosts.find(_.info.hostname == hostname).toList |
50 host <- hosts.find(_.name == hostname).toList |
51 threads = session_info.threads.getOrElse(host.info.num_cpus) |
51 threads = session_info.threads.getOrElse(host.num_cpus) |
52 } yield (job_name, hostname, threads) -> session_info.timing |
52 } yield (job_name, hostname, threads) -> session_info.timing |
53 |
53 |
54 val entries = |
54 val entries = |
55 if (measurements.isEmpty) { |
55 if (measurements.isEmpty) { |
56 val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last |
56 val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last |
330 } |
330 } |
331 |
331 |
332 |
332 |
333 /* host information */ |
333 /* host information */ |
334 |
334 |
335 case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host) { |
335 case class Host( |
336 def name: String = info.hostname |
336 name: String, |
337 def num_cpus: Int = info.num_cpus |
337 num_cpus: Int, |
338 def max_threads(options: Options): Int = (options ++ build.options).threads(default = num_cpus) |
338 max_jobs: Int, |
|
339 benchmark_score: Double, |
|
340 numa: Boolean = false, |
|
341 numa_nodes: List[Int] = Nil, |
|
342 options: List[Options.Spec] = Nil |
|
343 ) { |
|
344 def max_threads(options: Options): Int = (options ++ this.options).threads(default = num_cpus) |
339 } |
345 } |
340 |
346 |
341 object Host_Infos { |
347 object Host_Infos { |
342 def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = { |
348 def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = { |
343 def get_host(build_host: Build_Cluster.Host): Host = { |
349 def get_host(build_host: Build_Cluster.Host): Host = { |
|
350 val name = build_host.name |
344 val info = |
351 val info = |
345 isabelle.Host.read_info(db, build_host.name).getOrElse( |
352 isabelle.Host.read_info(db, name).getOrElse(error("No info for host " + quote(name))) |
346 error("No benchmark for " + quote(build_host.name))) |
353 val score = info.benchmark_score.getOrElse(error("No benchmark for " + quote(name))) |
347 Host(info, build_host) |
354 |
|
355 Host( |
|
356 name = name, |
|
357 num_cpus = info.num_cpus, |
|
358 max_jobs = build_host.jobs, |
|
359 numa = build_host.numa, |
|
360 numa_nodes = info.numa_nodes, |
|
361 benchmark_score = score, |
|
362 options = build_host.options) |
348 } |
363 } |
349 |
364 |
350 new Host_Infos(build_hosts.map(get_host)) |
365 new Host_Infos(build_hosts.map(get_host)) |
351 } |
366 } |
352 } |
367 } |
355 require(hosts.nonEmpty) |
370 require(hosts.nonEmpty) |
356 |
371 |
357 private val by_hostname = hosts.map(host => host.name -> host).toMap |
372 private val by_hostname = hosts.map(host => host.name -> host).toMap |
358 |
373 |
359 def host_factor(from: Host, to: Host): Double = |
374 def host_factor(from: Host, to: Host): Double = |
360 from.info.benchmark_score.get / to.info.benchmark_score.get |
375 from.benchmark_score / to.benchmark_score |
361 |
376 |
362 val host_speeds: Ordering[Host] = |
377 val host_speeds: Ordering[Host] = |
363 Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1) |
378 Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1) |
364 |
379 |
365 def the_host(hostname: String): Host = |
380 def the_host(hostname: String): Host = |
366 by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname))) |
381 by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname))) |
367 def the_host(node_info: Node_Info): Host = the_host(node_info.hostname) |
382 def the_host(node_info: Node_Info): Host = the_host(node_info.hostname) |
368 |
383 |
369 def num_threads(node_info: Node_Info): Int = |
384 def num_threads(node_info: Node_Info): Int = |
370 if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length |
385 if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length |
371 else the_host(node_info).info.num_cpus |
386 else the_host(node_info).num_cpus |
372 |
387 |
373 def available(state: Build_Process.State): Resources = { |
388 def available(state: Build_Process.State): Resources = { |
374 val allocated = |
389 val allocated = |
375 state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _) |
390 state.running.values.map(_.node_info).groupMapReduce(_.hostname)(List(_))(_ ::: _) |
376 new Resources(this, allocated) |
391 new Resources(this, allocated) |
414 case Nil => (Nil, this) |
429 case Nil => (Nil, this) |
415 case (task, min_threads, max_threads) :: tasks => |
430 case (task, min_threads, max_threads) :: tasks => |
416 val (config, resources) = |
431 val (config, resources) = |
417 hosts.find((host, _) => available(host, min_threads)) match { |
432 hosts.find((host, _) => available(host, min_threads)) match { |
418 case Some((host, host_max_threads)) => |
433 case Some((host, host_max_threads)) => |
419 val free_threads = host.info.num_cpus - ((host.build.jobs - 1) * host_max_threads) |
434 val free_threads = host.num_cpus - ((host.max_jobs - 1) * host_max_threads) |
420 val node_info = next_node(host, (min_threads max free_threads) min max_threads) |
435 val node_info = next_node(host, (min_threads max free_threads) min max_threads) |
421 (Some(Config(task.name, node_info)), allocate(node_info)) |
436 (Some(Config(task.name, node_info)), allocate(node_info)) |
422 case None => (None, this) |
437 case None => (None, this) |
423 } |
438 } |
424 val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks) |
439 val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks) |
425 (configs ++ config, resources1) |
440 (configs ++ config, resources1) |
426 } |
441 } |
427 |
442 |
428 def next_node(host: Host, threads: Int): Node_Info = { |
443 def next_node(host: Host, threads: Int): Node_Info = { |
429 val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1) |
444 val numa_node_num_cpus = host.num_cpus / (host.numa_nodes.length max 1) |
430 def explicit_cpus(node_info: Node_Info): List[Int] = |
445 def explicit_cpus(node_info: Node_Info): List[Int] = |
431 if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList |
446 if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList |
432 |
447 |
433 val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _) |
448 val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _) |
434 |
449 |
435 val available_nodes = host.info.numa_nodes |
450 val available_nodes = host.numa_nodes |
436 val numa_node = |
451 val numa_node = |
437 if (!host.build.numa) None |
452 if (!host.numa) None |
438 else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption |
453 else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption |
439 |
454 |
440 val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet |
455 val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet |
441 val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList |
456 val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList |
442 |
457 |
446 } |
461 } |
447 |
462 |
448 def available(host: Host, threads: Int): Boolean = { |
463 def available(host: Host, threads: Int): Boolean = { |
449 val used = allocated(host) |
464 val used = allocated(host) |
450 |
465 |
451 if (used.length >= host.build.jobs) false |
466 if (used.length >= host.max_jobs) false |
452 else { |
467 else { |
453 if (host.info.numa_nodes.length <= 1) |
468 if (host.numa_nodes.length <= 1) |
454 used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus |
469 used.map(host_infos.num_threads).sum + threads <= host.num_cpus |
455 else { |
470 else { |
456 def node_threads(n: Int): Int = |
471 def node_threads(n: Int): Int = |
457 used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum |
472 used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum |
458 |
473 |
459 host.info.numa_nodes.exists( |
474 host.numa_nodes.exists( |
460 node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length) |
475 node_threads(_) + threads <= host.num_cpus / host.numa_nodes.length) |
461 } |
476 } |
462 } |
477 } |
463 } |
478 } |
464 } |
479 } |
465 |
480 |
690 |
705 |
691 /* pre-computed properties for efficient heuristic */ |
706 /* pre-computed properties for efficient heuristic */ |
692 val host_infos: Host_Infos = timing_data.host_infos |
707 val host_infos: Host_Infos = timing_data.host_infos |
693 val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds) |
708 val ordered_hosts: List[Host] = host_infos.hosts.sorted(host_infos.host_speeds) |
694 |
709 |
695 val max_threads: Int = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit |
710 val max_threads: Int = host_infos.hosts.map(_.num_cpus).max min max_threads_limit |
696 |
711 |
697 type Node = String |
712 type Node = String |
698 val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph |
713 val build_graph: Graph[Node, Sessions.Info] = sessions_structure.build_graph |
699 |
714 |
700 val minimals: List[Node] = build_graph.minimals |
715 val minimals: List[Node] = build_graph.minimals |
707 val best_threads: Map[Node, Int] = |
722 val best_threads: Map[Node, Int] = |
708 build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap |
723 build_graph.keys.map(node => node -> timing_data.best_threads(node, max_threads)).toMap |
709 |
724 |
710 def best_time(node: Node): Time = { |
725 def best_time(node: Node): Time = { |
711 val host = ordered_hosts.last |
726 val host = ordered_hosts.last |
712 val threads = best_threads(node) min host.info.num_cpus |
727 val threads = best_threads(node) min host.num_cpus |
713 timing_data.estimate(node, host.name, threads) |
728 timing_data.estimate(node, host.name, threads) |
714 } |
729 } |
715 val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap |
730 val best_times: Map[Node, Time] = build_graph.keys.map(node => node -> best_time(node)).toMap |
716 |
731 |
717 val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms) |
732 val succs_max_time_ms: Map[Node, Long] = build_graph.node_height(best_times(_).ms) |
828 this.host_criterion match { |
843 this.host_criterion match { |
829 case Critical_Nodes => max_critical_hosts |
844 case Critical_Nodes => max_critical_hosts |
830 case Fixed_Fraction(fraction) => |
845 case Fixed_Fraction(fraction) => |
831 ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts |
846 ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts |
832 case Host_Speed(min_factor) => |
847 case Host_Speed(min_factor) => |
833 val best = rev_ordered_hosts.head._1.info.benchmark_score.get |
848 val best = rev_ordered_hosts.head._1.benchmark_score |
834 val num_fast = |
849 val num_fast = |
835 rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor)) |
850 rev_ordered_hosts.count(_._1.benchmark_score >= best * min_factor) |
836 num_fast min max_critical_hosts |
851 num_fast min max_critical_hosts |
837 } |
852 } |
838 |
853 |
839 val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split) |
854 val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split) |
840 |
855 |