--- a/src/Pure/Tools/build_cluster.scala Sun Jul 23 13:09:15 2023 +0200
+++ b/src/Pure/Tools/build_cluster.scala Sun Jul 23 14:51:07 2023 +0200
@@ -115,11 +115,16 @@
}
def message(msg: String): String = "Host " + quote(host.name) + if_proper(msg, ": " + msg)
+ def err_message(msg: String)(exn: Throwable): String = message(msg + "\n" + Exn.message(exn))
- def open_session(options: Options, progress: Progress = new Progress): Session = {
+ def open_session(
+ options: Options,
+ afp_root: Option[Path] = None,
+ progress: Progress = new Progress
+ ): Session = {
val session_options = options ++ host.options
val ssh = SSH.open_session(session_options, host.name, port = host.port, user = host.user)
- new Session(host, session_options, ssh, progress)
+ new Session(host, session_options, afp_root, ssh, progress)
}
}
@@ -129,11 +134,34 @@
class Session(
val host: Host,
val options: Options,
+ val afp_root: Option[Path],
val ssh: SSH.System,
val progress: Progress
) extends AutoCloseable {
override def toString: String = ssh.toString
+ val remote_identifier: String = options.string("build_cluster_identifier")
+ val remote_root: Path = Path.explode(options.string("build_cluster_root"))
+ val remote_isabelle_home: Path = remote_root + Path.explode("isabelle")
+ val remote_afp: Path = remote_isabelle_home + Path.explode("AFP")
+
+ def remote_isabelle: Other_Isabelle =
+ Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh)
+
+ def init(): Unit = {
+ Exn.release(
+ progress.capture(
+ Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home,
+ afp_root = afp_root, purge_heaps = true, preserve_jars = true),
+ msg = host.message("sync ..."),
+ err = host.err_message("failed to sync")))
+
+ Exn.release(
+ progress.capture(remote_isabelle.init(),
+ msg = host.message("init ..."),
+ err = host.err_message("failed to init")))
+ }
+
def start(): Process_Result = {
Process_Result.ok // FIXME
}
@@ -171,6 +199,7 @@
def active(): Boolean = false
def join: List[Build_Cluster.Result] = Nil
def stop(): Unit = { join; close() }
+ def rc: Int = Process_Result.RC.ok
override def close(): Unit = ()
}
@@ -197,7 +226,7 @@
progress.capture(
host.open_session(build_context.build_options, progress = progress),
msg = host.message("open ..."),
- err = exn => host.message("failed to open\n" + Exn.message(exn))),
+ err = host.err_message("failed to open")),
remote_hosts, thread = true)
if (attempts.forall(Exn.the_res.isDefinedAt)) {
@@ -211,6 +240,20 @@
}
+ /* init */
+
+ private var _init = List.empty[Future[Unit]]
+
+ override def init(): Unit = synchronized {
+ require(_sessions.nonEmpty, "build cluster not yet open")
+
+ if (_init.isEmpty) {
+ _init = _sessions.map(session =>
+ Future.thread(session.host.message("init")) { session.init() })
+ }
+ }
+
+
/* workers */
private var _workers = List.empty[Future[Process_Result]]
@@ -221,8 +264,11 @@
require(_sessions.nonEmpty, "build cluster not yet open")
require(_workers.isEmpty, "build cluster already active")
+ init()
+ _init.foreach(_.join)
+
_workers = _sessions.map(session =>
- Future.thread(session.host.message("session")) { session.start() })
+ Future.thread(session.host.message("start")) { session.start() })
}
override def join: List[Build_Cluster.Result] = synchronized {
@@ -237,10 +283,20 @@
/* close */
+ private var _rc: Int = Process_Result.RC.ok
+
+ override def rc: Int = synchronized { _rc }
+
override def close(): Unit = synchronized {
+ _init.foreach(_.cancel())
_workers.foreach(_.cancel())
join
_sessions.foreach(_.close())
+
+ val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt))
+ val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result))
+ _rc = Process_Result.RC.merge(rc1 :: rcs2)
+
_workers = Nil
_sessions = Nil
}