# HG changeset patch # User wenzelm # Date 1690025515 -7200 # Node ID b4ec7ea073da7f8e03ccae60316ddcf94923e05c # Parent 872f10c808103fb53b77a5d3307e78f9cc4a79be clarified signature: delegate policies to Build_Cluster implementation, potentially provided by Build.Engine via Build_Process.open_build_cluster; diff -r 872f10c80810 -r b4ec7ea073da src/Pure/Tools/build_cluster.scala --- a/src/Pure/Tools/build_cluster.scala Sat Jul 22 12:11:50 2023 +0200 +++ b/src/Pure/Tools/build_cluster.scala Sat Jul 22 13:31:55 2023 +0200 @@ -131,36 +131,65 @@ def options: Options = ssh.options - def start(): Result = { - val res = Process_Result.ok // FIXME - Result(host, res) + def start(): Process_Result = { + Process_Result.ok // FIXME } override def close(): Unit = ssh.close() } - sealed case class Result(host: Host, process_result: Process_Result) { - def ok: Boolean = process_result.ok + sealed case class Result(host: Host, process_result: Exn.Result[Process_Result]) { + def ok: Boolean = + process_result match { + case Exn.Res(res) => res.ok + case Exn.Exn(_) => false + } + } + + + /* build clusters */ + + val none: Build_Cluster = new No_Build_Cluster + + def make(build_context: Build.Context, progress: Progress = new Progress): Build_Cluster = { + val remote_hosts = build_context.build_hosts.filterNot(_.is_local) + if (remote_hosts.isEmpty) none + else new Remote_Build_Cluster(build_context, remote_hosts, progress = progress) } } -// class extensible via Build.Engine.build_process() and Build_Process.init_cluster() -class Build_Cluster( +// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster() +trait Build_Cluster extends AutoCloseable { + def open(): Unit = () + def init(): Unit = () + def start(): Unit = () + def active(): Boolean = false + def join: List[Build_Cluster.Result] = Nil + def stop(): Unit = { join; close() } + override def close(): Unit = () +} + +final class No_Build_Cluster extends Build_Cluster { + override def toString: String = "Build_Cluster.none" +} + +class Remote_Build_Cluster( build_context: Build.Context, remote_hosts: List[Build_Cluster.Host], progress: Progress = new Progress -) extends AutoCloseable { +) extends Build_Cluster { require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required") - override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")") + override def toString: String = + remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")") - /* SSH sessions */ + /* remote sessions */ private var _sessions = List.empty[Build_Cluster.Session] - def open(): Unit = synchronized { - require(_sessions.isEmpty) + override def open(): Unit = synchronized { + require(_sessions.isEmpty, "build cluster already open") val attempts = Par_List.map((host: Build_Cluster.Host) => @@ -172,6 +201,7 @@ if (attempts.forall(Exn.the_res.isDefinedAt)) { _sessions = attempts.map(Exn.the_res) + _sessions } else { for (Exn.Res(session) <- attempts) session.close() @@ -179,31 +209,38 @@ } } - override def close(): Unit = synchronized { - join - _sessions.foreach(_.close()) - _sessions = Nil - } - /* workers */ - private var _workers = List.empty[Future[Build_Cluster.Result]] + private var _workers = List.empty[Future[Process_Result]] + + override def active(): Boolean = synchronized { _workers.nonEmpty } - def start(): Unit = synchronized { - require(_sessions.nonEmpty && _workers.isEmpty) + override def start(): Unit = synchronized { + require(_sessions.nonEmpty, "build cluster not yet open") + require(_workers.isEmpty, "build cluster already active") + _workers = _sessions.map(session => Future.thread(session.host.message("session")) { session.start() }) } - def join: List[Exn.Result[Build_Cluster.Result]] = synchronized { - val res = _workers.map(_.join_result) - _workers = Nil - res + override def join: List[Build_Cluster.Result] = synchronized { + if (_workers.isEmpty) Nil + else { + assert(_sessions.length == _workers.length) + for ((session, worker) <- _sessions zip _workers) + yield Build_Cluster.Result(session.host, worker.join_result) + } } - /* init */ + /* close */ - open() + override def close(): Unit = synchronized { + _workers.foreach(_.cancel()) + join + _sessions.foreach(_.close()) + _workers = Nil + _sessions = Nil + } } diff -r 872f10c80810 -r b4ec7ea073da src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sat Jul 22 12:11:50 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Sat Jul 22 13:31:55 2023 +0200 @@ -881,22 +881,16 @@ protected val log: Logger = Logger.make_system_log(progress, build_options) - protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster = - new Build_Cluster(build_context, remote_hosts, progress = build_progress) - - protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = { - val results = build_cluster.join - build_cluster.close() - results.forall({ case Exn.Res(res) => res.ok case _ => false }) + protected def open_build_cluster(): Build_Cluster = { + val build_cluster = Build_Cluster.make(build_context, progress = build_progress) + build_cluster.open() + build_cluster } private val _build_cluster = try { - val remote_hosts = build_context.build_hosts.filterNot(_.is_local) - if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) { - Some(init_build_cluster(remote_hosts)) - } - else None + if (build_context.master && _build_database.isDefined) open_build_cluster() + else Build_Cluster.none } catch { case exn: Throwable => close(); throw exn } @@ -904,7 +898,7 @@ Option(_database_server).flatten.foreach(_.close()) Option(_build_database).flatten.foreach(_.close()) Option(_host_database).flatten.foreach(_.close()) - Option(_build_cluster).flatten.foreach(_.close()) + Option(_build_cluster).foreach(_.close()) progress match { case db_progress: Database_Progress => db_progress.exit(close = true) @@ -1076,6 +1070,7 @@ def run(): Map[String, Process_Result] = { if (build_context.master) { + _build_cluster.init() synchronized_database("Build_Process.init") { _state = init_state(_state) } } @@ -1102,9 +1097,9 @@ else { if (build_context.master) start_build() start_worker() - _build_cluster.foreach(_.start()) + _build_cluster.start() - if (build_context.master && !build_context.worker_active && _build_cluster.isDefined) { + if (build_context.master && !build_context.worker_active && _build_cluster.active()) { build_progress.echo("Waiting for external workers ...") } @@ -1127,7 +1122,7 @@ } } finally { - _build_cluster.foreach(exit_build_cluster) + _build_cluster.stop() stop_worker() if (build_context.master) stop_build() }