src/Pure/Tools/build_cluster.scala
changeset 78430 0461fc9d43e8
parent 78427 5b7d1cb073db
child 78433 872f10c80810
--- a/src/Pure/Tools/build_cluster.scala	Fri Jul 21 17:17:28 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala	Fri Jul 21 18:44:29 2023 +0200
@@ -108,15 +108,44 @@
       SSH.print_local(name) + if_proper(rest, ":" + rest)
     }
 
-    def open_ssh_session(options: Options): SSH.Session =
-      SSH.open_session(options, name, port = port, user = user)
+    def message(msg: String): String = "Host " + quote(name) + if_proper(msg, ": " + msg)
   }
 
 
   /* remote sessions */
 
-  class Session(host: Host) extends AutoCloseable {
-    override def close(): Unit = ()
+  def capture_open_session(
+    options: Options,
+    host: Host,
+    progress: Progress = new Progress
+  ): Exn.Result[Session] = {
+    progress.echo(host.message("connect ..."))
+    try {
+      val ssh_options = options ++ host.options
+      val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user)
+      Exn.Res[Session](new Session(host, ssh))
+    }
+    catch {
+      case exn: Throwable =>
+        progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn)))
+        Exn.Exn[Session](exn)
+    }
+  }
+
+  final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session)
+  extends AutoCloseable {
+    override def toString: String = ssh.toString
+
+    def start(): Result = {
+      val res = Process_Result.ok     // FIXME
+      Result(host, res)
+    }
+
+    override def close(): Unit = ssh.close()
+  }
+
+  sealed case class Result(host: Host, process_result: Process_Result) {
+    def ok: Boolean = process_result.ok
   }
 }
 
@@ -130,10 +159,53 @@
 
   override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
 
-  progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map("  " + _)))
+
+  /* SSH sessions */
+
+  private var _sessions = List.empty[Build_Cluster.Session]
+
+  def open(): Unit = synchronized {
+    require(_sessions.isEmpty)
+
+    val attempts =
+      Par_List.map(
+        Build_Cluster.capture_open_session(build_context.build_options, _, progress = progress),
+        remote_hosts, thread = true)
+
+    if (attempts.forall(Exn.the_res.isDefinedAt)) {
+      _sessions = attempts.map(Exn.the_res)
+    }
+    else {
+      for (Exn.Res(session) <- attempts) session.close()
+      error("Failed to connect build cluster")
+    }
+  }
 
-  def start(): Unit = ()
-  def stop(): Unit = ()
+  override def close(): Unit = synchronized {
+    join
+    _sessions.foreach(_.close())
+    _sessions = Nil
+  }
+
+
+  /* workers */
+
+  private var _workers = List.empty[Future[Build_Cluster.Result]]
 
-  override def close(): Unit = ()
+  def start(): Unit = synchronized {
+    require(_sessions.nonEmpty && _workers.isEmpty)
+    _workers = _sessions.map(session =>
+      Future.thread(session.host.message("session")) { session.start() })
+  }
+
+  def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
+    val res = _workers.map(_.join_result)
+    _workers = Nil
+    res
+  }
+
+
+  /* init */
+
+  open()
 }