more build_cluster management: open SSH connections in parallel, but synchronously;
authorwenzelm
Fri, 21 Jul 2023 18:44:29 +0200
changeset 78430 0461fc9d43e8
parent 78429 103a81e60126
child 78431 1ab113f4db74
more build_cluster management: open SSH connections in parallel, but synchronously;
src/Pure/Tools/build_cluster.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_cluster.scala	Fri Jul 21 17:17:28 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala	Fri Jul 21 18:44:29 2023 +0200
@@ -108,15 +108,44 @@
       SSH.print_local(name) + if_proper(rest, ":" + rest)
     }
 
-    def open_ssh_session(options: Options): SSH.Session =
-      SSH.open_session(options, name, port = port, user = user)
+    def message(msg: String): String = "Host " + quote(name) + if_proper(msg, ": " + msg)
   }
 
 
   /* remote sessions */
 
-  class Session(host: Host) extends AutoCloseable {
-    override def close(): Unit = ()
+  def capture_open_session(
+    options: Options,
+    host: Host,
+    progress: Progress = new Progress
+  ): Exn.Result[Session] = {
+    progress.echo(host.message("connect ..."))
+    try {
+      val ssh_options = options ++ host.options
+      val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user)
+      Exn.Res[Session](new Session(host, ssh))
+    }
+    catch {
+      case exn: Throwable =>
+        progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn)))
+        Exn.Exn[Session](exn)
+    }
+  }
+
+  final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session)
+  extends AutoCloseable {
+    override def toString: String = ssh.toString
+
+    def start(): Result = {
+      val res = Process_Result.ok     // FIXME
+      Result(host, res)
+    }
+
+    override def close(): Unit = ssh.close()
+  }
+
+  sealed case class Result(host: Host, process_result: Process_Result) {
+    def ok: Boolean = process_result.ok
   }
 }
 
@@ -130,10 +159,53 @@
 
   override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
 
-  progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map("  " + _)))
+
+  /* SSH sessions */
+
+  private var _sessions = List.empty[Build_Cluster.Session]
+
+  def open(): Unit = synchronized {
+    require(_sessions.isEmpty)
+
+    val attempts =
+      Par_List.map(
+        Build_Cluster.capture_open_session(build_context.build_options, _, progress = progress),
+        remote_hosts, thread = true)
+
+    if (attempts.forall(Exn.the_res.isDefinedAt)) {
+      _sessions = attempts.map(Exn.the_res)
+    }
+    else {
+      for (Exn.Res(session) <- attempts) session.close()
+      error("Failed to connect build cluster")
+    }
+  }
 
-  def start(): Unit = ()
-  def stop(): Unit = ()
+  override def close(): Unit = synchronized {
+    join
+    _sessions.foreach(_.close())
+    _sessions = Nil
+  }
+
+
+  /* workers */
+
+  private var _workers = List.empty[Future[Build_Cluster.Result]]
 
-  override def close(): Unit = ()
+  def start(): Unit = synchronized {
+    require(_sessions.nonEmpty && _workers.isEmpty)
+    _workers = _sessions.map(session =>
+      Future.thread(session.host.message("session")) { session.start() })
+  }
+
+  def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
+    val res = _workers.map(_.join_result)
+    _workers = Nil
+    res
+  }
+
+
+  /* init */
+
+  open()
 }
--- a/src/Pure/Tools/build_process.scala	Fri Jul 21 17:17:28 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Fri Jul 21 18:44:29 2023 +0200
@@ -881,14 +881,20 @@
 
   protected val log: Logger = Logger.make_system_log(progress, build_options)
 
-  protected def init_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
+  protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
     new Build_Cluster(build_context, remote_hosts, progress = build_progress)
 
+  protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = {
+    val results = build_cluster.join
+    build_cluster.close()
+    results.forall({ case Exn.Res(res) => res.ok case _ => false })
+  }
+
   private val _build_cluster =
     try {
       val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
       if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) {
-        Some(init_cluster(remote_hosts))
+        Some(init_build_cluster(remote_hosts))
       }
       else None
     }
@@ -1121,7 +1127,7 @@
         }
       }
       finally {
-        _build_cluster.foreach(_.stop())
+        _build_cluster.foreach(exit_build_cluster)
         stop_worker()
         if (build_context.master) stop_build()
       }