# HG changeset patch # User wenzelm # Date 1689957869 -7200 # Node ID 0461fc9d43e81cfb345888be3cf5bad88d25b308 # Parent 103a81e60126af8a7e10c7ac70f1b023a50b003a more build_cluster management: open SSH connections in parallel, but synchronously; diff -r 103a81e60126 -r 0461fc9d43e8 src/Pure/Tools/build_cluster.scala --- a/src/Pure/Tools/build_cluster.scala Fri Jul 21 17:17:28 2023 +0200 +++ b/src/Pure/Tools/build_cluster.scala Fri Jul 21 18:44:29 2023 +0200 @@ -108,15 +108,44 @@ SSH.print_local(name) + if_proper(rest, ":" + rest) } - def open_ssh_session(options: Options): SSH.Session = - SSH.open_session(options, name, port = port, user = user) + def message(msg: String): String = "Host " + quote(name) + if_proper(msg, ": " + msg) } /* remote sessions */ - class Session(host: Host) extends AutoCloseable { - override def close(): Unit = () + def capture_open_session( + options: Options, + host: Host, + progress: Progress = new Progress + ): Exn.Result[Session] = { + progress.echo(host.message("connect ...")) + try { + val ssh_options = options ++ host.options + val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user) + Exn.Res[Session](new Session(host, ssh)) + } + catch { + case exn: Throwable => + progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn))) + Exn.Exn[Session](exn) + } + } + + final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session) + extends AutoCloseable { + override def toString: String = ssh.toString + + def start(): Result = { + val res = Process_Result.ok // FIXME + Result(host, res) + } + + override def close(): Unit = ssh.close() + } + + sealed case class Result(host: Host, process_result: Process_Result) { + def ok: Boolean = process_result.ok } } @@ -130,10 +159,53 @@ override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")") - progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map(" " + _))) + + /* SSH sessions */ + + private var _sessions = List.empty[Build_Cluster.Session] + + def open(): Unit = synchronized { + require(_sessions.isEmpty) + + val attempts = + Par_List.map( + Build_Cluster.capture_open_session(build_context.build_options, _, progress = progress), + remote_hosts, thread = true) + + if (attempts.forall(Exn.the_res.isDefinedAt)) { + _sessions = attempts.map(Exn.the_res) + } + else { + for (Exn.Res(session) <- attempts) session.close() + error("Failed to connect build cluster") + } + } - def start(): Unit = () - def stop(): Unit = () + override def close(): Unit = synchronized { + join + _sessions.foreach(_.close()) + _sessions = Nil + } + + + /* workers */ + + private var _workers = List.empty[Future[Build_Cluster.Result]] - override def close(): Unit = () + def start(): Unit = synchronized { + require(_sessions.nonEmpty && _workers.isEmpty) + _workers = _sessions.map(session => + Future.thread(session.host.message("session")) { session.start() }) + } + + def join: List[Exn.Result[Build_Cluster.Result]] = synchronized { + val res = _workers.map(_.join_result) + _workers = Nil + res + } + + + /* init */ + + open() } diff -r 103a81e60126 -r 0461fc9d43e8 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Fri Jul 21 17:17:28 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Fri Jul 21 18:44:29 2023 +0200 @@ -881,14 +881,20 @@ protected val log: Logger = Logger.make_system_log(progress, build_options) - protected def init_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster = + protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster = new Build_Cluster(build_context, remote_hosts, progress = build_progress) + protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = { + val results = build_cluster.join + build_cluster.close() + results.forall({ case Exn.Res(res) => res.ok case _ => false }) + } + private val _build_cluster = try { val remote_hosts = build_context.build_hosts.filterNot(_.is_local) if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) { - Some(init_cluster(remote_hosts)) + Some(init_build_cluster(remote_hosts)) } else None } @@ -1121,7 +1127,7 @@ } } finally { - _build_cluster.foreach(_.stop()) + _build_cluster.foreach(exit_build_cluster) stop_worker() if (build_context.master) stop_build() }