add module for faster scheduled builds;
authorFabian Huch <huch@in.tum.de>
Wed, 18 Oct 2023 20:51:24 +0200
changeset 78845 ff96d94957cb
parent 78844 c7f436a63108
child 78846 966aa081929f
add module for faster scheduled builds;
etc/build.props
src/Pure/Tools/build_schedule.scala
--- a/etc/build.props	Wed Oct 18 20:26:02 2023 +0200
+++ b/etc/build.props	Wed Oct 18 20:51:24 2023 +0200
@@ -194,6 +194,7 @@
   src/Pure/Tools/build_cluster.scala \
   src/Pure/Tools/build_job.scala \
   src/Pure/Tools/build_process.scala \
+  src/Pure/Tools/build_schedule.scala \
   src/Pure/Tools/check_keywords.scala \
   src/Pure/Tools/debugger.scala \
   src/Pure/Tools/doc.scala \
@@ -316,6 +317,7 @@
   isabelle.Bash$Handler \
   isabelle.Bibtex$File_Format \
   isabelle.Build$Default_Engine \
+  isabelle.Build_Schedule$Engine \
   isabelle.Document_Build$Build_Engine \
   isabelle.Document_Build$LIPIcs_LuaLaTeX_Engine \
   isabelle.Document_Build$LIPIcs_PDFLaTeX_Engine \
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Tools/build_schedule.scala	Wed Oct 18 20:51:24 2023 +0200
@@ -0,0 +1,555 @@
+/*  Title:      Pure/Tools/build_schedule.scala
+    Author:     Fabian Huch, TU Muenchen
+
+Build schedule generated by Heuristic methods, allowing for more efficient builds.
+ */
+package isabelle
+
+
+import Host.{Info, Node_Info}
+import scala.annotation.tailrec
+
+
+object Build_Schedule {
+  val engine_name = "build_schedule"
+
+  object Config {
+    def from_job(job: Build_Process.Job): Config = Config(job.name, job.node_info)
+  }
+
+  case class Config(job_name: String, node_info: Node_Info) {
+    def job_of(start_time: Time): Build_Process.Job =
+      Build_Process.Job(job_name, "", "", node_info, Date(start_time), None)
+  }
+
+  /* organized historic timing information (extracted from build logs) */
+
+  case class Timing_Entry(job_name: String, hostname: String, threads: Int, elapsed: Time)
+
+  class Timing_Data private(data: List[Timing_Entry], val host_infos: Host_Infos) {
+    require(data.nonEmpty)
+
+    def speedup(time: Time, factor: Double): Time =
+      Time.ms((time.ms * factor).toLong)
+
+    def is_empty = data.isEmpty
+    def size = data.length
+
+    private lazy val by_job =
+      data.groupBy(_.job_name).view.mapValues(new Timing_Data(_, host_infos)).toMap
+    private lazy val by_threads =
+      data.groupBy(_.threads).view.mapValues(new Timing_Data(_, host_infos)).toMap
+    private lazy val by_hostname =
+      data.groupBy(_.hostname).view.mapValues(new Timing_Data(_, host_infos)).toMap
+
+    def mean_time: Time = Timing_Data.mean_time(data.map(_.elapsed))
+
+    private def best_entry: Timing_Entry = data.minBy(_.elapsed.ms)
+
+    def best_threads(job_name: String): Option[Int] = by_job.get(job_name).map(_.best_entry.threads)
+
+    def best_time(job_name: String): Time =
+      by_job.get(job_name).map(_.best_entry.elapsed).getOrElse(
+        estimate_config(Config(job_name, Node_Info(best_entry.hostname, None, Nil))))
+
+    private def hostname_factor(from: String, to: String): Double =
+      host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))
+
+    def approximate_threads(threads: Int): Option[Time] = {
+      val approximations =
+        by_job.values.filter(_.size > 1).map { data =>
+          val (ref_hostname, x0) =
+            data.by_hostname.toList.flatMap((hostname, data) =>
+              data.by_threads.keys.map(hostname -> _)).minBy((_, n) => Math.abs(n - threads))
+
+          def unify_hosts(data: Timing_Data): List[Time] =
+            data.by_hostname.toList.map((hostname, data) =>
+              speedup(data.mean_time, hostname_factor(hostname, ref_hostname)))
+
+          val entries =
+            data.by_threads.toList.map((threads, data) =>
+              threads -> Timing_Data.median_time(unify_hosts(data)))
+
+          val y0 = data.by_hostname(ref_hostname).by_threads(x0).mean_time
+          val (x1, y1_data) =
+            data.by_hostname(ref_hostname).by_threads.toList.minBy((n, _) => Math.abs(n - threads))
+          val y1 = y1_data.mean_time
+
+          val a = (y1.ms - y0.ms).toDouble / (x1 - x0)
+          val b = y0.ms - a * x0
+          Time.ms((a * threads + b).toLong)
+        }
+      if (approximations.isEmpty) None else Some(Timing_Data.mean_time(approximations))
+    }
+
+    def threads_factor(divided: Int, divisor: Int): Double =
+      (approximate_threads(divided), approximate_threads(divisor)) match {
+        case (Some(dividend), Some(divisor)) => dividend.ms.toDouble / divisor.ms
+        case _ => divided.toDouble / divisor
+      }
+
+    def estimate_config(config: Config): Time =
+      by_job.get(config.job_name) match {
+        case None => mean_time
+        case Some(data) =>
+          val hostname = config.node_info.hostname
+          val threads = host_infos.num_threads(config.node_info)
+          data.by_threads.get(threads) match {
+            case None => // interpolate threads
+              data.by_hostname.get(hostname).flatMap(
+                _.approximate_threads(threads)).getOrElse {
+                  // per machine, try to approximate config for threads
+                  val approximated =
+                    data.by_hostname.toList.flatMap((hostname1, data) =>
+                      data.approximate_threads(threads).map(time =>
+                        speedup(time, hostname_factor(hostname1, hostname))))
+
+                  if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+                  else {
+                    // no machine where config can be approximated
+                    data.approximate_threads(threads).getOrElse {
+                      // only single data point, use global curve to approximate
+                      val global_factor = threads_factor(data.by_threads.keys.head, threads)
+                      speedup(data.by_threads.values.head.mean_time, global_factor)
+                    }
+                  }
+                }
+
+            case Some(data) => // time for job/thread exists, interpolate machine
+              data.by_hostname.get(hostname).map(_.mean_time).getOrElse {
+                Timing_Data.median_time(
+                  data.by_hostname.toList.map((hostname1, data) =>
+                    speedup(data.mean_time, hostname_factor(hostname1, hostname))))
+              }
+          }
+      }
+  }
+
+  object Timing_Data {
+    def median_time(obs: List[Time]): Time = obs.sortBy(_.ms).drop(obs.length / 2).head
+    def mean_time(obs: Iterable[Time]): Time = Time.ms(obs.map(_.ms).sum / obs.size)
+
+    private val dummy_entries =
+      List(
+        Timing_Entry("dummy", "dummy", 1, Time.minutes(5)),
+        Timing_Entry("dummy", "dummy", 8, Time.minutes(1)))
+
+    def dummy: Timing_Data = new Timing_Data(dummy_entries, Host_Infos.dummy)
+
+    def make(
+      host_infos: Host_Infos,
+      build_history: List[(Build_Log.Meta_Info, Build_Log.Build_Info)],
+    ): Timing_Data = {
+      val hosts = host_infos.hosts
+      val measurements =
+        for {
+          (meta_info, build_info) <- build_history
+          build_host <- meta_info.get(Build_Log.Prop.build_host).toList
+          (job_name, session_info) <- build_info.sessions.toList
+          hostname = session_info.hostname.getOrElse(build_host)
+          host <- hosts.find(_.info.hostname == build_host).toList
+          threads = session_info.threads.getOrElse(host.info.num_cpus)
+        } yield (job_name, hostname, threads) -> session_info.timing.elapsed
+
+      val entries =
+        if (measurements.isEmpty) dummy_entries
+        else
+          measurements.groupMap(_._1)(_._2).toList.map {
+            case ((job_name, hostname, threads), timings) =>
+              Timing_Entry(job_name, hostname, threads, median_time(timings))
+          }
+
+      new Timing_Data(entries, host_infos)
+    }
+  }
+
+
+  /* host information */
+
+  case class Host(info: isabelle.Host.Info, build: Build_Cluster.Host)
+
+  object Host_Infos {
+    def dummy: Host_Infos =
+      new Host_Infos(List(Host(Info("dummy", Nil, 8, Some(1.0)), Build_Cluster.Host("dummy"))))
+
+    def apply(build_hosts: List[Build_Cluster.Host], db: SQL.Database): Host_Infos = {
+      def get_host(build_host: Build_Cluster.Host): Host = {
+        val info =
+          isabelle.Host.read_info(db, build_host.name).getOrElse(
+            error("No benchmark for " + quote(build_host.name)))
+        Host(info, build_host)
+      }
+
+      new Host_Infos(build_hosts.map(get_host))
+    }
+  }
+
+  class Host_Infos private(val hosts: List[Host]) {
+    private val by_hostname = hosts.map(host => host.info.hostname -> host).toMap
+
+    def host_factor(from: Host, to: Host): Double =
+      from.info.benchmark_score.get / to.info.benchmark_score.get
+
+    val host_speeds: Ordering[Host] =
+      Ordering.fromLessThan((host1, host2) => host_factor(host1, host2) > 1)
+
+    def the_host(hostname: String): Host =
+      by_hostname.getOrElse(hostname, error("Unknown host " + quote(hostname)))
+    def the_host(node_info: Node_Info): Host = the_host(node_info.hostname)
+
+    def num_threads(node_info: Node_Info): Int =
+      if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus.length
+      else the_host(node_info).info.num_cpus
+
+    def available(state: Build_Process.State): Resources = {
+      val allocated =
+        state.running.values.map(_.node_info).groupMapReduce(the_host)(List(_))(_ ::: _)
+      Resources(this, allocated)
+    }
+  }
+
+
+  /* offline tracking of resource allocations */
+
+  case class Resources(
+    host_infos: Host_Infos,
+    allocated_nodes: Map[Host, List[Node_Info]]
+  ) {
+    val unused_hosts: List[Host] = host_infos.hosts.filter(allocated(_).isEmpty)
+
+    def allocated(host: Host): List[Node_Info] = allocated_nodes.getOrElse(host, Nil)
+
+    def allocate(node: Node_Info): Resources = {
+      val host = host_infos.the_host(node)
+      copy(allocated_nodes = allocated_nodes + (host -> (node :: allocated(host))))
+    }
+
+    def try_allocate_tasks(
+      hosts: List[Host],
+      tasks: List[(Build_Process.Task, Int)]
+    ): (List[Config], Resources) =
+      tasks match {
+        case Nil => (Nil, this)
+        case (task, threads) :: tasks =>
+          val (config, resources) =
+            hosts.find(available(_, threads)) match {
+              case Some(host) =>
+                val node_info = next_node(host, threads)
+                (Some(Config(task.name, node_info)), allocate(node_info))
+              case None => (None, this)
+            }
+          val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
+          (configs ++ config, resources1)
+      }
+
+    def next_node(host: Host, threads: Int): Node_Info = {
+      val numa_node_num_cpus = host.info.num_cpus / (host.info.numa_nodes.length max 1)
+      def explicit_cpus(node_info: Node_Info): List[Int] =
+        if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList
+
+      val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)
+
+      val available_nodes = host.info.numa_nodes
+      val numa_node =
+        if (!host.build.numa) None
+        else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption
+
+      val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
+      val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList
+
+      val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil
+
+      Node_Info(host.info.hostname, numa_node, rel_cpus)
+    }
+
+    def available(host: Host, threads: Int): Boolean = {
+      val used = allocated(host)
+
+      if (used.length >= host.build.jobs) false
+      else {
+        if (host.info.numa_nodes.length <= 1)
+          used.map(host_infos.num_threads).sum + threads <= host.info.num_cpus
+        else {
+          def node_threads(n: Int): Int =
+            used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum
+
+          host.info.numa_nodes.exists(
+            node_threads(_) + threads <= host.info.num_cpus / host.info.numa_nodes.length)
+        }
+      }
+    }
+  }
+
+
+  /* schedule generation */
+
+  case class State(build_state: Build_Process.State, current_time: Time) {
+    def start(config: Config): State =
+      copy(build_state =
+        build_state.copy(running = build_state.running +
+          (config.job_name -> config.job_of(current_time))))
+
+    def step(timing_data: Timing_Data): State = {
+      val remaining =
+        build_state.running.values.toList.map { job =>
+          val elapsed = current_time - job.start_date.time
+          val predicted = timing_data.estimate_config(Config.from_job(job))
+          val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed
+          job -> remaining
+        }
+
+      if (remaining.isEmpty) error("Schedule step without running sessions")
+      else {
+        val (job, elapsed) = remaining.minBy(_._2.ms)
+        State(build_state.remove_running(job.name).remove_pending(job.name), current_time + elapsed)
+      }
+    }
+
+    def finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty
+  }
+
+  abstract class Scheduler {
+    def ready_jobs(state: Build_Process.State): Build_Process.State.Pending =
+      state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name))
+
+    def next(timing: Timing_Data, state: Build_Process.State): List[Config]
+
+    def build_duration(timing_data: Timing_Data, build_state: Build_Process.State): Time = {
+      @tailrec
+      def simulate(state: State): State =
+        if (state.finished) state
+        else {
+          val state1 =
+            next(timing_data, state.build_state).foldLeft(state)(_.start(_)).step(timing_data)
+          simulate(state1)
+        }
+
+      val start = Time.now()
+      simulate(State(build_state, start)).current_time - start
+    }
+  }
+
+
+  /* heuristics */
+
+  class Timing_Heuristic(threshold: Time) extends Scheduler {
+    def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = {
+      val host_infos = timing_data.host_infos
+      val resources = host_infos.available(state)
+
+      def best_threads(task: Build_Process.Task): Int =
+        timing_data.best_threads(task.name).getOrElse(
+          host_infos.hosts.map(_.info.num_cpus).max min 8)
+
+      val ready = ready_jobs(state)
+      val free = resources.unused_hosts
+
+      if (ready.length <= free.length)
+        resources.try_allocate_tasks(free, ready.map(task => task -> best_threads(task)))._1
+      else {
+        val pending_tasks = state.pending.map(_.name).toSet
+        val graph = state.sessions.graph.restrict(pending_tasks)
+
+        val accumulated_time =
+          graph.node_depth(timing_data.best_time(_).ms).filter((name, _) => graph.is_maximal(name))
+
+        val path_time =
+          accumulated_time.flatMap((name, ms) => graph.all_preds(List(name)).map(_ -> ms)).toMap
+
+        def is_critical(task: String): Boolean = path_time(task) > threshold.ms
+
+        val (critical, other) =
+          ready.sortBy(task => path_time(task.name)).partition(task => is_critical(task.name))
+
+        val critical_graph = graph.restrict(is_critical)
+        def parallel_paths(node: String): Int =
+          critical_graph.imm_succs(node).map(suc => parallel_paths(suc) max 1).sum max 1
+
+        val (critical_hosts, other_hosts) =
+          host_infos.hosts.sorted(host_infos.host_speeds).reverse.splitAt(
+            critical.map(_.name).map(parallel_paths).sum)
+
+        val (configs1, resources1) =
+          resources.try_allocate_tasks(critical_hosts,
+            critical.map(task => task -> best_threads(task)))
+
+        val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other.map(_ -> 1))
+
+        configs1 ::: configs2
+      }
+    }
+  }
+
+  class Meta_Heuristic(schedulers: List[Scheduler]) extends Scheduler {
+    require(schedulers.nonEmpty)
+
+    def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = {
+      val (best, _) = schedulers.map(h => h -> h.build_duration(timing_data, state)).minBy(_._2.ms)
+      best.next(timing_data, state)
+    }
+  }
+
+
+  /* process for scheduled build */
+
+  class Scheduled_Build_Process(
+    scheduler: Scheduler,
+    build_context: Build.Context,
+    build_progress: Progress,
+    server: SSH.Server,
+  ) extends Build_Process(build_context, build_progress, server) {
+    protected val start_date = Date.now()
+
+
+    /* global resources with common close() operation */
+
+    private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options)
+    private final lazy val _log_database: SQL.Database =
+      try {
+        val db = _log_store.open_database(server = this.server)
+        Build_Log.Data.tables.foreach(db.create_table(_))
+        db
+      }
+      catch { case exn: Throwable => close(); throw exn }
+
+    override def close(): Unit = {
+      super.close()
+      Option(_log_database).foreach(_.close())
+    }
+
+
+    /* previous results via build log */
+
+    override def open_build_cluster(): Build_Cluster = {
+      val build_cluster = super.open_build_cluster()
+      build_cluster.init()
+      if (build_context.master && build_context.max_jobs > 0) {
+        val benchmark_options = build_options.string("build_hostname") = hostname
+        Benchmark.benchmark(benchmark_options, progress)
+      }
+      build_cluster.benchmark()
+      build_cluster
+    }
+
+    private val timing_data: Timing_Data = {
+      val cluster_hosts: List[Build_Cluster.Host] =
+        if (build_context.max_jobs == 0) build_context.build_hosts
+        else {
+          val local_build_host =
+            Build_Cluster.Host(
+              hostname, jobs = build_context.max_jobs, numa = build_context.numa_shuffling)
+          local_build_host :: build_context.build_hosts
+        }
+
+      val host_infos = Host_Infos(cluster_hosts, _host_database)
+
+      val build_history =
+        for {
+          log_name <- _log_database.execute_query_statement(
+            Build_Log.Data.meta_info_table.select(List(Build_Log.Data.log_name)),
+            List.from[String], res => res.string(Build_Log.Data.log_name))
+          meta_info <- _log_store.read_meta_info(_log_database, log_name)
+          build_info = _log_store.read_build_info(_log_database, log_name)
+        } yield (meta_info, build_info)
+
+      Timing_Data.make(host_infos, build_history)
+    }
+
+    def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = {
+      val sessions =
+        for {
+          (session_name, result) <- state.toList
+          if !result.current
+        } yield {
+          val info = build_context.sessions_structure(session_name)
+          val entry =
+            if (!results.cancelled(session_name)) {
+              val status =
+                if (result.ok) Build_Log.Session_Status.finished
+                else Build_Log.Session_Status.failed
+
+              Build_Log.Session_Entry(
+                chapter = info.chapter,
+                groups = info.groups,
+                hostname = Some(result.node_info.hostname),
+                threads = Some(timing_data.host_infos.num_threads(result.node_info)),
+                timing = result.process_result.timing,
+                sources = Some(result.output_shasum.digest.toString),
+                status = Some(status))
+            }
+            else
+              Build_Log.Session_Entry(
+                chapter = info.chapter,
+                groups = info.groups,
+                status = Some(Build_Log.Session_Status.cancelled))
+          session_name -> entry
+        }
+
+      val settings =
+        Build_Log.Settings.all_settings.map(_.name).map(name =>
+          name -> Isabelle_System.getenv(name))
+      val props =
+        List(
+          Build_Log.Prop.build_id.name -> build_context.build_uuid,
+          Build_Log.Prop.build_engine.name -> build_context.engine.name,
+          Build_Log.Prop.build_host.name -> hostname,
+          Build_Log.Prop.build_start.name -> Build_Log.print_date(start_date))
+
+      val meta_info = Build_Log.Meta_Info(props, settings)
+      val build_info = Build_Log.Build_Info(sessions.toMap)
+      val log_name = Build_Log.log_filename(engine = engine_name, date = start_date)
+
+      _log_store.update_sessions(_log_database, log_name.file_name, build_info)
+      _log_store.update_meta_info(_log_database, log_name.file_name, meta_info)
+    }
+
+
+    /* build process */
+
+    case class Cache(state: Build_Process.State, configs: List[Config], estimate: Date) {
+      def is_current(state: Build_Process.State): Boolean = this.state == state
+      def is_current_estimate(estimate: Date): Boolean =
+        this.estimate.time - estimate.time >= Time.seconds(1)
+    }
+
+    private var cache = Cache(Build_Process.State(), Nil, Date.now())
+
+    override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = {
+      val configs =
+        if (cache.is_current(state)) cache.configs
+        else scheduler.next(timing_data, state)
+      configs.find(_.job_name == session_name).get.node_info
+    }
+
+    override def next_jobs(state: Build_Process.State): List[String] =
+      if (cache.is_current(state)) cache.configs.map(_.job_name)
+      else {
+        val next = scheduler.next(timing_data, state)
+        val estimate = Date(Time.now() + scheduler.build_duration(timing_data, state))
+        progress.echo_if(build_context.master && cache.is_current_estimate(estimate),
+          "Estimated completion: " + estimate)
+
+        val configs = next.filter(_.node_info.hostname == hostname)
+        cache = Cache(state, configs, estimate)
+        configs.map(_.job_name)
+      }
+
+    override def run(): Build.Results = {
+      val results = super.run()
+      if (build_context.master) write_build_log(results, snapshot().results)
+      results
+    }
+  }
+
+  class Engine extends Build.Engine(engine_name) {
+    override def open_build_process(
+      context: Build.Context,
+      progress: Progress,
+      server: SSH.Server
+    ): Build_Process = {
+      val heuristics = List(5, 10, 20).map(minutes => Timing_Heuristic(Time.minutes(minutes)))
+      val scheduler = new Meta_Heuristic(heuristics)
+      new Scheduled_Build_Process(scheduler, context, progress, server)
+    }
+  }
+}