# HG changeset patch # User wenzelm # Date 1690116667 -7200 # Node ID 126a12483c67e08b60f691776758241daf227402 # Parent 001d423daf7c0fd121b4b3940584b911adcbc849 support for Build_Cluster.Session.init (rsync + Admin/init); clarified Build.Results and overall return code; diff -r 001d423daf7c -r 126a12483c67 etc/options --- a/etc/options Sun Jul 23 13:09:15 2023 +0200 +++ b/etc/options Sun Jul 23 14:51:07 2023 +0200 @@ -207,6 +207,12 @@ option build_cluster_delay : real = 1.0 -- "delay build process main loop (cluster)" +option build_cluster_root : string = "$USER_HOME/.isabelle/build_cluster" + -- "root directory for remote build cluster sessions" + +option build_cluster_identifier : string = "build_cluster" + -- "ISABELLE_IDENTIFIER for remote build cluster sessions" + section "Editor Session" diff -r 001d423daf7c -r 126a12483c67 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Sun Jul 23 13:09:15 2023 +0200 +++ b/src/Pure/Tools/build.scala Sun Jul 23 14:51:07 2023 +0200 @@ -56,14 +56,20 @@ /* results */ object Results { - def apply(context: Context, results: Map[String, Process_Result]): Results = - new Results(context.store, context.build_deps, results) + def apply( + context: Context, + results: Map[String, Process_Result] = Map.empty, + other_rc: Int = Process_Result.RC.ok + ): Results = { + new Results(context.store, context.build_deps, results, other_rc) + } } class Results private( val store: Store, val deps: Sessions.Deps, - results: Map[String, Process_Result] + results: Map[String, Process_Result], + other_rc: Int ) { def cache: Term.Cache = store.cache @@ -77,7 +83,10 @@ def sessions: Set[String] = results.keySet def cancelled(name: String): Boolean = !results(name).defined def apply(name: String): Process_Result = results(name).strict - val rc: Int = results.valuesIterator.map(_.strict.rc).foldLeft(Process_Result.RC.ok)(_ max _) + + val rc: Int = + Process_Result.RC.merge(other_rc, + Process_Result.RC.merge(results.valuesIterator.map(_.strict.rc))) def ok: Boolean = rc == Process_Result.RC.ok def unfinished: List[String] = sessions.iterator.filterNot(apply(_).ok).toList.sorted @@ -130,8 +139,7 @@ server: SSH.Server ): Results = { Isabelle_Thread.uninterruptible { - using(open_build_process(context, progress, server))( - build_process => Results(context, build_process.run())) + using(open_build_process(context, progress, server))(_.run()) } } } diff -r 001d423daf7c -r 126a12483c67 src/Pure/Tools/build_cluster.scala --- a/src/Pure/Tools/build_cluster.scala Sun Jul 23 13:09:15 2023 +0200 +++ b/src/Pure/Tools/build_cluster.scala Sun Jul 23 14:51:07 2023 +0200 @@ -115,11 +115,16 @@ } def message(msg: String): String = "Host " + quote(host.name) + if_proper(msg, ": " + msg) + def err_message(msg: String)(exn: Throwable): String = message(msg + "\n" + Exn.message(exn)) - def open_session(options: Options, progress: Progress = new Progress): Session = { + def open_session( + options: Options, + afp_root: Option[Path] = None, + progress: Progress = new Progress + ): Session = { val session_options = options ++ host.options val ssh = SSH.open_session(session_options, host.name, port = host.port, user = host.user) - new Session(host, session_options, ssh, progress) + new Session(host, session_options, afp_root, ssh, progress) } } @@ -129,11 +134,34 @@ class Session( val host: Host, val options: Options, + val afp_root: Option[Path], val ssh: SSH.System, val progress: Progress ) extends AutoCloseable { override def toString: String = ssh.toString + val remote_identifier: String = options.string("build_cluster_identifier") + val remote_root: Path = Path.explode(options.string("build_cluster_root")) + val remote_isabelle_home: Path = remote_root + Path.explode("isabelle") + val remote_afp: Path = remote_isabelle_home + Path.explode("AFP") + + def remote_isabelle: Other_Isabelle = + Other_Isabelle(remote_isabelle_home, isabelle_identifier = remote_identifier, ssh = ssh) + + def init(): Unit = { + Exn.release( + progress.capture( + Sync.sync(options, Rsync.Context(ssh = ssh), remote_isabelle_home, + afp_root = afp_root, purge_heaps = true, preserve_jars = true), + msg = host.message("sync ..."), + err = host.err_message("failed to sync"))) + + Exn.release( + progress.capture(remote_isabelle.init(), + msg = host.message("init ..."), + err = host.err_message("failed to init"))) + } + def start(): Process_Result = { Process_Result.ok // FIXME } @@ -171,6 +199,7 @@ def active(): Boolean = false def join: List[Build_Cluster.Result] = Nil def stop(): Unit = { join; close() } + def rc: Int = Process_Result.RC.ok override def close(): Unit = () } @@ -197,7 +226,7 @@ progress.capture( host.open_session(build_context.build_options, progress = progress), msg = host.message("open ..."), - err = exn => host.message("failed to open\n" + Exn.message(exn))), + err = host.err_message("failed to open")), remote_hosts, thread = true) if (attempts.forall(Exn.the_res.isDefinedAt)) { @@ -211,6 +240,20 @@ } + /* init */ + + private var _init = List.empty[Future[Unit]] + + override def init(): Unit = synchronized { + require(_sessions.nonEmpty, "build cluster not yet open") + + if (_init.isEmpty) { + _init = _sessions.map(session => + Future.thread(session.host.message("init")) { session.init() }) + } + } + + /* workers */ private var _workers = List.empty[Future[Process_Result]] @@ -221,8 +264,11 @@ require(_sessions.nonEmpty, "build cluster not yet open") require(_workers.isEmpty, "build cluster already active") + init() + _init.foreach(_.join) + _workers = _sessions.map(session => - Future.thread(session.host.message("session")) { session.start() }) + Future.thread(session.host.message("start")) { session.start() }) } override def join: List[Build_Cluster.Result] = synchronized { @@ -237,10 +283,20 @@ /* close */ + private var _rc: Int = Process_Result.RC.ok + + override def rc: Int = synchronized { _rc } + override def close(): Unit = synchronized { + _init.foreach(_.cancel()) _workers.foreach(_.cancel()) join _sessions.foreach(_.close()) + + val rc1 = Process_Result.RC(_init.iterator.map(_.join_result).forall(Exn.the_res.isDefinedAt)) + val rcs2 = _workers.map(worker => Process_Result.RC(worker.join_result)) + _rc = Process_Result.RC.merge(rc1 :: rcs2) + _workers = Nil _sessions = Nil } diff -r 001d423daf7c -r 126a12483c67 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sun Jul 23 13:09:15 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Sun Jul 23 14:51:07 2023 +0200 @@ -1068,7 +1068,7 @@ /* run */ - def run(): Map[String, Process_Result] = { + def run(): Build.Results = { if (build_context.master) { _build_cluster.init() synchronized_database("Build_Process.init") { _state = init_state(_state) } @@ -1092,7 +1092,7 @@ if (finished()) { progress.echo_warning("Nothing to build") - Map.empty[String, Process_Result] + Build.Results(build_context) } else { if (build_context.master) start_build() @@ -1128,7 +1128,8 @@ } synchronized_database("Build_Process.result") { - for ((name, result) <- _state.results) yield name -> result.process_result + val results = for ((name, result) <- _state.results) yield name -> result.process_result + Build.Results(build_context, results = results, other_rc = _build_cluster.rc) } } }