# HG changeset patch # User wenzelm # Date 1690132914 -7200 # Node ID a8e1d9202dd98480facd2b1bfd1e4bac740ddc32 # Parent c38aebdf1a3d9ad068cf5c33ecd942e0fc0fe6e8 clarified exception handling and return_code; diff -r c38aebdf1a3d -r a8e1d9202dd9 src/Pure/Tools/build_cluster.scala --- a/src/Pure/Tools/build_cluster.scala Sun Jul 23 19:20:29 2023 +0200 +++ b/src/Pure/Tools/build_cluster.scala Sun Jul 23 19:21:54 2023 +0200 @@ -145,22 +145,16 @@ val remote_isabelle_home: Path = remote_root + Path.explode("isabelle") val remote_afp: Path = remote_isabelle_home + Path.explode("AFP") - def remote_isabelle: Other_Isabelle = + lazy val remote_isabelle: Other_Isabelle = Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh) - def init(): Unit = { - Exn.release( - progress.capture( - Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home, - afp_root = afp_root, purge_heaps = true, preserve_jars = true), - msg = host.message("sync ..."), - err = host.err_message("failed to sync"))) + def sync(): Other_Isabelle = { + Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home, + afp_root = afp_root, purge_heaps = true, preserve_jars = true) + remote_isabelle + } - Exn.release( - progress.capture(remote_isabelle.init(), - msg = host.message("init ..."), - err = host.err_message("failed to init"))) - } + def init(): Unit = remote_isabelle.init() def start(): Process_Result = { Process_Result.ok // FIXME @@ -170,11 +164,8 @@ } class Result(val host: Host, val process_result: Exn.Result[Process_Result]) { - def ok: Boolean = - process_result match { - case Exn.Res(res) => res.ok - case Exn.Exn(_) => false - } + def rc: Int = Process_Result.RC(process_result) + def ok: Boolean = rc == Process_Result.RC.ok } @@ -193,13 +184,17 @@ // NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster() trait Build_Cluster extends AutoCloseable { + def rc: Int = Process_Result.RC.ok + def ok: Boolean = rc == Process_Result.RC.ok + def return_code(rc: Int): Unit = () + def return_code(res: Process_Result): Unit = return_code(res.rc) + def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn)) def open(): Unit = () def init(): Unit = () def start(): Unit = () def active(): Boolean = false def join: List[Build_Cluster.Result] = Nil def stop(): Unit = { join; close() } - def rc: Int = Process_Result.RC.ok override def close(): Unit = () } @@ -214,7 +209,21 @@ remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")") - /* remote sessions */ + /* cumulative return code */ + + private var _rc: Int = Process_Result.RC.ok + + override def rc: Int = synchronized { _rc } + override def return_code(rc: Int): Unit = synchronized { _rc = Process_Result.RC.merge(_rc, rc) } + + def capture[A](host: Build_Cluster.Host, op: String)( + e: => A, + msg: String = host.message(op + " ..."), + err: Throwable => String = { exn => return_code(exn); host.message("failed to " + op) } + ): Exn.Result[A] = progress.capture(e, msg = msg, err = err) + + + /* open remote sessions */ private var _sessions = List.empty[Build_Cluster.Session] @@ -223,10 +232,7 @@ val attempts = Par_List.map((host: Build_Cluster.Host) => - progress.capture( - host.open_session(build_context.build_options, progress = progress), - msg = host.message("open ..."), - err = host.err_message("failed to open")), + capture(host, "open") { host.open_session(build_context.build_options, progress = progress) }, remote_hosts, thread = true) if (attempts.forall(Exn.the_res.isDefinedAt)) { @@ -240,7 +246,7 @@ } - /* init */ + /* init remote Isabelle distributions */ private var _init = List.empty[Future[Unit]] @@ -248,8 +254,13 @@ require(_sessions.nonEmpty, "build cluster not yet open") if (_init.isEmpty) { - _init = _sessions.map(session => - Future.thread(session.host.message("init")) { session.init() }) + _init = + for (session <- _sessions) yield { + Future.thread(session.host.message("session")) { + capture(session.host, "sync") { session.sync() } + capture(session.host, "init") { session.init() } + } + } } } @@ -267,11 +278,14 @@ init() _init.foreach(_.join) - _workers = _sessions.map(session => - Future.thread(session.host.message("start")) { session.start() }) + _workers = + for (session <- _sessions) yield { + Future.thread(session.host.message("worker")) { session.start() } + } } override def join: List[Build_Cluster.Result] = synchronized { + _init.foreach(_.join) if (_workers.isEmpty) Nil else { assert(_sessions.length == _workers.length) @@ -283,20 +297,12 @@ /* close */ - private var _rc: Int = Process_Result.RC.ok - - override def rc: Int = synchronized { _rc } - override def close(): Unit = synchronized { _init.foreach(_.join) _workers.foreach(_.join) - join _sessions.foreach(_.close()) - val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt)) - val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result)) - _rc = Process_Result.RC.merge(rc1 :: rcs2) - + _init = Nil _workers = Nil _sessions = Nil }