clarified signature: delegate policies to Build_Cluster implementation, potentially provided by Build.Engine via Build_Process.open_build_cluster;
--- a/src/Pure/Tools/build_cluster.scala Sat Jul 22 12:11:50 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala Sat Jul 22 13:31:55 2023 +0200
@@ -131,36 +131,65 @@
def options: Options = ssh.options
- def start(): Result = {
- val res = Process_Result.ok // FIXME
- Result(host, res)
+ def start(): Process_Result = {
+ Process_Result.ok // FIXME
}
override def close(): Unit = ssh.close()
}
- sealed case class Result(host: Host, process_result: Process_Result) {
- def ok: Boolean = process_result.ok
+ sealed case class Result(host: Host, process_result: Exn.Result[Process_Result]) {
+ def ok: Boolean =
+ process_result match {
+ case Exn.Res(res) => res.ok
+ case Exn.Exn(_) => false
+ }
+ }
+
+
+ /* build clusters */
+
+ val none: Build_Cluster = new No_Build_Cluster
+
+ def make(build_context: Build.Context, progress: Progress = new Progress): Build_Cluster = {
+ val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
+ if (remote_hosts.isEmpty) none
+ else new Remote_Build_Cluster(build_context, remote_hosts, progress = progress)
}
}
-// class extensible via Build.Engine.build_process() and Build_Process.init_cluster()
-class Build_Cluster(
+// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
+trait Build_Cluster extends AutoCloseable {
+ def open(): Unit = ()
+ def init(): Unit = ()
+ def start(): Unit = ()
+ def active(): Boolean = false
+ def join: List[Build_Cluster.Result] = Nil
+ def stop(): Unit = { join; close() }
+ override def close(): Unit = ()
+}
+
+final class No_Build_Cluster extends Build_Cluster {
+ override def toString: String = "Build_Cluster.none"
+}
+
+class Remote_Build_Cluster(
build_context: Build.Context,
remote_hosts: List[Build_Cluster.Host],
progress: Progress = new Progress
-) extends AutoCloseable {
+) extends Build_Cluster {
require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required")
- override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
+ override def toString: String =
+ remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")")
- /* SSH sessions */
+ /* remote sessions */
private var _sessions = List.empty[Build_Cluster.Session]
- def open(): Unit = synchronized {
- require(_sessions.isEmpty)
+ override def open(): Unit = synchronized {
+ require(_sessions.isEmpty, "build cluster already open")
val attempts =
Par_List.map((host: Build_Cluster.Host) =>
@@ -172,6 +201,7 @@
if (attempts.forall(Exn.the_res.isDefinedAt)) {
_sessions = attempts.map(Exn.the_res)
+ _sessions
}
else {
for (Exn.Res(session) <- attempts) session.close()
@@ -179,31 +209,38 @@
}
}
- override def close(): Unit = synchronized {
- join
- _sessions.foreach(_.close())
- _sessions = Nil
- }
-
/* workers */
- private var _workers = List.empty[Future[Build_Cluster.Result]]
+ private var _workers = List.empty[Future[Process_Result]]
+
+ override def active(): Boolean = synchronized { _workers.nonEmpty }
- def start(): Unit = synchronized {
- require(_sessions.nonEmpty && _workers.isEmpty)
+ override def start(): Unit = synchronized {
+ require(_sessions.nonEmpty, "build cluster not yet open")
+ require(_workers.isEmpty, "build cluster already active")
+
_workers = _sessions.map(session =>
Future.thread(session.host.message("session")) { session.start() })
}
- def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
- val res = _workers.map(_.join_result)
- _workers = Nil
- res
+ override def join: List[Build_Cluster.Result] = synchronized {
+ if (_workers.isEmpty) Nil
+ else {
+ assert(_sessions.length == _workers.length)
+ for ((session, worker) <- _sessions zip _workers)
+ yield Build_Cluster.Result(session.host, worker.join_result)
+ }
}
- /* init */
+ /* close */
- open()
+ override def close(): Unit = synchronized {
+ _workers.foreach(_.cancel())
+ join
+ _sessions.foreach(_.close())
+ _workers = Nil
+ _sessions = Nil
+ }
}
--- a/src/Pure/Tools/build_process.scala Sat Jul 22 12:11:50 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Sat Jul 22 13:31:55 2023 +0200
@@ -881,22 +881,16 @@
protected val log: Logger = Logger.make_system_log(progress, build_options)
- protected def init_build_cluster(remote_hosts: List[Build_Cluster.Host]): Build_Cluster =
- new Build_Cluster(build_context, remote_hosts, progress = build_progress)
-
- protected def exit_build_cluster(build_cluster: Build_Cluster): Boolean = {
- val results = build_cluster.join
- build_cluster.close()
- results.forall({ case Exn.Res(res) => res.ok case _ => false })
+ protected def open_build_cluster(): Build_Cluster = {
+ val build_cluster = Build_Cluster.make(build_context, progress = build_progress)
+ build_cluster.open()
+ build_cluster
}
private val _build_cluster =
try {
- val remote_hosts = build_context.build_hosts.filterNot(_.is_local)
- if (build_context.master && _build_database.isDefined && remote_hosts.nonEmpty) {
- Some(init_build_cluster(remote_hosts))
- }
- else None
+ if (build_context.master && _build_database.isDefined) open_build_cluster()
+ else Build_Cluster.none
}
catch { case exn: Throwable => close(); throw exn }
@@ -904,7 +898,7 @@
Option(_database_server).flatten.foreach(_.close())
Option(_build_database).flatten.foreach(_.close())
Option(_host_database).flatten.foreach(_.close())
- Option(_build_cluster).flatten.foreach(_.close())
+ Option(_build_cluster).foreach(_.close())
progress match {
case db_progress: Database_Progress =>
db_progress.exit(close = true)
@@ -1076,6 +1070,7 @@
def run(): Map[String, Process_Result] = {
if (build_context.master) {
+ _build_cluster.init()
synchronized_database("Build_Process.init") { _state = init_state(_state) }
}
@@ -1102,9 +1097,9 @@
else {
if (build_context.master) start_build()
start_worker()
- _build_cluster.foreach(_.start())
+ _build_cluster.start()
- if (build_context.master && !build_context.worker_active && _build_cluster.isDefined) {
+ if (build_context.master && !build_context.worker_active && _build_cluster.active()) {
build_progress.echo("Waiting for external workers ...")
}
@@ -1127,7 +1122,7 @@
}
}
finally {
- _build_cluster.foreach(exit_build_cluster)
+ _build_cluster.stop()
stop_worker()
if (build_context.master) stop_build()
}