--- a/src/Pure/Tools/build_cluster.scala Sun Jul 23 19:20:29 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala Sun Jul 23 19:21:54 2023 +0200
@@ -145,22 +145,16 @@
val remote_isabelle_home: Path = remote_root + Path.explode("isabelle")
val remote_afp: Path = remote_isabelle_home + Path.explode("AFP")
- def remote_isabelle: Other_Isabelle =
+ lazy val remote_isabelle: Other_Isabelle =
Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh)
- def init(): Unit = {
- Exn.release(
- progress.capture(
- Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
- afp_root = afp_root, purge_heaps = true, preserve_jars = true),
- msg = host.message("sync ..."),
- err = host.err_message("failed to sync")))
+ def sync(): Other_Isabelle = {
+ Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
+ afp_root = afp_root, purge_heaps = true, preserve_jars = true)
+ remote_isabelle
+ }
- Exn.release(
- progress.capture(remote_isabelle.init(),
- msg = host.message("init ..."),
- err = host.err_message("failed to init")))
- }
+ def init(): Unit = remote_isabelle.init()
def start(): Process_Result = {
Process_Result.ok // FIXME
@@ -170,11 +164,8 @@
}
class Result(val host: Host, val process_result: Exn.Result[Process_Result]) {
- def ok: Boolean =
- process_result match {
- case Exn.Res(res) => res.ok
- case Exn.Exn(_) => false
- }
+ def rc: Int = Process_Result.RC(process_result)
+ def ok: Boolean = rc == Process_Result.RC.ok
}
@@ -193,13 +184,17 @@
// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
trait Build_Cluster extends AutoCloseable {
+ def rc: Int = Process_Result.RC.ok
+ def ok: Boolean = rc == Process_Result.RC.ok
+ def return_code(rc: Int): Unit = ()
+ def return_code(res: Process_Result): Unit = return_code(res.rc)
+ def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn))
def open(): Unit = ()
def init(): Unit = ()
def start(): Unit = ()
def active(): Boolean = false
def join: List[Build_Cluster.Result] = Nil
def stop(): Unit = { join; close() }
- def rc: Int = Process_Result.RC.ok
override def close(): Unit = ()
}
@@ -214,7 +209,21 @@
remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")")
- /* remote sessions */
+ /* cumulative return code */
+
+ private var _rc: Int = Process_Result.RC.ok
+
+ override def rc: Int = synchronized { _rc }
+ override def return_code(rc: Int): Unit = synchronized { _rc = Process_Result.RC.merge(_rc, rc) }
+
+ def capture[A](host: Build_Cluster.Host, op: String)(
+ e: => A,
+ msg: String = host.message(op + " ..."),
+ err: Throwable => String = { exn => return_code(exn); host.message("failed to " + op) }
+ ): Exn.Result[A] = progress.capture(e, msg = msg, err = err)
+
+
+ /* open remote sessions */
private var _sessions = List.empty[Build_Cluster.Session]
@@ -223,10 +232,7 @@
val attempts =
Par_List.map((host: Build_Cluster.Host) =>
- progress.capture(
- host.open_session(build_context.build_options, progress = progress),
- msg = host.message("open ..."),
- err = host.err_message("failed to open")),
+ capture(host, "open") { host.open_session(build_context.build_options, progress = progress) },
remote_hosts, thread = true)
if (attempts.forall(Exn.the_res.isDefinedAt)) {
@@ -240,7 +246,7 @@
}
- /* init */
+ /* init remote Isabelle distributions */
private var _init = List.empty[Future[Unit]]
@@ -248,8 +254,13 @@
require(_sessions.nonEmpty, "build cluster not yet open")
if (_init.isEmpty) {
- _init = _sessions.map(session =>
- Future.thread(session.host.message("init")) { session.init() })
+ _init =
+ for (session <- _sessions) yield {
+ Future.thread(session.host.message("session")) {
+ capture(session.host, "sync") { session.sync() }
+ capture(session.host, "init") { session.init() }
+ }
+ }
}
}
@@ -267,11 +278,14 @@
init()
_init.foreach(_.join)
- _workers = _sessions.map(session =>
- Future.thread(session.host.message("start")) { session.start() })
+ _workers =
+ for (session <- _sessions) yield {
+ Future.thread(session.host.message("worker")) { session.start() }
+ }
}
override def join: List[Build_Cluster.Result] = synchronized {
+ _init.foreach(_.join)
if (_workers.isEmpty) Nil
else {
assert(_sessions.length == _workers.length)
@@ -283,20 +297,12 @@
/* close */
- private var _rc: Int = Process_Result.RC.ok
-
- override def rc: Int = synchronized { _rc }
-
override def close(): Unit = synchronized {
_init.foreach(_.join)
_workers.foreach(_.join)
- join
_sessions.foreach(_.close())
- val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt))
- val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result))
- _rc = Process_Result.RC.merge(rc1 :: rcs2)
-
+ _init = Nil
_workers = Nil
_sessions = Nil
}