/*  Title:      Pure/Tools/build_schedule.scala
    Author:     Fabian Huch, TU Muenchen

Build schedule generated by heuristic methods, allowing for more efficient builds.
*/
package isabelle


import Host.Node_Info
import scala.annotation.tailrec
import scala.collection.mutable


object Build_Schedule {
  /* organized historic timing information (extracted from build logs) */

  case class Timing_Entry(job_name: String, hostname: String, threads: Int, timing: Timing) {
    def elapsed: Time = timing.elapsed
    def proper_cpu: Option[Time] = timing.cpu.proper_ms.map(Time.ms)
  }
  case class Timing_Entries(entries: List[Timing_Entry]) {
    require(entries.nonEmpty)

    def is_empty = entries.isEmpty
    def size = entries.length

    lazy val by_job = entries.groupBy(_.job_name).view.mapValues(Timing_Entries(_)).toMap
    lazy val by_threads = entries.groupBy(_.threads).view.mapValues(Timing_Entries(_)).toMap
    lazy val by_hostname = entries.groupBy(_.hostname).view.mapValues(Timing_Entries(_)).toMap

    def mean_time: Time = Timing_Data.mean_time(entries.map(_.elapsed))
    def best_entry: Timing_Entry = entries.minBy(_.elapsed.ms)
  }

  class Timing_Data private(data: Timing_Entries, val host_infos: Host_Infos) {
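    // midpoint between the last thread count with monotonic speedup and the next measured count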
    private def inflection_point(last_mono: Int, next: Int): Int =
      last_mono + ((next - last_mono) / 2)

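    // best thread count for the job, capped at max_threads: per host, the midpoint between the
    // fastest measured count and the next larger one marks where more threads stop paying off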
    def best_threads(job_name: String, max_threads: Int): Int = {
      val worse_threads =
        data.by_job.get(job_name).toList.flatMap(_.by_hostname).flatMap {
          case (hostname, data) =>
            val best_threads = data.best_entry.threads
            data.by_threads.keys.toList.sorted.find(_ > best_threads).map(
              inflection_point(best_threads, _))
        }
      (max_threads :: worse_threads).min
    }

    private def hostname_factor(from: String, to: String): Double =
      host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))

    private def approximate_threads(entries_unsorted: List[(Int, Time)], threads: Int): Time = {
      val entries = entries_unsorted.sortBy(_._1)

      def sorted_prefix[A](xs: List[A], f: A => Long): List[A] =
        xs match {
          case x1 :: x2 :: xs =>
            if (f(x1) <= f(x2)) x1 :: sorted_prefix(x2 :: xs, f) else x1 :: Nil
          case xs => xs
        }

      def linear(p0: (Int, Time), p1: (Int, Time)): Time = {
        val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1))
        val b = p0._2 - a.scale(p0._1)
        Time.ms((a.scale(threads) + b).ms max 0)
      }

      val mono_prefix = sorted_prefix(entries, e => -e._2.ms)

      val is_mono = entries == mono_prefix
      val in_prefix = mono_prefix.length > 1 && threads <= mono_prefix.last._1
      val in_inflection =
        !is_mono && mono_prefix.length > 1 && threads < entries.drop(mono_prefix.length).head._1
      if (is_mono || in_prefix || in_inflection) {
        // Model with Amdahl's law
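        // T(n) = t_c + t_p / n; from two measurements (n, t0) and (m, t1):
        // t0 - t1 = t_p * (m - n) / (n * m), hence t_p = (t0 - t1) * n * m / (m - n)
        // (medians over all pairs resp. residuals make the fit robust against outliers)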
        val t_p =
          Timing_Data.median_time(for {
            (n, t0) <- mono_prefix
            (m, t1) <- mono_prefix
            if m != n
          } yield (t0 - t1).scale(n.toDouble * m / (m - n)))
        val t_c =
          Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n))

        def model(threads: Int): Time = Time.ms((t_c + t_p.scale(1.0 / threads)).ms max 0)

        if (is_mono || in_prefix) model(threads)
        else {
          val post_inflection = entries.drop(mono_prefix.length).head
          val inflection_threads = inflection_point(mono_prefix.last._1, post_inflection._1)

          if (threads <= inflection_threads) model(threads)
          else linear((inflection_threads, model(inflection_threads)), post_inflection)
        }
      } else {
        // Piecewise linear
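        // interpolate between the two measurements bracketing the requested thread count,
        // or extrapolate from the two closest measurements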
        val (p0, p1) =
          if (entries.head._1 < threads && threads < entries.last._1) {
            val split = entries.partition(_._1 < threads)
            (split._1.last, split._2.head)
          } else {
            val piece = if (threads < entries.head._1) entries.take(2) else entries.takeRight(2)
            (piece.head, piece.last)
          }

        linear(p0, p1)
      }
    }

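    // timings of the job from all hosts, converted to on_host via host factors
    // (median per thread count)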
    private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
      def unify(hostname: String, data: Timing_Entries) =
        data.mean_time.scale(hostname_factor(hostname, on_host))

      for {
        data <- data.by_job.get(job_name).toList
        (threads, data) <- data.by_threads
        entries = data.by_hostname.toList.map(unify)
      } yield threads -> Timing_Data.median_time(entries)
    }

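    // estimated time for the job on the given host and thread count, interpolating between its
    // measurements there; a single measurement at threads != 1 is complemented by its proper
    // cpu time as pseudo single-threaded data point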
    def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
      def try_approximate(data: Timing_Entries): Option[Time] = {
        val entries =
          data.by_threads.toList match {
            case List((i, Timing_Entries(List(entry)))) if i != 1 =>
              (i, data.mean_time) :: entry.proper_cpu.map(1 -> _).toList
            case entries => entries.map((threads, data) => threads -> data.mean_time)
          }
        if (entries.size < 2) None else Some(approximate_threads(entries, threads))
      }

      for {
        data <- data.by_job.get(job_name)
        data <- data.by_hostname.get(hostname)
        time <- data.by_threads.get(threads).map(_.mean_time).orElse(try_approximate(data))
      } yield time
    }

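    // global scaling factor between two thread counts: median over per-host/per-job estimates,
    // falling back to host-unified data points or the plain ratio of thread counts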
    def global_threads_factor(from: Int, to: Int): Double = {
      def median(xs: Iterable[Double]): Double = xs.toList.sorted.apply(xs.size / 2)

      val estimates =
        for {
          (hostname, data) <- data.by_hostname
          job_name <- data.by_job.keys
          from_time <- estimate_threads(job_name, hostname, from)
          to_time <- estimate_threads(job_name, hostname, to)
        } yield from_time.ms.toDouble / to_time.ms

      if (estimates.nonEmpty) median(estimates)
      else {
        // unify hosts
        val estimates =
          for {
            (job_name, data) <- data.by_job
            hostname = data.by_hostname.keys.head
            entries = unify_hosts(job_name, hostname)
            if entries.length > 1
          } yield
            approximate_threads(entries, from).ms.toDouble / approximate_threads(entries, to).ms

        if (estimates.nonEmpty) median(estimates)
        else from.toDouble / to.toDouble
      }
    }

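    // estimated elapsed time of a job on a host with given threads (cached); fallbacks in order:
    // other jobs on the host, the same job on other hosts (scaled by host factor), host-unified
    // data points, and finally the global threads curve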
    private var cache: Map[(String, String, Int), Time] = Map.empty
    def estimate(job_name: String, hostname: String, threads: Int): Time = {
      def estimate: Time =
        data.by_job.get(job_name) match {
          case None =>
            // no data for job, take average of other jobs for given threads
            val job_estimates = data.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
            if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
            else {
              // no other job to estimate from, use global curve to approximate any other job
              val (threads1, data1) = data.by_threads.head
              data1.mean_time.scale(global_threads_factor(threads1, threads))
            }

          case Some(data) =>
            data.by_threads.get(threads) match {
              case None => // interpolate threads
                estimate_threads(job_name, hostname, threads).getOrElse {
                  // per machine, try to approximate config for threads
                  val approximated =
                    for {
                      hostname1 <- data.by_hostname.keys
                      estimate <- estimate_threads(job_name, hostname1, threads)
                      factor = hostname_factor(hostname1, hostname)
                    } yield estimate.scale(factor)

                  if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
                  else {
                    // no single machine where config can be approximated, unify data points
                    val unified_entries = unify_hosts(job_name, hostname)

                    if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
                    else {
                      // only single data point, use global curve to approximate
                      val (job_threads, job_time) = unified_entries.head
                      job_time.scale(global_threads_factor(job_threads, threads))
                    }
                  }
                }

              case Some(data) => // time for job/thread exists, interpolate machine
                data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
                  Timing_Data.median_time(
                    data.by_hostname.toList.map((hostname1, data) =>
                      data.mean_time.scale(hostname_factor(hostname1, hostname))))
                }
            }
        }

      cache.get(job_name, hostname, threads) match {
        case Some(time) => time
        case None =>
          val time = estimate
          cache = cache + ((job_name, hostname, threads) -> time)
          time
      }
    }
  }

  object Timing_Data {
    def median_timing(obs: List[Timing]): Timing = obs.sortBy(_.elapsed.ms).apply(obs.length / 2)
    def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).apply(obs.length / 2)
    def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)

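    // synthetic timings (5 min single-threaded baseline, scaled by host factor) for when no
    // build history is available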
    private def dummy_entries(host: Host, host_factor: Double) = {
      val baseline = Time.minutes(5).scale(host_factor)
      val gc = Time.seconds(10).scale(host_factor)
      List(
        Timing_Entry("dummy", host.name, 1, Timing(baseline, baseline, gc)),
        Timing_Entry("dummy", host.name, 8, Timing(baseline.scale(0.2), baseline, gc)))
    }

    def make(
      host_infos: Host_Infos,
      build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
    ): Timing_Data = {
      val hosts = host_infos.hosts
      val measurements =
        for {
          (meta_info, build_info) <- build_history
          build_host = meta_info.get(Build_Log.Prop.build_host)
          (job_name, session_info) <- build_info.sessions.toList
          if build_info.finished_sessions.contains(job_name)
          hostname <- session_info.hostname.orElse(build_host).toList
          host <- hosts.find(_.info.hostname == hostname).toList
          threads = session_info.threads.getOrElse(host.info.num_cpus)
        } yield (job_name, hostname, threads) -> session_info.timing

      val entries =
        if (measurements.isEmpty) {
          val default_host = host_infos.hosts.sorted(host_infos.host_speeds).last
          host_infos.hosts.flatMap(host =>
            dummy_entries(host, host_infos.host_factor(default_host, host)))
        }
        else
          measurements.groupMap(_._1)(_._2).toList.map {
            case ((job_name, hostname, threads), timings) =>
              Timing_Entry(job_name, hostname, threads, median_timing(timings))
          }

      new Timing_Data(Timing_Entries(entries), host_infos)
    }

    def load(host_infos: Host_Infos, log_database: SQL.Database): Timing_Data = {
      val build_history =
        for {
          log_name <- log_database.execute_query_statement(
            Build_Log.private_data.meta_info_table.select(List(Build_Log.Column.log_name)),
            List.from[String], res => res.string(Build_Log.Column.log_name))
          meta_info <- Build_Log.private_data.read_meta_info(log_database, log_name)
          build_info = Build_Log.private_data.read_build_info(log_database, log_name)
        } yield (meta_info, build_info)

      make(host_infos, build_history)
    }
  }


  /* host information */

  case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host) {
    def name: String = info.hostname
    def num_cpus: Int = info.num_cpus
  }

  object Host_Infos {
    def dummy: Host_Infos =
      new Host_Infos(
        List(Host(isabelle.Host.Info("dummy", Nil, 8, Some(1.0)), Build_Cluster.Host("dummy"))))

    def load(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = {
      def get_host(build_host: Build_Cluster.Host): Host = {
        val info =
          isabelle.Host.read_info(db, build_host.name).getOrElse(
            error("No benchmark for " + quote(build_host.name)))
        Host(info, build_host)
      }

      new Host_Infos(build_hosts.map(get_host))
    }
  }

  class Host_Infos private(val hosts: List[Host]) {
    require(hosts.nonEmpty)

    private val by_hostname = hosts.map(host => host.name -> host).toMap

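    // factor to convert a time measured on `from` into an estimate for `to`
    // (benchmark_score is assumed to be proportional to host speed)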
    def host_factor(from: Host, to: Host): Double =
      from.info.benchmark_score.get / to.info.benchmark_score.get

    val host_speeds: Ordering[Host] =
      Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) < 1)

    def the_host(hostname: String): Host =
      by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname)))
    def the_host(node_info: Node_Info): Host = the_host(node_info.hostname)

    def num_threads(node_info: Node_Info): Int =
      if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
      else the_host(node_info).info.num_cpus

    def available(state: Build_Process.State): Resources = {
      val allocated =
        state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
      Resources(this, allocated)
    }
  }


  /* offline tracking of job configurations and resource allocations */

  object Config {
    def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
  }

  case class Config(job_name: String, node_info: Node_Info) {
    def job_of(start_time: Time): Build_Process.Job =
      Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
  }

  case class Resources(
    host_infos: Host_Infos,
    allocated_nodes: Map[Host, List[Node_Info]]
  ) {
    def unused_nodes(host: Host, threads: Int): List[Node_Info] =
      if (!available(host, threads)) Nil
      else {
        val node = next_node(host, threads)
        node :: allocate(node).unused_nodes(host, threads)
      }

    def unused_nodes(threads: Int): List[Node_Info] =
      host_infos.hosts.flatMap(unused_nodes(_, threads))

    def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)

    def allocate(node: Node_Info): Resources = {
      val host = host_infos.the_host(node)
      copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
    }

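    // allocate tasks greedily: each task goes to the first host that can fit its minimal
    // thread count, and is given between min_threads and max_threads threads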
    def try_allocate_tasks(
      hosts: List[(Host, Int)],
      tasks: List[(Build_Process.Task, Int, Int)],
    ): (List[Config], Resources) =
      tasks match {
        case Nil => (Nil, this)
        case (task, min_threads, max_threads) :: tasks =>
          val (config, resources) =
            hosts.find((host, _) => available(host, min_threads)) match {
              case Some((host, host_max_threads)) =>
                val free_threads = host.info.num_cpus - ((host.build.jobs - 1) * host_max_threads)
                val node_info = next_node(host, (min_threads max free_threads) min max_threads)
                (Some(Config(task.name, node_info)), allocate(node_info))
              case None => (None, this)
            }
          val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
          (configs ++ config, resources1)
      }

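    // next node on the host: prefer the NUMA node with the fewest allocated cpus (if enabled),
    // and assign dedicated relative cpus only when enough of them are still free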
    def next_node(host: Host, threads: Int): Node_Info = {
      val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1)
      def explicit_cpus(node_info: Node_Info): List[Int] =
        if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList

      val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)

      val available_nodes = host.info.numa_nodes
      val numa_node =
        if (!host.build.numa) None
        else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption

      val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
      val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList

      val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil

      Node_Info(host.name, numa_node, rel_cpus)
    }

    def available(host: Host, threads: Int): Boolean = {
      val used = allocated(host)

      if (used.length >= host.build.jobs) false
      else {
        if (host.info.numa_nodes.length <= 1)
          used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus
        else {
          def node_threads(n: Int): Int =
            used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum

          host.info.numa_nodes.exists(
            node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length)
        }
      }
    }
  }


  /* schedule generation */

  object Schedule {
    case class Node(job_name: String, node_info: Node_Info, start: Date, duration: Time) {
      def end: Date = Date(start.time + duration)
    }

    type Graph = isabelle.Graph[String, Node]
  }

  case class Schedule(generator: String, start: Date, graph: Schedule.Graph) {
    def end: Date =
      if (graph.is_empty) start
      else graph.maximals.map(graph.get_node).map(_.end).maxBy(_.unix_epoch)

    def duration: Time = end.time - start.time
    def message: String = "Estimated " + duration.message_hms + " build time with " + generator
  }

  case class State(build_state: Build_Process.State, current_time: Time, finished: Schedule) {
    def start(config: Config): State =
      copy(build_state =
        build_state.copy(running = build_state.running +
          (config.job_name -> config.job_of(current_time))))

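    // advance virtual time to the earliest predicted completion among running jobs and move
    // that job into the finished schedule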
    def step(timing_data: Timing_Data): State = {
      val remaining =
        build_state.running.values.toList.map { job =>
          val elapsed = current_time - job.start_date.time
          val threads = timing_data.host_infos.num_threads(job.node_info)
          val predicted = timing_data.estimate(job.name, job.node_info.hostname, threads)
          val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed
          job -> remaining
        }

      if (remaining.isEmpty) error("Schedule step without running sessions")
      else {
        val (job, elapsed) = remaining.minBy(_._2.ms)
        val now = current_time + elapsed
        val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)

        val preds =
          build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined)
        val graph =
          preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))

        val build_state1 = build_state.remove_running(job.name).remove_pending(job.name)
        State(build_state1, now, finished.copy(graph = graph))
      }
    }

    def is_finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
  }

  trait Scheduler {
    def next(state: Build_Process.State): List[Config]
    def build_schedule(build_state: Build_Process.State): Schedule
  }

  abstract class Heuristic(timing_data: Timing_Data) extends Scheduler {
    val host_infos = timing_data.host_infos
    val ordered_hosts = host_infos.hosts.sorted(host_infos.host_speeds)

    def build_schedule(build_state: Build_Process.State): Schedule = {
      @tailrec
      def simulate(state: State): State =
        if (state.is_finished) state
        else {
          val state1 = next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
          simulate(state1)
        }

      val start = Date.now()
      val end_state =
        simulate(State(build_state, start.time, Schedule(toString, start, Graph.empty)))

      end_state.finished
    }
  }

  class Default_Heuristic(timing_data: Timing_Data, options: Options)
    extends Heuristic(timing_data) {
    override def toString: String = "default build heuristic"

    def host_threads(host: Host): Int = {
      val m = (options ++ host.build.options).int("threads")
      if (m > 0) m else (host.num_cpus max 1) min 8
    }
    
    def next_jobs(resources: Resources, sorted_jobs: List[String], host: Host): List[Config] =
      sorted_jobs.zip(resources.unused_nodes(host, host_threads(host))).map(Config(_, _))

    def next(state: Build_Process.State): List[Config] = {
      val sorted_jobs = state.next_ready.sortBy(_.name)(state.sessions.ordering).map(_.name)
      val resources = host_infos.available(state)

      host_infos.hosts.foldLeft((sorted_jobs, List.empty[Config])) {
        case ((jobs, res), host) =>
          val configs = next_jobs(resources, jobs, host)
          val config_jobs = configs.map(_.job_name).toSet
          (jobs.filterNot(config_jobs.contains), configs ::: res)
      }._2
    }
  }

  class Meta_Heuristic(heuristics: List[Heuristic]) extends Scheduler {
    require(heuristics.nonEmpty)

    def best_result(state: Build_Process.State): (Heuristic, Schedule) =
      heuristics.map(heuristic =>
        heuristic -> heuristic.build_schedule(state)).minBy(_._2.duration.ms)

    def next(state: Build_Process.State): List[Config] = best_result(state)._1.next(state)

    def build_schedule(state: Build_Process.State): Schedule = best_result(state)._2
  }


  /* heuristics */

  abstract class Path_Heuristic(
    timing_data: Timing_Data,
    sessions_structure: Sessions.Structure,
    max_threads_limit: Int
  ) extends Heuristic(timing_data) {
    /* pre-computed properties for efficient heuristic */

    val max_threads = host_infos.hosts.map(_.info.num_cpus).max min max_threads_limit

    type Node = String
    val build_graph = sessions_structure.build_graph

    val minimals = build_graph.minimals
    val maximals = build_graph.maximals

    def all_preds(node: Node): Set[Node] = build_graph.all_preds(List(node)).toSet
    val maximals_all_preds = maximals.map(node => node -> all_preds(node)).toMap

    def best_time(node: Node): Time = {
      val host = ordered_hosts.last
      val threads = timing_data.best_threads(node, max_threads) min host.info.num_cpus
      timing_data.estimate(node, host.name, threads)
    }
    val best_times = build_graph.keys.map(node => node -> best_time(node)).toMap

    val succs_max_time_ms = build_graph.node_height(best_times(_).ms)
    def max_time(node: Node): Time = Time.ms(succs_max_time_ms(node)) + best_times(node)
    def max_time(task: Build_Process.Task): Time = max_time(task.name)

    def path_times(minimals: List[Node]): Map[Node, Time] = {
      def time_ms(node: Node): Long = best_times(node).ms
      val path_times_ms = build_graph.reachable_length(time_ms, build_graph.imm_succs, minimals)
      path_times_ms.view.mapValues(Time.ms).toMap
    }

    def path_max_times(minimals: List[Node]): Map[Node, Time] =
      path_times(minimals).toList.map((node, time) => node -> (time + max_time(node))).toMap

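    // maximal number of simultaneously running nodes when simulating the build graph
    // (restricted to pred) from the given running nodes, using best-case times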
    def parallel_paths(running: List[(Node, Time)], pred: Node => Boolean = _ => true): Int = {
      def start(node: Node): (Node, Time) = node -> best_times(node)

      def pass_time(elapsed: Time)(node: Node, time: Time): (Node, Time) =
        node -> (time - elapsed)

      def parallel_paths(running: Map[Node, Time]): (Int, Map[Node, Time]) =
        if (running.isEmpty) (0, running)
        else {
          def get_next(node: Node): List[Node] =
            build_graph.imm_succs(node).filter(pred).filter(
              build_graph.imm_preds(_).intersect(running.keySet) == Set(node)).toList

          val (next, elapsed) = running.minBy(_._2.ms)
          val (remaining, finished) =
            running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)

          val running1 =
            remaining.toMap ++ finished.map(_._1).flatMap(get_next).map(start)
          val (res, running2) = parallel_paths(running1)
          (res max running.size, running2)
        }

      parallel_paths(running.toMap)._1
    }
  }


  object Path_Time_Heuristic {
    sealed trait Critical_Criterion
    case class Absolute_Time(time: Time) extends Critical_Criterion {
      override def toString: String = "absolute time (" + time.message_hms + ")"
    }
    case class Relative_Time(factor: Double) extends Critical_Criterion {
      override def toString: String = "relative time (" + factor + ")"
    }

    sealed trait Parallel_Strategy
    case class Fixed_Thread(threads: Int) extends Parallel_Strategy {
      override def toString: String = "fixed threads (" + threads + ")"
    }
    case class Time_Based_Threads(f: Time => Int) extends Parallel_Strategy {
      override def toString: String = "time based threads"
    }

    sealed trait Host_Criterion
    case object Critical_Nodes extends Host_Criterion {
      override def toString: String = "per critical node"
    }
    case class Fixed_Fraction(fraction: Double) extends Host_Criterion {
      override def toString: String = "fixed fraction (" + fraction + ")"
    }
    case class Host_Speed(min_factor: Double) extends Host_Criterion {
      override def toString: String = "host speed (" + min_factor + ")"
    }
  }

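  // schedule nodes on critical paths (according to is_critical) with their best thread count on
  // the fastest hosts, and remaining nodes with reduced parallelism on the other hosts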
  class Path_Time_Heuristic(
    is_critical: Path_Time_Heuristic.Critical_Criterion,
    parallel_threads: Path_Time_Heuristic.Parallel_Strategy,
    host_criterion: Path_Time_Heuristic.Host_Criterion,
    timing_data: Timing_Data,
    sessions_structure: Sessions.Structure,
    max_threads_limit: Int = 8
  ) extends Path_Heuristic(timing_data, sessions_structure, max_threads_limit) {
    import Path_Time_Heuristic.*

    override def toString: String = {
      val params =
        List(
          "critical: " + is_critical,
          "parallel: " + parallel_threads,
          "fast hosts: " + host_criterion)
      "path time heuristic (" + params.mkString(", ") + ")"
    }

    def next(state: Build_Process.State): List[Config] = {
      val resources = host_infos.available(state)

      def best_threads(task: Build_Process.Task): Int =
        timing_data.best_threads(task.name, max_threads)

      val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)

      val available_nodes =
        host_infos.available(state.copy(running = Map.empty))
          .unused_nodes(max_threads)
          .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse

      def remaining_time(node: Node): (Node, Time) =
        state.running.get(node) match {
          case None => node -> best_time(node)
          case Some(job) =>
            val estimate =
              timing_data.estimate(job.name, job.node_info.hostname,
                host_infos.num_threads(job.node_info))
            node -> Time.ms((Time.now() - job.start_date.time + estimate).ms max 0)
        }

      val max_parallel = parallel_paths(state.ready.map(_.name).map(remaining_time))
      val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse

      if (max_parallel <= available_nodes.length) {
        val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
        resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
      }
      else {
        def is_critical(time: Time): Boolean =
          this.is_critical match {
            case Absolute_Time(threshold) => time > threshold
            case Relative_Time(factor) => time > minimals.map(max_time).maxBy(_.ms).scale(factor)
          }

        val critical_minimals = state.ready.filter(task => is_critical(max_time(task))).map(_.name)
        val critical_nodes =
          path_max_times(critical_minimals).filter((_, time) => is_critical(time)).keySet

        val (critical, other) = next_sorted.partition(task => critical_nodes.contains(task.name))

        val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task)))

        def parallel_threads(task: Build_Process.Task): Int =
          this.parallel_threads match {
            case Fixed_Thread(threads) => threads
            case Time_Based_Threads(f) => f(best_time(task.name))
          }

        val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))

        val max_critical_parallel =
          parallel_paths(critical_minimals.map(remaining_time), critical_nodes.contains)
        val max_critical_hosts =
          available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length

        val split =
          this.host_criterion match {
            case Critical_Nodes => max_critical_hosts
            case Fixed_Fraction(fraction) =>
              ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
            case Host_Speed(min_factor) =>
              val best = rev_ordered_hosts.head._1.info.benchmark_score.get
              val num_fast =
                rev_ordered_hosts.count(_._1.info.benchmark_score.exists(_ >= best * min_factor))
              num_fast min max_critical_hosts
          }

        val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)

        val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
        val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)

        configs1 ::: configs2
      }
    }
  }


  /* process for scheduled build */

  abstract class Scheduled_Build_Process(
    build_context: Build.Context,
    build_progress: Progress,
    server: SSH.Server,
  ) extends Build_Process(build_context, build_progress, server) {
    protected val start_date = Date.now()

    def init_scheduler(timing_data: Timing_Data): Scheduler

    /* global resources with common close() operation */

    private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options)
    private final lazy val _log_database: SQL.Database =
      try {
        val db = _log_store.open_database(server = this.server)
        _log_store.init_database(db)
        db
      }
      catch { case exn: Throwable => close(); throw exn }

    override def close(): Unit = {
      super.close()
      Option(_log_database).foreach(_.close())
    }


    /* previous results via build log */

    override def open_build_cluster(): Build_Cluster = {
      val build_cluster = super.open_build_cluster()
      build_cluster.init()
      if (build_context.master && build_context.max_jobs > 0) {
        val benchmark_options = build_options.string("build_hostname") = hostname
        Benchmark.benchmark(benchmark_options, progress)
      }
      build_cluster.benchmark()
      build_cluster
    }

    private val timing_data: Timing_Data = {
      val cluster_hosts: List[Build_Cluster.Host] =
        if (build_context.max_jobs == 0) build_context.build_hosts
        else {
          val local_build_host =
            Build_Cluster.Host(
              hostname, jobs = build_context.max_jobs, numa = build_context.numa_shuffling)
          local_build_host :: build_context.build_hosts
        }

      val host_infos = Host_Infos.load(cluster_hosts, _host_database)
      Timing_Data.load(host_infos, _log_database)
    }
    private val scheduler = init_scheduler(timing_data)

    def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = {
      val sessions =
        for {
          (session_name, result) <- state.toList
          if !result.current
        } yield {
          val info = build_context.sessions_structure(session_name)
          val entry =
            if (!results.cancelled(session_name)) {
              val status =
                if (result.ok) Build_Log.Session_Status.finished
                else Build_Log.Session_Status.failed

              Build_Log.Session_Entry(
                chapter = info.chapter,
                groups = info.groups,
                hostname = Some(result.node_info.hostname),
                threads = Some(timing_data.host_infos.num_threads(result.node_info)),
                timing = result.process_result.timing,
                sources = Some(result.output_shasum.digest.toString),
                status = Some(status))
            }
            else
              Build_Log.Session_Entry(
                chapter = info.chapter,
                groups = info.groups,
                status = Some(Build_Log.Session_Status.cancelled))
          session_name -> entry
        }

      val settings =
        Build_Log.Settings.all_settings.map(_.name).map(name =>
          name -> Isabelle_System.getenv(name))
      val props =
        List(
          Build_Log.Prop.build_id.name -> build_context.build_uuid,
          Build_Log.Prop.build_engine.name -> build_context.engine.name,
          Build_Log.Prop.build_host.name -> hostname,
          Build_Log.Prop.build_start.name -> Build_Log.print_date(start_date))

      val meta_info = Build_Log.Meta_Info(props, settings)
      val build_info = Build_Log.Build_Info(sessions.toMap)
      val log_name = Build_Log.log_filename(engine = build_context.engine.name, date = start_date)

      Build_Log.private_data.update_sessions(
        _log_database, _log_store.cache.compress, log_name.file_name, build_info)
      Build_Log.private_data.update_meta_info(_log_database, log_name.file_name, meta_info)
    }


    /* build process */

    case class Cache(state: Build_Process.State, configs: List[Config], estimate: Date) {
      def is_current(state: Build_Process.State): Boolean =
        this.state.pending.nonEmpty && this.state.results == state.results
      def is_current_estimate(estimate: Date): Boolean =
        Math.abs((this.estimate.time - estimate.time).ms) < Time.minutes(1).ms
    }

    private var cache = Cache(Build_Process.State(), Nil, Date.now())

    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = {
      val configs =
        if (cache.is_current(state)) cache.configs
        else scheduler.next(state)
      configs.find(_.job_name == session_name).get.node_info
    }

    def is_current(state: Build_Process.State, session_name: String): Boolean =
      state.ancestor_results(session_name) match {
        case Some(ancestor_results) if ancestor_results.forall(_.current) =>
          val sources_shasum = state.sessions(session_name).sources_shasum

          val input_shasum =
            if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum()
            else SHA1.flat_shasum(ancestor_results.map(_.output_shasum))

          val store_heap =
            build_context.build_heap || Sessions.is_pure(session_name) ||
              state.sessions.iterator.exists(_.ancestors.contains(session_name))

          store.check_output(
            _database_server, session_name,
            session_options = build_context.sessions_structure(session_name).options,
            sources_shasum = sources_shasum,
            input_shasum = input_shasum,
            fresh_build = build_context.fresh_build,
            store_heap = store_heap)._1
        case _ => false
      }

    override def next_jobs(state: Build_Process.State): List[String] = {
      val finalize_limit = if (build_context.master) Int.MaxValue else 0

      if (progress.stopped) state.next_ready.map(_.name).take(finalize_limit)
      else if (cache.is_current(state)) cache.configs.map(_.job_name).filterNot(state.is_running)
      else {
        val current = state.next_ready.filter(task => is_current(state, task.name))
        if (current.nonEmpty) current.map(_.name).take(finalize_limit)
        else {
          val start = Time.now()
          val next = scheduler.next(state)
          val schedule = scheduler.build_schedule(state)
          val elapsed = Time.now() - start

          val timing_msg = if (elapsed.is_relevant) " (took " + elapsed.message + ")" else ""
          progress.echo_if(build_context.master && !cache.is_current_estimate(schedule.end),
            schedule.message + timing_msg)

          val configs = next.filter(_.node_info.hostname == hostname)
          cache = Cache(state, configs, schedule.end)
          configs.map(_.job_name)
        }
      }
    }

    override def run(): Build.Results = {
      val results = super.run()
      if (build_context.master) write_build_log(results, snapshot().results)
      results
    }
  }

  class Engine extends Build.Engine("build_schedule") {

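    // candidate heuristics: the default heuristic plus path-time heuristics for all combinations
    // of criticality criterion, parallelism strategy, and host split; the Meta_Heuristic picks
    // the best resulting schedule on each invocation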
    def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {
      val sessions_structure = context.sessions_structure

      val is_criticals =
        List(
          Path_Time_Heuristic.Absolute_Time(Time.minutes(5)),
          Path_Time_Heuristic.Absolute_Time(Time.minutes(10)),
          Path_Time_Heuristic.Absolute_Time(Time.minutes(20)),
          Path_Time_Heuristic.Relative_Time(0.5))
      val parallel_threads =
        List(
          Path_Time_Heuristic.Fixed_Thread(1),
          Path_Time_Heuristic.Time_Based_Threads({
            case time if time < Time.minutes(1) => 1
            case time if time < Time.minutes(5) => 4
            case _ => 8
          }))
      val machine_splits =
        List(
          Path_Time_Heuristic.Critical_Nodes,
          Path_Time_Heuristic.Fixed_Fraction(0.3),
          Path_Time_Heuristic.Host_Speed(0.9))

      val path_time_heuristics =
        for {
          is_critical <- is_criticals
          parallel <- parallel_threads
          machine_split <- machine_splits
        } yield
          Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure)
      val heuristics = Default_Heuristic(timing_data, context.build_options) :: path_time_heuristics

      new Meta_Heuristic(heuristics)
    }

    override def open_build_process(
      context: Build.Context,
      progress: Progress,
      server: SSH.Server
    ): Build_Process =
      new Scheduled_Build_Process(context, progress, server) {
        def init_scheduler(timing_data: Timing_Data): Scheduler = scheduler(timing_data, context)
      }
  }


  /* build schedule */

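  // Example (Isabelle/Scala; assumes benchmarked hosts and build logs in the databases,
  // host names below are placeholders):
  //
  //   val hosts = Build_Cluster.Host.parse(Registry.global, "host1,host2")
  //   val schedule = build_schedule(Options.init(), build_hosts = hosts,
  //     selection = Sessions.Selection(all_sessions = true))
  //   write_schedule_graphic(schedule, Path.explode("schedule.png"))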
  def build_schedule(
    options: Options,
    build_hosts: List[Build_Cluster.Host] = Nil,
    selection: Sessions.Selection = Sessions.Selection.empty,
    progress: Progress = new Progress,
    afp_root: Option[Path] = None,
    dirs: List[Path] = Nil,
    select_dirs: List[Path] = Nil,
    infos: List[Sessions.Info] = Nil,
    numa_shuffling: Boolean = false,
    augment_options: String => List[Options.Spec] = _ => Nil,
    session_setup: (String, Session) => Unit = (_, _) => (),
    cache: Term.Cache = Term.Cache.make()
  ): Schedule = {
    val build_engine = new Engine

    val store = build_engine.build_store(options, build_hosts = build_hosts, cache = cache)
    val log_store = Build_Log.store(options, cache = cache)
    val build_options = store.options

    def build_schedule(
      server: SSH.Server,
      database_server: Option[SQL.Database],
      log_database: PostgreSQL.Database,
      host_database: SQL.Database
    ): Schedule = {
      val full_sessions =
        Sessions.load_structure(build_options, dirs = AFP.make_dirs(afp_root) ::: dirs,
          select_dirs = select_dirs, infos = infos, augment_options = augment_options)
      val full_sessions_selection = full_sessions.imports_selection(selection)

      val build_deps =
        Sessions.deps(full_sessions.selection(selection), progress = progress,
          inlined_files = true).check_errors

      val build_context =
        Build.Context(store, build_engine, build_deps, afp_root = afp_root,
          build_hosts = build_hosts, hostname = Build.hostname(build_options),
          numa_shuffling = numa_shuffling, max_jobs = 0, session_setup = session_setup,
          master = true)

      val cluster_hosts = build_context.build_hosts

      val hosts_current =
        cluster_hosts.forall(host => isabelle.Host.read_info(host_database, host.name).isDefined)
      if (!hosts_current) {
        val build_cluster = Build_Cluster.make(build_context, progress = progress)
        build_cluster.open()
        build_cluster.init()
        build_cluster.benchmark()
        build_cluster.close()
      }

      val host_infos = Host_Infos.load(cluster_hosts, host_database)
      val timing_data = Timing_Data.load(host_infos, log_database)

      val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
      def task(session: Build_Job.Session_Context): Build_Process.Task =
        Build_Process.Task(session.name, session.deps, JSON.Object.empty, build_context.build_uuid)

      val build_state =
        Build_Process.State(sessions = sessions, pending = sessions.iterator.map(task).toList)

      val scheduler = build_engine.scheduler(timing_data, build_context)
      def schedule_msg(res: Exn.Result[Schedule]): String =
        res match { case Exn.Res(schedule) => schedule.message case _ => "" }

      Timing.timeit(scheduler.build_schedule(build_state), schedule_msg, output = progress.echo(_))
    }

    using(store.open_server()) { server =>
      using_optional(store.maybe_open_database_server(server = server)) { database_server =>
        using(log_store.open_database(server = server)) { log_database =>
          using(store.open_build_database(
            path = isabelle.Host.private_data.database, server = server)) { host_database =>
              build_schedule(server, database_server, log_database, host_database)
          }
        }
      }
    }
  }

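  // render the schedule as a Gantt-style chart: one column per host, build time on the vertical
  // axis (one pixel per second), written as .png or .pdf depending on the file name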
  def write_schedule_graphic(schedule: Schedule, output: Path): Unit = {
    import java.awt.geom.{GeneralPath, Rectangle2D}
    import java.awt.{BasicStroke, Color, Graphics2D}

    val line_height = isabelle.graphview.Metrics.default.height
    val char_width = isabelle.graphview.Metrics.default.char_width
    val padding = isabelle.graphview.Metrics.default.space_width
    val gap = isabelle.graphview.Metrics.default.gap

    val graph = schedule.graph

    def text_width(text: String): Double = text.length * char_width

    val generator_height = line_height + padding
    val hostname_height = generator_height + line_height + padding
    def time_height(time: Time): Double = time.seconds
    def date_height(date: Date): Double = time_height(date.time - schedule.start.time)

    val hosts = graph.iterator.map(_._2._1).toList.groupBy(_.node_info.hostname)

    def node_width(node: Schedule.Node): Double = 2 * padding + text_width(node.job_name)

    case class Range(start: Double, stop: Double) {
      def proper: List[Range] = if (start < stop) List(this) else Nil
      def width: Double = stop - start
    }

    val rel_node_ranges =
      hosts.toList.flatMap { (hostname, nodes) =>
        val sorted = nodes.sortBy(node => (node.start.time.ms, node.end.time.ms, node.job_name))
        sorted.foldLeft((List.empty[Schedule.Node], Map.empty[Schedule.Node, Range])) {
          case ((nodes, allocated), node) =>
            val width = node_width(node) + padding
            val parallel = nodes.filter(_.end.time > node.start.time)
            val (last, slots) =
              parallel.sortBy(allocated(_).start).foldLeft((0D, List.empty[Range])) {
                case ((start, ranges), node1) =>
                  val node_range = allocated(node1)
                  (node_range.stop, ranges ::: Range(start, node_range.start).proper)
              }
            val start =
              (Range(last, Double.MaxValue) :: slots.filter(_.width >= width)).minBy(_.width).start
            (node :: parallel, allocated + (node -> Range(start, start + width)))
        }._2
      }.toMap

    def host_width(hostname: String) =
      2 * padding + (hosts(hostname).map(rel_node_ranges(_).stop).max max text_width(hostname))

    def graph_height(graph: Graph[String, Schedule.Node]): Double =
      date_height(graph.maximals.map(graph.get_node(_).end).maxBy(_.unix_epoch))

    val height = (hostname_height + 2 * padding + graph_height(graph)).ceil.toInt
    val (last, host_starts) =
      hosts.keys.foldLeft((0D, Map.empty[String, Double])) {
        case ((previous, starts), hostname) =>
          (previous + gap + host_width(hostname), starts + (hostname -> previous))
      }
    val width = (last - gap).ceil.toInt

    def node_start(node: Schedule.Node): Double =
      host_starts(node.node_info.hostname) + padding + rel_node_ranges(node).start

    def paint(gfx: Graphics2D): Unit = {
      gfx.setColor(Color.LIGHT_GRAY)
      gfx.fillRect(0, 0, width, height)
      gfx.setRenderingHints(isabelle.graphview.Metrics.rendering_hints)
      gfx.setFont(isabelle.graphview.Metrics.default.font)
      gfx.setStroke(new BasicStroke(1, BasicStroke.CAP_BUTT, BasicStroke.JOIN_ROUND))

      draw_string(schedule.generator + ", build time: " + schedule.duration.message_hms, padding, 0)

      def draw_host(x: Double, hostname: String): Double = {
        val nodes = hosts(hostname).map(_.job_name).toSet
        val width = host_width(hostname)
        val height = 2 * padding + graph_height(graph.restrict(nodes.contains))
        val padding1 = ((width - text_width(hostname)) / 2) max 0
        val rect = new Rectangle2D.Double(x, hostname_height, width, height)
        gfx.setColor(Color.BLACK)
        gfx.draw(rect)
        gfx.setColor(Color.GRAY)
        gfx.fill(rect)
        draw_string(hostname, x + padding1, generator_height)
        x + gap + width
      }

      def draw_string(str: String, x: Double, y: Double): Unit = {
        gfx.setColor(Color.BLACK)
        gfx.drawString(str, x.toInt, (y + line_height).toInt)
      }

      def node_rect(node: Schedule.Node): Rectangle2D.Double = {
        val x = node_start(node)
        val y = hostname_height + padding + date_height(node.start)
        val width = node_width(node)
        val height = time_height(node.duration)
        new Rectangle2D.Double(x, y, width, height)
      }

      def draw_node(node: Schedule.Node): Rectangle2D.Double = {
        val rect = node_rect(node)
        gfx.setColor(Color.BLACK)
        gfx.draw(rect)
        gfx.setColor(Color.WHITE)
        gfx.fill(rect)

        def add_text(y: Double, text: String): Double =
          if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y
          else {
            val padding1 = padding min ((rect.height - (y + line_height)) / 2)
            draw_string(text, rect.x + padding, rect.y + y + padding1)
            y + padding1 + line_height
          }

        val node_info = node.node_info

        val duration_str = "(" + node.duration.message_hms + ")"
        val node_str =
          "on " + proper_string(node_info.toString.stripPrefix(node_info.hostname)).getOrElse("all")
        val start_str = "Start: " + (node.start.time - schedule.start.time).message_hms

        List(node.job_name, duration_str, node_str, start_str).foldLeft(0D)(add_text)

        rect
      }

      def draw_arrow(from: Schedule.Node, to: Rectangle2D.Double, curve: Double = 10): Unit = {
        val from_rect = node_rect(from)

        val path = new GeneralPath()
        path.moveTo(from_rect.getCenterX, from_rect.getMaxY)
        path.lineTo(to.getCenterX, to.getMinY)

        gfx.setColor(Color.BLUE)
        gfx.draw(path)
      }

      hosts.keys.foldLeft(0D)(draw_host)

      graph.topological_order.foreach { job_name =>
        val node = graph.get_node(job_name)
        val rect = draw_node(node)

        graph.imm_preds(job_name).foreach(pred => draw_arrow(graph.get_node(pred), rect))
      }
    }

    val name = output.file_name
    if (File.is_png(name)) Graphics_File.write_png(output.file, paint, width, height)
    else if (File.is_pdf(name)) Graphics_File.write_pdf(output.file, paint, width, height)
    else error("Bad type of file: " + quote(name) + " (.png or .pdf expected)")
  }


  /* command-line wrapper */

  val isabelle_tool = Isabelle_Tool("build_schedule", "generate build schedule", Scala_Project.here,
    { args =>
      var afp_root: Option[Path] = None
      val base_sessions = new mutable.ListBuffer[String]
      val select_dirs = new mutable.ListBuffer[Path]
      val build_hosts = new mutable.ListBuffer[Build_Cluster.Host]
      var numa_shuffling = false
      var output_file: Option[Path] = None
      var requirements = false
      val exclude_session_groups = new mutable.ListBuffer[String]
      var all_sessions = false
      val dirs = new mutable.ListBuffer[Path]
      val session_groups = new mutable.ListBuffer[String]
      var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS)
      var verbose = false
      val exclude_sessions = new mutable.ListBuffer[String]

      val getopts = Getopts("""
Usage: isabelle build_schedule [OPTIONS] [SESSIONS ...]

  Options are:
    -A ROOT      include AFP with given root directory (":" for """ + AFP.BASE.implode + """)
    -B NAME      include session NAME and all descendants
    -D DIR       include session directory and select its sessions
    -H HOSTS     additional build cluster host specifications, of the form
                 "NAMES:PARAMETERS" (separated by commas)
    -N           cyclic shuffling of NUMA CPU nodes (performance tuning)
    -O FILE      output file for schedule graphic (.png or .pdf)
    -R           refer to requirements of selected sessions
    -X NAME      exclude sessions from group NAME and all descendants
    -a           select all sessions
    -d DIR       include session directory
    -g NAME      select session group NAME
    -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
    -v           verbose
    -x NAME      exclude session NAME and all descendants

  Generate build schedule.
""",
        "A:" -> (arg => afp_root = Some(if (arg == ":") AFP.BASE else Path.explode(arg))),
        "B:" -> (arg => base_sessions += arg),
        "D:" -> (arg => select_dirs += Path.explode(arg)),
        "H:" -> (arg => build_hosts ++= Build_Cluster.Host.parse(Registry.global, arg)),
        "N" -> (_ => numa_shuffling = true),
        "O:" -> (arg => output_file = Some(Path.explode(arg))),
        "R" -> (_ => requirements = true),
        "X:" -> (arg => exclude_session_groups += arg),
        "a" -> (_ => all_sessions = true),
        "d:" -> (arg => dirs += Path.explode(arg)),
        "g:" -> (arg => session_groups += arg),
        "o:" -> (arg => options = options + arg),
        "v" -> (_ => verbose = true),
        "x:" -> (arg => exclude_sessions += arg))

      val sessions = getopts(args)

      val progress = new Console_Progress(verbose = verbose)

      val schedule =
        build_schedule(options,
          selection = Sessions.Selection(
            requirements = requirements,
            all_sessions = all_sessions,
            base_sessions = base_sessions.toList,
            exclude_session_groups = exclude_session_groups.toList,
            exclude_sessions = exclude_sessions.toList,
            session_groups = session_groups.toList,
            sessions = sessions),
          progress = progress,
          afp_root = afp_root,
          dirs = dirs.toList,
          select_dirs = select_dirs.toList,
          numa_shuffling = isabelle.Host.numa_check(progress, numa_shuffling),
          build_hosts = build_hosts.toList)

      if (!schedule.graph.is_empty && output_file.nonEmpty)
        write_schedule_graphic(schedule, output_file.get)
    })
}