revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
authorwenzelm
Wed, 13 Mar 2024 23:26:30 +0100
changeset 79887 17220dc05991
parent 79886 7ae25372ab04
child 79888 7b4b524cdee2
revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers; retain notion of next_jobs.limit and finished() from 0e79fa88cab6; clarified Job vs. optional Build_Job;
src/Pure/Build/build_benchmark.scala
src/Pure/Build/build_job.scala
src/Pure/Build/build_process.scala
src/Pure/Build/database_progress.scala
src/Pure/System/progress.scala
--- a/src/Pure/Build/build_benchmark.scala	Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala	Wed Mar 13 23:26:30 2024 +0100
@@ -82,15 +82,13 @@
 
         val local_build_context = build_context.copy(store = Store(local_options))
 
-        val build =
+        val result =
           Build_Job.start_session(local_build_context, session, progress, new Logger, server,
-            background, session.sources_shasum, input_shasum, node_info, false)
+            background, session.sources_shasum, input_shasum, node_info, false).join
 
         val timing =
-          build.join match {
-            case Some(result) if result.process_result.ok => result.process_result.timing
-            case _ => error("Failed to build benchmark session")
-          }
+          if (result.process_result.ok) result.process_result.timing
+          else error("Failed to build benchmark session")
 
         val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms)
         progress.echo(
--- a/src/Pure/Build/build_job.scala	Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_job.scala	Wed Mar 13 23:26:30 2024 +0100
@@ -13,11 +13,12 @@
 trait Build_Job {
   def cancel(): Unit = ()
   def is_finished: Boolean = false
-  def join: Option[Build_Job.Result] = None
+  def join: Build_Job.Result = Build_Job.no_result
 }
 
 object Build_Job {
   sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum)
+  val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum)
 
 
   /* build session */
@@ -114,7 +115,7 @@
   ) extends Build_Job {
     def session_name: String = session_background.session_name
 
-    private val future_result: Future[Option[Result]] =
+    private val future_result: Future[Result] =
       Future.thread("build", uninterruptible = true) {
         val info = session_background.sessions_structure(session_name)
         val options = Host.node_options(info.options, node_info)
@@ -508,15 +509,10 @@
                   process_result.rc,
                   build_context.build_uuid))
 
-          val valid =
-            if (progress.stopped_local) false
-            else {
-              database_server match {
-                case Some(db) => write_info(db)
-                case None => using(store.open_database(session_name, output = true))(write_info)
-              }
-              true
-            }
+          database_server match {
+            case Some(db) => write_info(db)
+            case None => using(store.open_database(session_name, output = true))(write_info)
+          }
 
           using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
             heaps_database =>
@@ -554,13 +550,12 @@
             }
           }
 
-          if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum))
-          else None
+          Result(process_result.copy(out_lines = log_lines), output_shasum)
         }
       }
 
     override def cancel(): Unit = future_result.cancel()
     override def is_finished: Boolean = future_result.is_finished
-    override def join: Option[Result] = future_result.join
+    override def join: Result = future_result.join
   }
 }
--- a/src/Pure/Build/build_process.scala	Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Wed Mar 13 23:26:30 2024 +0100
@@ -65,11 +65,7 @@
     node_info: Host.Node_Info,
     start_date: Date,
     build: Option[Build_Job]
-  ) extends Library.Named {
-    def cancel(): Unit = build.foreach(_.cancel())
-    def is_finished: Boolean = build.isDefined && build.get.is_finished
-    def join_build: Option[Build_Job.Result] = build.flatMap(_.join)
-  }
+  ) extends Library.Named
 
   sealed case class Result(
     name: String,
@@ -252,13 +248,14 @@
 
     def is_running(name: String): Boolean = running.isDefinedAt(name)
 
-    def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished)
+    def build_running: List[Build_Job] =
+      running.valuesIterator.flatMap(_.build).toList
+
+    def finished_running(): Boolean =
+      build_running.exists(_.is_finished)
 
     def busy_running(jobs: Int): Boolean =
