support for Build_Cluster.Session.init (rsync + Admin/init);
authorwenzelm
Sun, 23 Jul 2023 14:51:07 +0200
changeset 78440 126a12483c67
parent 78439 001d423daf7c
child 78441 3153311f0f6c
support for Build_Cluster.Session.init (rsync + Admin/init); clarified Build.Results and overall return code;
etc/options
src/Pure/Tools/build.scala
src/Pure/Tools/build_cluster.scala
src/Pure/Tools/build_process.scala
--- a/etc/options	Sun Jul 23 13:09:15 2023 +0200
+++ b/etc/options	Sun Jul 23 14:51:07 2023 +0200
@@ -207,6 +207,12 @@
 option build_cluster_delay : real = 1.0
   -- "delay build process main loop (cluster)"
 
+option build_cluster_root : string = "$USER_HOME/.isabelle/build_cluster"
+  -- "root directory for remote build cluster sessions"
+
+option build_cluster_identifier : string = "build_cluster"
+  -- "ISABELLE_IDENTIFIER for remote build cluster sessions"
+
 
 section "Editor Session"
 
--- a/src/Pure/Tools/build.scala	Sun Jul 23 13:09:15 2023 +0200
+++ b/src/Pure/Tools/build.scala	Sun Jul 23 14:51:07 2023 +0200
@@ -56,14 +56,20 @@
   /* results */
 
   object Results {
-    def apply(context: Context, results: Map[String, Process_Result]): Results =
-      new Results(context.store, context.build_deps, results)
+    def apply(
+      context: Context,
+      results: Map[String, Process_Result] = Map.empty,
+      other_rc: Int = Process_Result.RC.ok
+    ): Results = {
+      new Results(context.store, context.build_deps, results, other_rc)
+    }
   }
 
   class Results private(
     val store: Store,
     val deps: Sessions.Deps,
-    results: Map[String, Process_Result]
+    results: Map[String, Process_Result],
+    other_rc: Int
   ) {
     def cache: Term.Cache = store.cache
 
@@ -77,7 +83,10 @@
     def sessions: Set[String] = results.keySet
     def cancelled(name: String): Boolean = !results(name).defined
     def apply(name: String): Process_Result = results(name).strict
-    val rc: Int = results.valuesIterator.map(_.strict.rc).foldLeft(Process_Result.RC.ok)(_ max _)
+
+    val rc: Int =
+      Process_Result.RC.merge(other_rc,
+        Process_Result.RC.merge(results.valuesIterator.map(_.strict.rc)))
     def ok: Boolean = rc == Process_Result.RC.ok
 
     def unfinished: List[String] = sessions.iterator.filterNot(apply(_).ok).toList.sorted
@@ -130,8 +139,7 @@
       server: SSH.Server
     ): Results = {
       Isabelle_Thread.uninterruptible {
-        using(open_build_process(context, progress, server))(
-          build_process => Results(context, build_process.run()))
+        using(open_build_process(context, progress, server))(_.run())
       }
     }
   }
--- a/src/Pure/Tools/build_cluster.scala	Sun Jul 23 13:09:15 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala	Sun Jul 23 14:51:07 2023 +0200
@@ -115,11 +115,16 @@
     }
 
     def message(msg: String): String = "Host " + quote(host.name) + if_proper(msg, ": " + msg)
+    def err_message(msg: String)(exn: Throwable): String = message(msg + "\n" + Exn.message(exn))
 
-    def open_session(options: Options, progress: Progress = new Progress): Session = {
+    def open_session(
+      options: Options,
+      afp_root: Option[Path] = None,
+      progress: Progress = new Progress
+    ): Session = {
       val session_options = options ++ host.options
       val ssh = SSH.open_session(session_options, host.name, port = host.port, user = host.user)
-      new Session(host, session_options, ssh, progress)
+      new Session(host, session_options, afp_root, ssh, progress)
     }
   }
 
