build_worker is stopped independently from master build_process;
authorwenzelm
Wed, 16 Aug 2023 14:42:43 +0200
changeset 78529 0e79fa88cab6
parent 78528 3d6dbf215559
child 78530 d637e60427db
build_worker is stopped independently from master build_process;
src/Pure/System/progress.scala
src/Pure/Tools/build_job.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/System/progress.scala	Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/System/progress.scala	Wed Aug 16 14:42:43 2023 +0200
@@ -217,6 +217,7 @@
     if (Thread.interrupted()) is_stopped = true
     is_stopped
   }
+  def stopped_local: Boolean = false
 
   final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
   final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()
@@ -264,6 +265,7 @@
 class Database_Progress(
   val db: SQL.Database,
   val base_progress: Progress,
+  val output_stopped: Boolean = false,
   val kind: String = "progress",
   val hostname: String = Isabelle_System.hostname(),
   val context_uuid: String = UUID.random().toString)
@@ -273,6 +275,7 @@
   private var _agent_uuid: String = ""
   private var _context: Long = -1
   private var _serial: Long = 0
+  private var _stopped_db: Boolean = false
 
   def agent_uuid: String = synchronized { _agent_uuid }
 
@@ -327,11 +330,12 @@
     if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
 
     Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") {
-      val stopped_db = Progress.private_data.read_progress_stopped(db, _context)
-      val stopped = base_progress.stopped
+      _stopped_db = Progress.private_data.read_progress_stopped(db, _context)
 
-      if (stopped_db && !stopped) base_progress.stop()
-      if (stopped && !stopped_db) Progress.private_data.write_progress_stopped(db, _context, true)
+      if (_stopped_db && !base_progress.stopped) base_progress.stop()
+      if (!_stopped_db && base_progress.stopped && output_stopped) {
+        Progress.private_data.write_progress_stopped(db, _context, true)
+      }
 
       val messages = Progress.private_data.read_messages(db, _context, seen = _serial)
       for ((message_serial, message) <- messages) {
@@ -371,6 +375,7 @@
 
   override def stop(): Unit = synchronized { base_progress.stop(); sync() }
   override def stopped: Boolean = sync_database { base_progress.stopped }
+  override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db }
 
   override def toString: String = super.toString + ": database " + db
 
--- a/src/Pure/Tools/build_job.scala	Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/Tools/build_job.scala	Wed Aug 16 14:42:43 2023 +0200
@@ -13,7 +13,7 @@
 trait Build_Job {
   def cancel(): Unit = ()
   def is_finished: Boolean = false
-  def join: (Process_Result, SHA1.Shasum) = (Process_Result.undefined, SHA1.no_shasum)
+  def join: Option[(Process_Result, SHA1.Shasum)] = None
 }
 
 object Build_Job {
@@ -111,7 +111,7 @@
   ) extends Build_Job {
     def session_name: String = session_background.session_name
 
-    private val future_result: Future[(Process_Result, SHA1.Shasum)] =
+    private val future_result: Future[Option[(Process_Result, SHA1.Shasum)]] =
       Future.thread("build", uninterruptible = true) {
         val info = session_background.sessions_structure(session_name)
         val options = build_context.engine.process_options(info.options, node_info)
@@ -502,10 +502,16 @@
                   output_heap = output_shasum,
                   process_result.rc,
                   build_context.build_uuid))
-          database_server match {
-            case Some(db) => write_info(db)
-            case None => using(store.open_database(session_name, output = true))(write_info)
-          }
+
+          val valid =
+            if (progress.stopped_local) false
+            else {
+              database_server match {
+                case Some(db) => write_info(db)
+                case None => using(store.open_database(session_name, output = true))(write_info)
+              }
+              true
+            }
 
           // messages
           process_result.err_lines.foreach(progress.echo(_))
@@ -531,12 +537,12 @@
             }
           }
 
-          (process_result.copy(out_lines = log_lines), output_shasum)
+          if (valid) Some((process_result.copy(out_lines = log_lines), output_shasum)) else None
         }
       }
 
     override def cancel(): Unit = future_result.cancel()
     override def is_finished: Boolean = future_result.is_finished
