more build_cluster management: open SSH connections in parallel, but synchronously;
--- a/src/Pure/Tools/build_cluster.scala Fri Jul 21 17:17:28 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala Fri Jul 21 18:44:29 2023 +0200
@@ -108,15 +108,44 @@
SSH.print_local(name) + if_proper(rest, ":" + rest)
}
- def open_ssh_session(options: Options): SSH.Session =
- SSH.open_session(options, name, port = port, user = user)
+ def message(msg: String): String = "Host " + quote(name) + if_proper(msg, ": " + msg)
}
/* remote sessions */
- class Session(host: Host) extends AutoCloseable {
- override def close(): Unit = ()
+ def capture_open_session(
+ options: Options,
+ host: Host,
+ progress: Progress = new Progress
+ ): Exn.Result[Session] = {
+ progress.echo(host.message("connect ..."))
+ try {
+ val ssh_options = options ++ host.options
+ val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user)
+ Exn.Res[Session](new Session(host, ssh))
+ }
+ catch {
+ case exn: Throwable =>
+ progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn)))
+ Exn.Exn[Session](exn)
+ }
+ }
+
+ final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session)
+ extends AutoCloseable {
+ override def toString: String = ssh.toString
+
+ def start(): Result = {
+ val res = Process_Result.ok // FIXME
+ Result(host, res)
+ }
+
+ override def close(): Unit = ssh.close()
+ }
+
+ sealed case class Result(host: Host, process_result: Process_Result) {
+ def ok: Boolean = process_result.ok
}
}
@@ -130,10 +159,53 @@
override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
- progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map(" " + _)))
+
+ /* SSH sessions */
+
+ private var _sessions = List.empty[Build_Cluster.Session]
+
+ def open(): Unit = synchronized {
+ require(_sessions.isEmpty)
+
+ val attempts =
+ Par_List.map(
+ Build_Cluster.capture_open_session(build_context.build_options, _, progress = progress),
+ remote_hosts, thread = true)
+
+ if (attempts.forall(Exn.the_res.isDefinedAt)) {
+ _sessions = attempts.map(Exn.the_res)
+ }
+ else {
+ for (Exn.Res(session) <- attempts) session.close()
+ error("Failed to connect build cluster")
+ }
+ }
- def start(): Unit = ()
- def stop(): Unit = ()
+ override def close(): Unit = synchronized {
+ join
+ _sessions.foreach(_.close())
+ _sessions = Nil
+ }
+
+
+ /* workers */
+
+ private var _workers = List.empty[Future[Build_Cluster.Result]]
- override def close(): Unit = ()
+ def start(): Unit = synchronized {
+ require(_sessions.nonEmpty && _workers.isEmpty)
+ _workers = _sessions.map(session =>
+ Future.thread(session.host.message("session")) { session.start() })
+ }
+
+ def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
+ val res = _workers.map(_.join_result)
+ _workers = Nil
+ res
+ }
+
+
+ /* init */
+
+ open()
}
--- a/src/Pure/Tools/build_process.scala Fri Jul 21 17:17:28 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Fri Jul 21 18:44:29 2023 +0200
@@ -881,14 +881,20 @@
protected val log: Logger = Logger.make_system_log(progress, build_options)
- protected def init_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
+ protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
new Build_Cluster(build_context, remote_hosts, progress = build_progress)
+ protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = {
+ val results = build_cluster.join
+ build_cluster.close()
+ results.forall({ case Exn.Res(res) => res.ok case _ => false })
+ }
+
private val _build_cluster =
try {
val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) {
- Some(init_cluster(remote_hosts))
+ Some(init_build_cluster(remote_hosts))
}
else None
}
@@ -1121,7 +1127,7 @@
}
}
finally {
- _build_cluster.foreach(_.stop())
+ _build_cluster.foreach(exit_build_cluster)
stop_worker()
if (build_context.master) stop_build()
}