@@ -129,11 +134,34 @@
   class Session(
     val host: Host,
     val options: Options,
+    val afp_root: Option[Path],
     val ssh: SSH.System,
     val progress: Progress
   ) extends AutoCloseable {
     override def toString: String = ssh.toString
 
+    val remote_identifier: String = options.string("build_cluster_identifier")
+    val remote_root: Path = Path.explode(options.string("build_cluster_root"))
+    val remote_isabelle_home: Path = remote_root + Path.explode("isabelle")
+    val remote_afp: Path = remote_isabelle_home + Path.explode("AFP")
+
+    def remote_isabelle: Other_Isabelle =
+      Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh)
+
+    def init(): Unit = {
+      Exn.release(
+        progress.capture(
+          Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
+            afp_root = afp_root, purge_heaps = true, preserve_jars = true),
+          msg = host.message("sync ..."),
+          err = host.err_message("failed to sync")))
+
+      Exn.release(
+        progress.capture(remote_isabelle.init(),
+          msg = host.message("init ..."),
+          err = host.err_message("failed to init")))
+    }
+
     def start(): Process_Result = {
       Process_Result.ok  // FIXME
     }
@@ -171,6 +199,7 @@
   def active(): Boolean = false
   def join: List[Build_Cluster.Result] = Nil
   def stop(): Unit = { join; close() }
+  def rc: Int = Process_Result.RC.ok
   override def close(): Unit = ()
 }
 
@@ -197,7 +226,7 @@
         progress.capture(
           host.open_session(build_context.build_options, progress = progress),
           msg = host.message("open ..."),
-          err = exn => host.message("failed to open\n" + Exn.message(exn))),
+          err = host.err_message("failed to open")),
         remote_hosts, thread = true)
 
     if (attempts.forall(Exn.the_res.isDefinedAt)) {
@@ -211,6 +240,20 @@
   }
 
 
+  /* init */
+
+  private var _init = List.empty[Future[Unit]]
+
+  override def init(): Unit = synchronized {
+    require(_sessions.nonEmpty, "build cluster not yet open")
+
+    if (_init.isEmpty) {
+      _init = _sessions.map(session =>
+        Future.thread(session.host.message("init")) { session.init() })
+    }
+  }
+
+
   /* workers */
 
   private var _workers = List.empty[Future[Process_Result]]
@@ -221,8 +264,11 @@
     require(_sessions.nonEmpty, "build cluster not yet open")
     require(_workers.isEmpty, "build cluster already active")
 
+    init()
+    _init.foreach(_.join)
+
     _workers = _sessions.map(session =>
-      Future.thread(session.host.message("session")) { session.start() })
+      Future.thread(session.host.message("start")) { session.start() })
   }
 
   override def join: List[Build_Cluster.Result] = synchronized {
@@ -237,10 +283,20 @@
 
   /* close */
 
+  private var _rc: Int = Process_Result.RC.ok
+
+  override def rc: Int = synchronized { _rc }
+
   override def close(): Unit = synchronized {
+    _init.foreach(_.cancel())
     _workers.foreach(_.cancel())
     join
     _sessions.foreach(_.close())
+
+    val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt))
+    val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result))
+    _rc = Process_Result.RC.merge(rc1 :: rcs2)
+
     _workers = Nil
     _sessions = Nil
   }
--- a/src/Pure/Tools/build_process.scala	Sun Jul 23 13:09:15 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Sun Jul 23 14:51:07 2023 +0200
@@ -1068,7 +1068,7 @@
 
   /* run */
 
-  def run(): Map[String, Process_Result] = {
+  def run(): Build.Results = {
     if (build_context.master) {
       _build_cluster.init()
       synchronized_database("Build_Process.init") { _state = init_state(_state) }
@@ -1092,7 +1092,7 @@
 
     if (finished()) {
       progress.echo_warning("Nothing to build")
-      Map.empty[String, Process_Result]
+      Build.Results(build_context)
     }
     else {
       if (build_context.master) start_build()
@@ -1128,7 +1128,8 @@
       }
 
       synchronized_database("Build_Process.result") {
-        for ((name, result) <- _state.results) yield name -> result.process_result
+        val results = for ((name, result) <- _state.results) yield name -> result.process_result
+        Build.Results(build_context, results = results, other_rc = _build_cluster.rc)
       }
     }
   }