clarified context: operations now in build process;
authorFabian Huch <huch@in.tum.de>
Fri, 07 Jun 2024 15:04:07 +0200
changeset 80280 7987b33fb6c5
parent 80279 02424b81472a
child 80281 17d2f775907a
clarified context: operations now in build process;
src/Pure/Build/build_manager.scala
--- a/src/Pure/Build/build_manager.scala	Fri Jun 07 14:00:59 2024 +0200
+++ b/src/Pure/Build/build_manager.scala	Fri Jun 07 15:04:07 2024 +0200
@@ -572,8 +572,8 @@
     ) {
       def is_empty = process_futures.isEmpty && result_futures.isEmpty
 
-      def init(build_config: Build_Config, job: Job, context: Context): State = {
-        val process_future = Future.fork(Build_Process.open(build_config, context))
+      def init(context: Context): State = {
+        val process_future = Future.fork(Build_Process.open(context))
         val result_future =
           Future.fork(
             process_future.join_result match {
@@ -581,8 +581,8 @@
               case Exn.Exn(_) => Process_Result(Process_Result.RC.interrupt)
             })
         new State(
-          process_futures + (job.name -> process_future),
-          result_futures + (job.name -> result_future))
+          process_futures + (context.name -> process_future),
+          result_futures + (context.name -> result_future))
       }
 
       def running: List[String] = process_futures.keys.toList
@@ -629,27 +629,28 @@
       }
     }
 
