clarified exception handling and return_code;
authorwenzelm
Sun, 23 Jul 2023 19:21:54 +0200
changeset 78443 a8e1d9202dd9
parent 78442 c38aebdf1a3d
child 78444 3d1746a716fa
clarified exception handling and return_code;
src/Pure/Tools/build_cluster.scala
--- a/src/Pure/Tools/build_cluster.scala	Sun Jul 23 19:20:29 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala	Sun Jul 23 19:21:54 2023 +0200
@@ -145,22 +145,16 @@
     val remote_isabelle_home: Path = remote_root + Path.explode("isabelle")
     val remote_afp: Path = remote_isabelle_home + Path.explode("AFP")
 
-    def remote_isabelle: Other_Isabelle =
+    lazy val remote_isabelle: Other_Isabelle =
       Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh)
 
-    def init(): Unit = {
-      Exn.release(
-        progress.capture(
-          Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
-            afp_root = afp_root, purge_heaps = true, preserve_jars = true),
-          msg = host.message("sync ..."),
-          err = host.err_message("failed to sync")))
+    def sync(): Other_Isabelle = {
+      Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
+        afp_root = afp_root, purge_heaps = true, preserve_jars = true)
+      remote_isabelle
+    }
 
-      Exn.release(
-        progress.capture(remote_isabelle.init(),
-          msg = host.message("init ..."),
-          err = host.err_message("failed to init")))
-    }
+    def init(): Unit = remote_isabelle.init()
 
     def start(): Process_Result = {
       Process_Result.ok  // FIXME
@@ -170,11 +164,8 @@
   }
 
   class Result(val host: Host, val process_result: Exn.Result[Process_Result]) {
-    def ok: Boolean =
-      process_result match {
-        case Exn.Res(res) => res.ok
-        case Exn.Exn(_) => false
-      }
+    def rc: Int = Process_Result.RC(process_result)
+    def ok: Boolean = rc == Process_Result.RC.ok
   }
 
 
@@ -193,13 +184,17 @@
 
 // NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
 trait Build_Cluster extends AutoCloseable {
+  def rc: Int = Process_Result.RC.ok
+  def ok: Boolean = rc == Process_Result.RC.ok
+  def return_code(rc: Int): Unit = ()
+  def return_code(res: Process_Result): Unit = return_code(res.rc)
+  def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn))
   def open(): Unit = ()
   def init(): Unit = ()
   def start(): Unit = ()
   def active(): Boolean = false
   def join: List[Build_Cluster.Result] = Nil
   def stop(): Unit = { join; close() }
-  def rc: Int = Process_Result.RC.ok
   override def close(): Unit = ()
 }
 
@@ -214,7 +209,21 @@
     remote_hosts.iterator.map(_.name).mkString("Build_Cluster(", ", ", ")")
 
 
-  /* remote sessions */
+  /* cumulative return code */
+
+  private var _rc: Int = Process_Result.RC.ok
+
+  override def rc: Int = synchronized { _rc }
+  override def return_code(rc: Int): Unit = synchronized { _rc = Process_Result.RC.merge(_rc, rc) }
+
+  def capture[A](host: Build_Cluster.Host, op: String)(
+    e: => A,
+    msg: String = host.message(op + " ..."),
+    err: Throwable => String = { exn => return_code(exn); host.message("failed to " + op) }
+  ): Exn.Result[A] = progress.capture(e, msg = msg, err = err)
+
+
+  /* open remote sessions */
 
   private var _sessions = List.empty[Build_Cluster.Session]
 
@@ -223,10 +232,7 @@
 
     val attempts =
       Par_List.map((host: Build_Cluster.Host) =>
-        progress.capture(
-          host.open_session(build_context.build_options, progress = progress),
-          msg = host.message("open ..."),
-          err = host.err_message("failed to open")),
+        capture(host, "open") { host.open_session(build_context.build_options, progress = progress) },
         remote_hosts, thread = true)
 
     if (attempts.forall(Exn.the_res.isDefinedAt)) {
@@ -240,7 +246,7 @@
   }
 
 
-  /* init */
+  /* init remote Isabelle distributions */
 
   private var _init = List.empty[Future[Unit]]
 
@@ -248,8 +254,13 @@
     require(_sessions.nonEmpty, "build cluster not yet open")
 
     if (_init.isEmpty) {
-      _init = _sessions.map(session =>
-        Future.thread(session.host.message("init")) { session.init() })
+      _init =
+        for (session <- _sessions) yield {
+          Future.thread(session.host.message("session")) {
+            capture(session.host, "sync") { session.sync() }
+            capture(session.host, "init") { session.init() }
+          }
+        }
     }
   }
 
@@ -267,11 +278,14 @@
     init()
     _init.foreach(_.join)
 
-    _workers = _sessions.map(session =>
-      Future.thread(session.host.message("start")) { session.start() })
+    _workers =
+      for (session <- _sessions) yield {
+        Future.thread(session.host.message("worker")) { session.start() }
+      }
   }
 
   override def join: List[Build_Cluster.Result] = synchronized {
+    _init.foreach(_.join)
     if (_workers.isEmpty) Nil
     else {
       assert(_sessions.length == _workers.length)
@@ -283,20 +297,12 @@
 
   /* close */
 
-  private var _rc: Int = Process_Result.RC.ok
-
-  override def rc: Int = synchronized { _rc }
-
   override def close(): Unit = synchronized {
     _init.foreach(_.join)
     _workers.foreach(_.join)
-    join
     _sessions.foreach(_.close())
 
-    val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt))
-    val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result))
-    _rc = Process_Result.RC.merge(rc1 :: rcs2)
-
+    _init = Nil
     _workers = Nil
     _sessions = Nil
   }