clarified signature: delegate policies to Build_Cluster implementation, potentially provided by Build.Engine via Build_Process.open_build_cluster;
authorwenzelm
Sat, 22 Jul 2023 13:31:55 +0200
changeset 78434 b4ec7ea073da
parent 78433 872f10c80810
child 78435 a623cb346b4a
clarified signature: delegate policies to Build_Cluster implementation, potentially provided by Build.Engine via Build_Process.open_build_cluster;
src/Pure/Tools/build_cluster.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_cluster.scala	Sat Jul 22 12:11:50 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala	Sat Jul 22 13:31:55 2023 +0200
@@ -131,36 +131,65 @@
 
     def options: Options = ssh.options
 
-    def start(): Result = {
-      val res = Process_Result.ok     // FIXME
-      Result(host, res)
+    def start(): Process_Result = {
+      Process_Result.ok  // FIXME
     }
 
     override def close(): Unit = ssh.close()
   }
 
-  sealed case class Result(host: Host, process_result: Process_Result) {
-    def ok: Boolean = process_result.ok
+  sealed case class Result(host: Host, process_result: Exn.Result[Process_Result]) {
+    def ok: Boolean =
+      process_result match {
+        case Exn.Res(res) => res.ok
+        case Exn.Exn(_) => false
+      }
+  }
+
+
+  /* build clusters */
+
+  val none: Build_Cluster = new No_Build_Cluster
+
+  def make(build_context: Build.Context, progress: Progress = new Progress): Build_Cluster = {
+    val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
+    if (remote_hosts.isEmpty) none
+    else new Remote_Build_Cluster(build_context, remote_hosts, progress = progress)
   }
 }
 
-// class extensible via Build.Engine.build_process() and Build_Process.init_cluster()
-class Build_Cluster(
+// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
+trait Build_Cluster extends AutoCloseable {
+  def open(): Unit = ()
+  def init(): Unit = ()
+  def start(): Unit = ()
+  def active(): Boolean = false
+  def join: List[Build_Cluster.Result] = Nil
+  def stop(): Unit = { join; close() }
+  override def close(): Unit = ()
+}
+
+final class No_Build_Cluster extends Build_Cluster {
+  override def toString: String = "Build_Cluster.none"
+}
+
+class Remote_Build_Cluster(
   build_context: Build.Context,
   remote_hosts: List[Build_Cluster.Host],
   progress: Progress = new Progress
-) extends AutoCloseable {
+) extends Build_Cluster {
   require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required")
 
-  override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
+  override def toString: String =
+    remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")")
 
 
-  /* SSH sessions */
+  /* remote sessions */
 
   private var _sessions = List.empty[Build_Cluster.Session]
 
-  def open(): Unit = synchronized {
-    require(_sessions.isEmpty)
+  override def open(): Unit = synchronized {
+    require(_sessions.isEmpty, "build cluster already open")
 
     val attempts =
       Par_List.map((host: Build_Cluster.Host) =>
@@ -172,6 +201,7 @@
 
     if (attempts.forall(Exn.the_res.isDefinedAt)) {
       _sessions = attempts.map(Exn.the_res)
+      _sessions
     }
     else {
       for (Exn.Res(session) <- attempts) session.close()
@@ -179,31 +209,38 @@
     }
   }
 
-  override def close(): Unit = synchronized {
-    join
-    _sessions.foreach(_.close())
-    _sessions = Nil
-  }
-
 
   /* workers */
 
-  private var _workers = List.empty[Future[Build_Cluster.Result]]
+  private var _workers = List.empty[Future[Process_Result]]
+
+  override def active(): Boolean = synchronized { _workers.nonEmpty }
 
-  def start(): Unit = synchronized {
-    require(_sessions.nonEmpty && _workers.isEmpty)
+  override def start(): Unit = synchronized {
+    require(_sessions.nonEmpty, "build cluster not yet open")
+    require(_workers.isEmpty, "build cluster already active")
+
     _workers = _sessions.map(session =>
       Future.thread(session.host.message("session")) { session.start() })
   }
 
-  def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
-    val res = _workers.map(_.join_result)
-    _workers = Nil
-    res
+  override def join: List[Build_Cluster.Result] = synchronized {
+    if (_workers.isEmpty) Nil
+    else {
+      assert(_sessions.length == _workers.length)
+      for ((session, worker) <- _sessions zip _workers)
+        yield Build_Cluster.Result(session.host, worker.join_result)
+    }
   }
 
 
-  /* init */
+  /* close */
 
-  open()
+  override def close(): Unit = synchronized {
+    _workers.foreach(_.cancel())
+    join
+    _sessions.foreach(_.close())
+    _workers = Nil
+    _sessions = Nil
+  }
 }
--- a/src/Pure/Tools/build_process.scala	Sat Jul 22 12:11:50 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Sat Jul 22 13:31:55 2023 +0200
@@ -881,22 +881,16 @@
 
   protected val log: Logger = Logger.make_system_log(progress, build_options)
 
-  protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
-    new Build_Cluster(build_context, remote_hosts, progress = build_progress)
-
-  protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = {
-    val results = build_cluster.join
-    build_cluster.close()
-    results.forall({ case Exn.Res(res) => res.ok case _ => false })
+  protected def open_build_cluster(): Build_Cluster = {
+    val build_cluster = Build_Cluster.make(build_context, progress = build_progress)
+    build_cluster.open()
+    build_cluster
   }
 
   private val _build_cluster =
     try {
-      val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
-      if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) {
-        Some(init_build_cluster(remote_hosts))
-      }
-      else None
+      if (build_context.master && _build_database.isDefined) open_build_cluster()
+      else Build_Cluster.none
     }
     catch { case exn: Throwable => close(); throw exn }
 
@@ -904,7 +898,7 @@
     Option(_database_server).flatten.foreach(_.close())
     Option(_build_database).flatten.foreach(_.close())
     Option(_host_database).flatten.foreach(_.close())
-    Option(_build_cluster).flatten.foreach(_.close())
+    Option(_build_cluster).foreach(_.close())
     progress match {
       case db_progress: Database_Progress =>
         db_progress.exit(close = true)
@@ -1076,6 +1070,7 @@
 
   def run(): Map[String, Process_Result] = {
     if (build_context.master) {
+      _build_cluster.init()
       synchronized_database("Build_Process.init") { _state = init_state(_state) }
     }
 
@@ -1102,9 +1097,9 @@
     else {
       if (build_context.master) start_build()
       start_worker()
-      _build_cluster.foreach(_.start())
+      _build_cluster.start()
 
-      if (build_context.master && !build_context.worker_active && _build_cluster.isDefined) {
+      if (build_context.master && !build_context.worker_active && _build_cluster.active()) {
         build_progress.echo("Waiting for external workers ...")
       }
 
@@ -1127,7 +1122,7 @@
         }
       }
       finally {
-        _build_cluster.foreach(exit_build_cluster)
+        _build_cluster.stop()
         stop_worker()
         if (build_context.master) stop_build()
       }