-    private def start_next(): Option[(Build_Config, Job)] =
-      synchronized_database("start_job") {
+    private def start_next(): Option[Context] =
+      synchronized_database("start_next") {
         _state.next.headOption.flatMap { task =>
           progress.echo("Initializing " + task.name)
 
           _state = _state.remove_pending(task.name)
 
-          val context = Context(store, task, build_hosts)
           val number = _state.next_number(task.kind)
+          val context = Context(store, task, number, build_hosts)
 
           Exn.capture {
-            store.sync_permissions(context.dir)
+            context.init()
+            store.sync_permissions(context.task_dir)
 
             val isabelle_rev =
-              sync(isabelle_repository, task.isabelle_rev, context.dir)
+              sync(isabelle_repository, task.isabelle_rev, context.task_dir)
 
             val components =
               for (component <- task.components)
               yield sync_dirs.find(_.name == component.name) match {
                 case Some(sync_dir) =>
-                  val target = context.dir + sync_dir.target
+                  val target = context.task_dir + sync_dir.target
                   component.copy(rev = sync(sync_dir.hg, component.rev, target))
                 case None =>
                   if (component.rev.isEmpty) component
@@ -660,23 +661,21 @@
           } match {
             case Exn.Res(job) =>
               _state = _state.add_running(job)
-              val context1 = context.move(Context(store, job))
 
               val msg = "Starting " + job.name
               echo(msg + " (id " + job.id + ")")
-              context1.progress.echo(msg)
+              context.progress.echo(msg)
 
-              Some(task.build_config, job)
+              Some(context)
             case Exn.Exn(exn) =>
               val result = Result(task.kind, number, Status.aborted)
-              val context1 = Context(store, result)
+              _state = _state.add_finished(result)
 
               val msg = "Failed to start job: " + exn.getMessage
               echo_error_message(msg)
-              context1.progress.echo_error_message(msg)
+              context.progress.echo_error_message(msg)
 
-              context.remove()
-              _state = _state.add_finished(result)
+              Isabelle_System.rm_tree(context.task_dir)
 
               None
           }
@@ -692,11 +691,8 @@
     private def finish_job(name: String, process_result: Process_Result): Unit =
       synchronized_database("finish_job") {
         val job = _state.running(name)
-        val context = Context(store, job, build_hosts)
+        val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
 
-        val result = Result(job.kind, job.number, Status.from_result(process_result), Some(job.id))
-        context.copy_results(Context(store, result))
-        context.remove()
         echo("Finished job " + job.id + " with status code " + process_result.rc)
 
         _state = _state
@@ -711,8 +707,7 @@
       if (state.is_empty && !progress.stopped) {
         start_next() match {
           case None => state
-          case Some((build_config, job)) =>
-            state.init(build_config, job, Context(store, job, build_hosts))
+          case Some(context) => state.init(context)
         }
       }
       else {
@@ -840,17 +835,17 @@
           for {
             (name, (time, log)) <- logs
             if time + keep > Time.now()
-          } yield name -> (time, Context(store, state.get(name).get).log)
+          } yield name -> (time, File.read(store.log_file(name)))
       }
 
-      def lookup(store: Store, elem: T): String = synchronized {
-        logs.get(elem.name) match {
+      def lookup(store: Store, name: String): String = synchronized {
+        logs.get(name) match {
           case Some((_, log)) =>
-            logs += elem.name -> (Time.now(), log)
+            logs += name -> (Time.now(), log)
           case None =>
-            logs += elem.name -> (Time.now(), Context(store, elem).log)
+            logs += name -> (Time.now(), File.read(store.log_file(name)))
         }
-        logs(elem.name)._2
+        logs(name)._2
       }
     }
   }
@@ -976,11 +971,11 @@
                 if (job.cancelled) text("Cancelling...")
                 else text("Running...") ::: render_cancel(job.id)) ::
               render_rev(job.isabelle_rev, job.components) :::
-              source(cache.lookup(store, job)) :: Nil
+              source(cache.lookup(store, job.name)) :: Nil
             case result: Result =>
               par(text("Date: " + result.date)) ::
               par(text("Status: " + result.status)) ::
-              source(cache.lookup(store, result)) :: Nil
+              source(cache.lookup(store, result.name)) :: Nil
           }
         }
 
@@ -1068,58 +1063,42 @@
 
   /* context */
 
-  object Context {
-    def apply(store: Store, elem: T, build_hosts: List[Build_Cluster.Host] = Nil): Context =
-      new Context(store, store.dir(elem), build_hosts)
-  }
-
-  class Context private(store: Store, val dir: Path, val build_hosts: List[Build_Cluster.Host]) {
+  case class Context(
+    store: Store,
+    task: Task,
+    number: Long,
+    build_hosts: List[Build_Cluster.Host]
+  ) {
+    def name: String = task.kind + "/" + number
+    def init(): Unit = Isabelle_System.make_directory(store.log_file(name).dir)
+    def task_dir: Path = store.task_dir(task)
+    def components: List[Component] = task.build_config.components
     def isabelle_identifier: String = store.identifier
-
-    private val log_file = dir + Path.basic("log")
-    val progress = new File_Progress(log_file, verbose = true)
-    def log: String =
-      Exn.capture(File.read(log_file)) match {
-        case Exn.Exn(_) => ""
-        case Exn.Res(res) => res
-      }
-
-    def move(other: Context): Context = {
-      Isabelle_System.make_directory(other.dir.dir)
-      Isabelle_System.move_file(dir, other.dir)
-      other
-    }
-
-    def copy_results(other: Context): Context = {
-      Isabelle_System.make_directory(other.dir)
-      Isabelle_System.copy_file(log_file, other.log_file)
-      other
-    }
-
-    def remove(): Unit = Isabelle_System.rm_tree(dir)
-
-    def ssh = store.open_ssh()
+    def fresh_build: Boolean = task.build_config.fresh_build
+    def command: String = task.build_config.command(build_hosts)
+    def progress: Progress = new File_Progress(store.log_file(name))
+    def open_ssh(): SSH.Session = store.open_ssh()
   }
 
 
   /* build process */
 
   object Build_Process {
-    def open(build_config: Build_Config, context: Context): Build_Process =
-      new Build_Process(context.ssh, build_config, context)
+    def open(context: Context): Build_Process = new Build_Process(context.open_ssh(), context)
   }
 
-  class Build_Process(ssh: SSH.Session, build_config: Build_Config, context: Context) {
+  class Build_Process(ssh: SSH.Session, context: Context) {
     private val _dir = ssh.tmp_dir()
     private val _progress = context.progress
 
     private val _isabelle =
       try {
         val rsync_context = Rsync.Context(ssh = ssh)
-        val source = File.standard_path(context.dir)
+        val source = File.standard_path(context.task_dir)
         Rsync.exec(rsync_context, clean = true, args = List("--", Url.direct_path(source),
           rsync_context.target(_dir))).check
 
+        Isabelle_System.rm_tree(context.task_dir)
         Other_Isabelle(_dir, context.isabelle_identifier, ssh, _progress)
       }
       catch { case exn: Throwable => close(); throw exn }
@@ -1128,16 +1107,16 @@
       try {
         val init_components =
           for {
-            component <- build_config.components
+            component <- context.components
             target = _dir + Sync.DIRS + Path.basic(component.name)
             if Components.is_component_dir(target)
           } yield "init_component " + quote(target.absolute.implode)
 
         _isabelle.init(
           other_settings = _isabelle.init_components() ::: init_components,
-          fresh = build_config.fresh_build, echo = true)
+          fresh = context.fresh_build, echo = true)
 
-        val cmd = build_config.command(context.build_hosts)
+        val cmd = context.command
         _progress.echo("isabelle" + cmd)
 
         val script = File.bash_path(Isabelle_Tool.exe(_isabelle.isabelle_home)) + cmd
@@ -1156,6 +1135,7 @@
 
     def close(): Unit = {
       Option(_dir).foreach(ssh.rm_tree)
+      Isabelle_System.rm_tree(context.task_dir)
       ssh.close()
     }
   }
@@ -1168,15 +1148,10 @@
     val identifier = options.string("build_manager_identifier")
 
     private val pending = base_dir + Path.basic("pending")
-    private val running = base_dir + Path.basic("running")
     private val finished = base_dir + Path.basic("finished")
 
-    def dir(elem: T): Path =
-      elem match {
-        case task: Task => pending + Path.basic(task.id.toString)
-        case job: Job => running + Path.make(List(job.kind, job.number.toString))
-        case result: Result => finished + Path.make(List(result.kind, result.number.toString))
-      }
+    def task_dir(task: Task) = pending + Path.basic(task.id.toString)
+    def log_file(name: String): Path = finished + Path.explode(name)
 
     def sync_permissions(dir: Path, ssh: SSH.System = SSH.Local): Unit = {
       ssh.execute("chmod -R g+rwx " + File.bash_path(dir))
@@ -1184,8 +1159,7 @@
     }
 
     def init_dirs(): Unit =
-      List(pending, running, finished).foreach(dir =>
-        sync_permissions(Isabelle_System.make_directory(dir)))
+      List(pending, finished).foreach(dir => sync_permissions(Isabelle_System.make_directory(dir)))
 
     val ssh_group: String = options.string("build_manager_ssh_group")
 
@@ -1281,19 +1255,19 @@
       export_files, fresh_build, presentation, verbose)
     val task = Task(build_config, id, Date.now(), Priority.high)
 
-    val context = Context(store, task)
+    val dir = store.task_dir(task)
 
     progress.interrupt_handler {
       using(store.open_ssh()) { ssh =>
         val rsync_context = Rsync.Context(ssh = ssh)
         progress.echo("Transferring repositories...")
-        Sync.sync(store.options, rsync_context, context.dir, preserve_jars = true,
+        Sync.sync(store.options, rsync_context, dir, preserve_jars = true,
           dirs = Sync.afp_dirs(afp_root), rev = rev)
-        store.sync_permissions(context.dir, ssh)
+        store.sync_permissions(dir, ssh)
 
         if (progress.stopped) {
           progress.echo("Cancelling submission...")
-          ssh.rm_tree(context.dir)
+          ssh.rm_tree(dir)
         } else {
           using(store.open_postgresql_server()) { server =>
             using(store.open_database(server = server)) { db =>