-      jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length
-
-    def build_running: List[Job] =
-      List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
+      jobs <= 0 || jobs <= build_running.length
 
     def add_running(job: Job): State =
       copy(running = running + (job.name -> job))
@@ -1064,7 +1061,6 @@
         val progress =
           new Database_Progress(db, build_progress,
             input_messages = build_context.master,
-            output_stopped = build_context.master,
             hostname = hostname,
             context_uuid = build_uuid,
             kind = "build_process",
@@ -1200,13 +1196,10 @@
         make_result(result_name, Process_Result.error, output_shasum)
     }
     else if (cancelled) {
-      if (build_context.master) {
-        progress.echo(session_name + " CANCELLED")
-        state
-          .remove_pending(session_name)
-          .make_result(result_name, Process_Result.undefined, output_shasum)
-      }
-      else state
+      progress.echo(session_name + " CANCELLED")
+      state
+        .remove_pending(session_name)
+        .make_result(result_name, Process_Result.undefined, output_shasum)
     }
     else {
       val build_log_verbose = build_options.bool("build_log_verbose")
@@ -1318,17 +1311,16 @@
   }
 
   protected def main_unsynchronized(): Unit = {
-    for (job <- _state.build_running.filter(_.is_finished)) {
-      _state = _state.remove_running(job.name)
-      for (result <- job.join_build) {
-        val result_name = (job.name, worker_uuid, build_uuid)
-        _state = _state.
-          remove_pending(job.name).
-          make_result(result_name,
-            result.process_result,
-            result.output_shasum,
-            node_info = job.node_info)
-      }
+    for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) {
+      val result = build.join
+      val result_name = (job.name, worker_uuid, build_uuid)
+      _state = _state.
+        remove_pending(job.name).
+        remove_running(job.name).
+        make_result(result_name,
+          result.process_result,
+          result.output_shasum,
+          node_info = job.node_info)
     }
 
     for (name <- next_jobs(_state)) {
--- a/src/Pure/Build/database_progress.scala	Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/database_progress.scala	Wed Mar 13 23:26:30 2024 +0100
@@ -151,7 +151,6 @@
   db: SQL.Database,
   base_progress: Progress,
   input_messages: Boolean = false,
-  output_stopped: Boolean = false,
   kind: String = "progress",
   hostname: String = Isabelle_System.hostname(),
   context_uuid: String = UUID.random_string(),
@@ -171,7 +170,6 @@
   private var _agent_uuid: String = ""
   private var _context: Long = -1
   private var _serial: Long = 0
-  private var _stopped_db: Boolean = false
   private var _consumer: Consumer_Thread[Progress.Output] = null
 
   def agent_uuid: String = synchronized { _agent_uuid }
@@ -218,7 +216,7 @@
       val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
       val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
       val ok =
-        bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+        bulk_output.nonEmpty || expired || base_progress.stopped ||
         received.isEmpty ||
         received.get.contains(Database_Progress.private_data.channel_ping) ||
         input_messages && received.get.contains(Database_Progress.private_data.channel_output)
@@ -280,10 +278,10 @@
 
   private def sync_database[A](body: => A): A = synchronized {
     Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
-      _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
+      val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
 
-      if (_stopped_db && !base_progress.stopped) base_progress.stop()
-      if (!_stopped_db && base_progress.stopped && output_stopped) {
+      if (stopped_db && !base_progress.stopped) base_progress.stop()
+      if (!stopped_db && base_progress.stopped) {
         Database_Progress.private_data.write_progress_stopped(db, _context, true)
         db.send(Database_Progress.private_data.channel_ping)
       }
@@ -320,7 +318,6 @@
 
   override def stop(): Unit = sync_context { base_progress.stop(); sync() }
   override def stopped: Boolean = sync_context { base_progress.stopped }
-  override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
 
   override def toString: String = super.toString + ": database " + db
 
--- a/src/Pure/System/progress.scala	Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/System/progress.scala	Wed Mar 13 23:26:30 2024 +0100
@@ -88,7 +88,6 @@
     if (Thread.interrupted()) is_stopped = true
     is_stopped
   }
-  def stopped_local: Boolean = false
 
   final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
   final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()