clarified and tuned timing estimation;
author Fabian Huch <huch@in.tum.de>
Fri, 24 Nov 2023 17:05:49 +0100
changeset 79037 1b3a6cc4a2bf
parent 79036 be42ba4b4672
child 79038 cd7c1acc9cf2
clarified and tuned timing estimation;
src/Pure/Tools/build_schedule.scala
--- a/src/Pure/Tools/build_schedule.scala	Fri Nov 24 14:01:16 2023 +0100
+++ b/src/Pure/Tools/build_schedule.scala	Fri Nov 24 17:05:49 2023 +0100
@@ -105,30 +105,53 @@
       }
     }
 
-    def threads_factor(data: Timing_Entries, divided: Int, divisor: Int): Double =
-      (estimate_threads(data, divided), estimate_threads(data, divisor)) match {
-        case (Some(dividend), Some(divisor)) => dividend.ms.toDouble / divisor.ms
-        case _ => divided.toDouble / divisor
-      }
+    private def unify_hosts(job_name: String, on_host: String): List[(Int, Time)] = {
+      def unify(hostname: String, data: Timing_Entries) =
+        data.mean_time.scale(hostname_factor(hostname, on_host))
+
+      for {
+        data <- data.by_job.get(job_name).toList
+        (threads, data) <- data.by_threads
+        entries = data.by_hostname.toList.map(unify)
+      } yield threads -> Timing_Data.median_time(entries)
+    }
+
+    def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
+      for {
+        data <- data.by_job.get(job_name)
+        data <- data.by_hostname.get(hostname)
+        entries = data.by_threads.toList.map((threads, data) => threads -> data.mean_time)
+        time <- data.by_threads.get(threads).map(_.mean_time).orElse(
+          if (data.by_threads.size < 2) None else Some(approximate_threads(entries, threads)))
+      } yield time
+    }
+
+    def global_threads_factor(from: Int, to: Int): Double = {
+      def median(xs: Iterable[Double]): Double = xs.toList.sorted.apply(xs.size / 2)
 
-    def estimate_threads(data: Timing_Entries, threads: Int): Option[Time] = {
-      val approximations =
-        data.by_job.values.filter(_.by_threads.size > 1).map { data =>
-          val (ref_hostname, _) =
-            data.by_hostname.toList.flatMap((hostname, data) =>
-              data.by_threads.keys.map(hostname -> _)).minBy((_, n) => Math.abs(n - threads))
+      val estimates =
+        for {
+          (hostname, data) <- data.by_hostname
+          job_name <- data.by_job.keys
+          from_time <- estimate_threads(job_name, hostname, from)
+          to_time <- estimate_threads(job_name, hostname, to)
+        } yield from_time.ms.toDouble / to_time.ms
 
-          def unify_hosts(data: Timing_Entries): List[Time] =
-            data.by_hostname.toList.map((hostname, data) =>
-              data.mean_time.scale(hostname_factor(hostname, ref_hostname)))
+      if (estimates.nonEmpty) median(estimates)
+      else {
+        // unify hosts
+        val estimates =
+          for {
+            (job_name, data) <- data.by_job
+            hostname = data.by_hostname.keys.head
+            entries = unify_hosts(job_name, hostname)
+            if entries.length > 1
+          } yield
+            approximate_threads(entries, from).ms.toDouble / approximate_threads(entries, to).ms
 
-          val entries =
-            data.by_threads.toList.map((threads, data) =>
-              threads -> Timing_Data.median_time(unify_hosts(data))).sortBy(_._1)
-
-          approximate_threads(entries, threads)
-        }
-      if (approximations.isEmpty) None else Some(Timing_Data.mean_time(approximations))
+        if (estimates.nonEmpty) median(estimates)
+        else from.toDouble / to.toDouble
+      }
     }
 
     def estimate(job_name: String, hostname: String, threads: Int): Time =
@@ -137,25 +160,28 @@
         case Some(data) =>
           data.by_threads.get(threads) match {
             case None => // interpolate threads
-              data.by_hostname.get(hostname).flatMap(
-                estimate_threads(_, threads)).getOrElse {
-                  // per machine, try to approximate config for threads
-                  val approximated =
-                    data.by_hostname.toList.flatMap((hostname1, data) =>
-                      estimate_threads(data, threads).map(time =>
-                        time.scale(hostname_factor(hostname1, hostname))))
+              estimate_threads(job_name, hostname, threads).getOrElse {
+                // per machine, try to approximate config for threads
+                val approximated =
+                  for {
+                    hostname1 <- data.by_hostname.keys
+                    estimate <- estimate_threads(job_name, hostname1, threads)
+                    factor = hostname_factor(hostname1, hostname)
+                  } yield estimate.scale(factor)
 
-                  if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+                if (approximated.nonEmpty) Timing_Data.mean_time(approximated)
+                else {
+                  // no single machine where config can be approximated, unify data points
+                  val unified_entries = unify_hosts(job_name, hostname)
+
+                  if (unified_entries.length > 1) approximate_threads(unified_entries, threads)
                   else {
-                    // no machine where config can be approximated
-                    estimate_threads(data, threads).getOrElse {
-                      // only single data point, use global curve to approximate
-                      val global_factor =
-                        threads_factor(this.data, data.by_threads.keys.head, threads)
-                      data.by_threads.values.head.mean_time.scale(global_factor)
-                    }
+                    // only single data point, use global curve to approximate
+                    val (job_threads, job_time) = unified_entries.head
+                    job_time.scale(global_threads_factor(job_threads, threads))
                   }
                 }
+              }
 
             case Some(data) => // time for job/thread exists, interpolate machine
               data.by_hostname.get(hostname).map(_.mean_time).getOrElse {