-    override def join: (Process_Result, SHA1.Shasum) = future_result.join
+    override def join: Option[(Process_Result, SHA1.Shasum)] = future_result.join
   }
 }
--- a/src/Pure/Tools/build_process.scala	Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/Tools/build_process.scala	Wed Aug 16 14:42:43 2023 +0200
@@ -55,6 +55,7 @@
     build: Option[Build_Job]
   ) extends Library.Named {
     def no_build: Job = copy(build = None)
+    def join_build: Option[(Process_Result, SHA1.Shasum)] = build.flatMap(_.join)
   }
 
   sealed case class Result(
@@ -217,8 +218,6 @@
       copy(serial = i)
     }
 
-    def finished: Boolean = pending.isEmpty
-
     def remove_pending(name: String): State =
       copy(pending = pending.flatMap(
         entry => if (entry.name == name) None else Some(entry.resolve(name))))
@@ -228,10 +227,13 @@
     def stop_running(): Unit =
       for (job <- running.valuesIterator; build <- job.build) build.cancel()
 
+    def build_running: List[Build_Job] =
+      List.from(for (job <- running.valuesIterator; build <- job.build) yield build)
+
     def finished_running(): List[Job] =
       List.from(
         for (job <- running.valuesIterator; build <- job.build if build.is_finished)
-        yield job)
+          yield job)
 
     def add_running(job: Job): State =
       copy(running = running + (job.name -> job))
@@ -880,6 +882,7 @@
           val progress_db = store.open_build_database(Progress.private_data.database, server = server)
           val progress =
             new Database_Progress(progress_db, build_progress,
+              output_stopped = build_context.master,
               hostname = hostname,
               context_uuid = build_uuid,
               kind = "build_process")
@@ -957,8 +960,10 @@
   }
 
   protected def next_jobs(state: Build_Process.State): List[String] = {
-    val running = List.from(state.running.valuesIterator.filter(_.worker_uuid == worker_uuid))
-    val limit = if (progress.stopped) Int.MaxValue else build_context.max_jobs - running.length
+    val limit = {
+      if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 }
+      else build_context.max_jobs - state.build_running.length
+    }
     if (limit > 0) {
       state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name))
         .sortBy(_.name)(state.sessions.ordering)
@@ -1013,10 +1018,13 @@
         make_result(result_name, Process_Result.error, output_shasum)
     }
     else if (cancelled) {
-      progress.echo(session_name + " CANCELLED")
-      state
-        .remove_pending(session_name)
-        .make_result(result_name, Process_Result.undefined, output_shasum)
+      if (build_context.master) {
+        progress.echo(session_name + " CANCELLED")
+        state
+          .remove_pending(session_name)
+          .make_result(result_name, Process_Result.undefined, output_shasum)
+      }
+      else state
     }
     else {
       def used_nodes: Set[Int] =
@@ -1085,7 +1093,10 @@
       synchronized_database("Build_Process.init") { _state = init_state(_state) }
     }
 
-    def finished(): Boolean = synchronized_database("Build_Process.test") { _state.finished }
+    def finished(): Boolean = synchronized_database("Build_Process.test") {
+      if (!build_context.master && progress.stopped) _state.build_running.isEmpty
+      else _state.pending.isEmpty
+    }
 
     def sleep(): Unit =
       Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
@@ -1128,12 +1139,17 @@
             if (progress.stopped) _state.stop_running()
 
             for (job <- _state.finished_running()) {
-              val result_name = (job.name, worker_uuid, build_uuid)
-              val (process_result, output_shasum) = job.build.get.join
-              _state = _state.
-                remove_pending(job.name).
-                remove_running(job.name).
-                make_result(result_name, process_result, output_shasum, node_info = job.node_info)
+              job.join_build match {
+                case None =>
+                  _state = _state.remove_running(job.name)
+                case Some((process_result, output_shasum)) =>
+                  val result_name = (job.name, worker_uuid, build_uuid)
+                  _state = _state.
+                    remove_pending(job.name).
+                    remove_running(job.name).
+                    make_result(result_name, process_result, output_shasum,
+                      node_info = job.node_info)
+              }
             }
           }