changeset:   79888:7b4b524cdee2
parent:      79880:a3d53f2bc41d
parent:      79887:17220dc05991
child:       79889:b187c1b9d6a9
author:      wenzelm
date:        Wed, 13 Mar 2024 23:27:44 +0100
description: merged
--- a/etc/options	Wed Mar 13 11:54:06 2024 +0100
+++ b/etc/options	Wed Mar 13 23:27:44 2024 +0100
@@ -204,8 +204,14 @@
 option build_delay : real = 0.2
   -- "delay build process main loop (local)"
 
-option build_cluster_delay : real = 1.0
-  -- "delay build process main loop (cluster)"
+option build_delay_master : real = 1.0
+  -- "delay build process main loop (cluster master)"
+
+option build_delay_worker : real = 0.5
+  -- "delay build process main loop (cluster worker)"
+
+option build_cluster_expire : int = 50
+  -- "enforce database synchronization after given number of delay loops"
 
 option build_cluster_root : string = "$USER_HOME/.isabelle/build_cluster"
   -- "root directory for remote build cluster sessions"
--- a/src/Pure/Build/build_benchmark.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -82,15 +82,13 @@
 
         val local_build_context = build_context.copy(store = Store(local_options))
 
-        val build =
+        val result =
           Build_Job.start_session(local_build_context, session, progress, new Logger, server,
-            background, session.sources_shasum, input_shasum, node_info, false)
+            background, session.sources_shasum, input_shasum, node_info, false).join
 
         val timing =
-          build.join match {
-            case Some(result) if result.process_result.ok => result.process_result.timing
-            case _ => error("Failed to build benchmark session")
-          }
+          if (result.process_result.ok) result.process_result.timing
+          else error("Failed to build benchmark session")
 
         val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms)
         progress.echo(
--- a/src/Pure/Build/build_job.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_job.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -13,11 +13,12 @@
 trait Build_Job {
   def cancel(): Unit = ()
   def is_finished: Boolean = false
-  def join: Option[Build_Job.Result] = None
+  def join: Build_Job.Result = Build_Job.no_result
 }
 
 object Build_Job {
   sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum)
+  val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum)
 
 
   /* build session */
@@ -114,7 +115,7 @@
   ) extends Build_Job {
     def session_name: String = session_background.session_name
 
-    private val future_result: Future[Option[Result]] =
+    private val future_result: Future[Result] =
       Future.thread("build", uninterruptible = true) {
         val info = session_background.sessions_structure(session_name)
         val options = Host.node_options(info.options, node_info)
@@ -508,15 +509,10 @@
                   process_result.rc,
                   build_context.build_uuid))
 
-          val valid =
-            if (progress.stopped_local) false
-            else {
-              database_server match {
-                case Some(db) => write_info(db)
-                case None => using(store.open_database(session_name, output = true))(write_info)
-              }
-              true
-            }
+          database_server match {
+            case Some(db) => write_info(db)
+            case None => using(store.open_database(session_name, output = true))(write_info)
+          }
 
           using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
             heaps_database =>
@@ -554,13 +550,12 @@
             }
           }
 
-          if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum))
-          else None
+          Result(process_result.copy(out_lines = log_lines), output_shasum)
         }
       }
 
     override def cancel(): Unit = future_result.cancel()
     override def is_finished: Boolean = future_result.is_finished
-    override def join: Option[Result] = future_result.join
+    override def join: Result = future_result.join
   }
 }
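
Build_Job.join now always yields a Result: the former Option[Result] is replaced by the neutral value Build_Job.no_result (undefined process result, empty shasum), so callers inspect process_result.ok instead of matching on Some/None, as in build_benchmark.scala above. A minimal, self-contained sketch of the pattern (simplified types, not the Isabelle API):

    trait Job {
      def is_finished: Boolean = false
      def join: Job.Result = Job.no_result          // sentinel instead of Option[Result]
    }
    object Job {
      sealed case class Result(ok: Boolean, out_lines: List[String] = Nil)
      val no_result: Result = Result(ok = false)    // neutral "undefined" result
    }
    // callers test the result directly:
    //   val result = job.join
    //   if (result.ok) result else error("Failed to build session")
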
--- a/src/Pure/Build/build_process.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -65,11 +65,7 @@
     node_info: Host.Node_Info,
     start_date: Date,
     build: Option[Build_Job]
-  ) extends Library.Named {
-    def cancel(): Unit = build.foreach(_.cancel())
-    def is_finished: Boolean = build.isDefined && build.get.is_finished
-    def join_build: Option[Build_Job.Result] = build.flatMap(_.join)
-  }
+  ) extends Library.Named
 
   sealed case class Result(
     name: String,
@@ -233,7 +229,9 @@
     def next_serial: Long = State.inc_serial(serial)
 
     def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name)
-    def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name))
+    def next_ready: List[Task] = ready.filter(task => !is_running(task.name))
+    def exists_ready: Boolean =
+      pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name))
 
     def remove_pending(a: String): State =
       copy(pending =
@@ -250,8 +248,14 @@
 
     def is_running(name: String): Boolean = running.isDefinedAt(name)
 
-    def build_running: List[Job] =
-      List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
+    def build_running: List[Build_Job] =
+      running.valuesIterator.flatMap(_.build).toList
+
+    def finished_running(): Boolean =
+      build_running.exists(_.is_finished)
+
+    def busy_running(jobs: Int): Boolean =
+      jobs <= 0 || jobs <= build_running.length
 
     def add_running(job: Job): State =
       copy(running = running + (job.name -> job))
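
Informally: busy_running(jobs) holds when this process has no job slots left (jobs <= 0 meaning it never starts jobs itself); exists_ready holds when some pending task is ready and not yet recorded as running; finished_running() holds when at least one local Build_Job has terminated. These predicates drive the reactive main loop introduced below.
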
@@ -287,6 +291,9 @@
   object private_data extends SQL.Data("isabelle_build") {
     val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
 
+
+    /* tables */
+
     override lazy val tables: SQL.Tables =
       SQL.Tables(
         Updates.table,
@@ -301,6 +308,15 @@
     private lazy val build_id_tables =
       tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t))
 
+
+    /* notifications */
+
+    lazy val channel: String = Base.table.name
+    lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready")
+
+
+    /* generic columns */
+
     object Generic {
       val build_id = SQL.Column.long("build_id")
       val build_uuid = SQL.Column.string("build_uuid")
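
These two values introduce database notifications (SQL.Notification, i.e. PostgreSQL LISTEN/NOTIFY) on a channel named after the main build table. A hedged sketch of the round-trip as wired up further below in this patch, with db standing for the shared build database and the master/worker sides shown together for illustration:

    // every build process subscribes during init_build:
    db.listen(Build_Process.private_data.channel)

    // master side: announce that ready tasks exist (see the main loop below)
    db.send(Build_Process.private_data.channel_ready)

    // worker side: poll for notifications and wake up early instead of
    // sleeping through the full build_delay_worker interval
    val received =
      db.receive(n => n.channel == Build_Process.private_data.channel).getOrElse(Nil)
    val ready = received.contains(Build_Process.private_data.channel_ready)
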
@@ -940,6 +956,7 @@
     build_start: Date
   ): Long =
     private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") {
+      db.listen(private_data.channel)
       val build_uuid = build_context.build_uuid
       val build_id = private_data.get_build_id(db, build_uuid)
       if (build_context.master) {
@@ -1010,14 +1027,27 @@
     }
     catch { case exn: Throwable => close(); throw exn }
 
-  protected val build_delay: Time = {
-    val option =
-      _build_database match {
-        case Some(db) if db.is_postgresql => "build_cluster_delay"
-        case _ => "build_delay"
-      }
-    build_options.seconds(option)
-  }
+  protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] =
+    _build_database.flatMap(_.receive(filter)).getOrElse(Nil)
+
+  protected def build_send(notification: SQL.Notification): Unit =
+    _build_database.foreach(_.send(notification))
+
+  protected def build_cluster: Boolean =
+    _build_database match {
+      case Some(db) => db.is_postgresql
+      case None => false
+    }
+
+  protected val build_delay: Time =
+    build_options.seconds(
+      if (!build_cluster) "build_delay"
+      else if (build_context.master) "build_delay_master"
+      else "build_delay_worker")
+
+  protected val build_expire: Int =
+    if (!build_cluster || build_context.master) 1
+    else build_options.int("build_cluster_expire").max(1)
 
   protected val _host_database: SQL.Database =
     try { store.open_build_database(path = Host.private_data.database, server = server) }
@@ -1031,11 +1061,11 @@
         val progress =
           new Database_Progress(db, build_progress,
             input_messages = build_context.master,
-            output_stopped = build_context.master,
             hostname = hostname,
             context_uuid = build_uuid,
             kind = "build_process",
-            timeout = Some(build_delay))
+            timeout = Some(build_delay),
+            tick_expire = build_expire)
         (progress, progress.agent_uuid)
       }
       catch { case exn: Throwable => close(); throw exn }
@@ -1166,13 +1196,10 @@
         make_result(result_name, Process_Result.error, output_shasum)
     }
     else if (cancelled) {
-      if (build_context.master) {
-        progress.echo(session_name + " CANCELLED")
-        state
-          .remove_pending(session_name)
-          .make_result(result_name, Process_Result.undefined, output_shasum)
-      }
-      else state
+      progress.echo(session_name + " CANCELLED")
+      state
+        .remove_pending(session_name)
+        .make_result(result_name, Process_Result.undefined, output_shasum)
     }
     else {
       val build_log_verbose = build_options.bool("build_log_verbose")
@@ -1247,8 +1274,24 @@
     else _state.pending.isEmpty
   }
 
-  protected def sleep(): Unit =
-    Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
+  private var _build_tick: Long = 0L
+
+  protected def build_action(): Boolean =
+    Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
+      val received = build_receive(n => n.channel == Build_Process.private_data.channel)
+      val ready = received.contains(Build_Process.private_data.channel_ready)
+      val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) }
+
+      val finished = synchronized { _state.finished_running() }
+
+      def sleep: Boolean = {
+        build_delay.sleep()
+        val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 }
+        expired || reactive || progress.stopped
+      }
+
+      finished || sleep
+    }
 
   protected def init_unsynchronized(): Unit = {
     if (build_context.master) {
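
build_action replaces the former unconditional sleep: it returns true as soon as the main loop should run again, i.e. immediately when a local job has finished, or after one delay interval when the tick counter has expired, a "ready" notification arrived while job slots are still free, or the build was stopped. With the defaults added in etc/options above, a cluster worker thus wakes every 0.5s (build_delay_worker) but falls back to a forced database synchronization only after build_cluster_expire = 50 delay loops, i.e. 0.5s * 50 = 25s, unless a notification or a finished job triggers it earlier; the master and plain local builds keep build_expire = 1 and therefore synchronize on every iteration.
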
@@ -1268,17 +1311,16 @@
   }
 
   protected def main_unsynchronized(): Unit = {
-    for (job <- _state.build_running.filter(_.is_finished)) {
-      _state = _state.remove_running(job.name)
-      for (result <- job.join_build) {
-        val result_name = (job.name, worker_uuid, build_uuid)
-        _state = _state.
-          remove_pending(job.name).
-          make_result(result_name,
-            result.process_result,
-            result.output_shasum,
-            node_info = job.node_info)
-      }
+    for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) {
+      val result = build.join
+      val result_name = (job.name, worker_uuid, build_uuid)
+      _state = _state.
+        remove_pending(job.name).
+        remove_running(job.name).
+        make_result(result_name,
+          result.process_result,
+          result.output_shasum,
+          node_info = job.node_info)
     }
 
     for (name <- next_jobs(_state)) {
@@ -1316,8 +1358,11 @@
           synchronized_database("Build_Process.main") {
             if (progress.stopped) _state.build_running.foreach(_.cancel())
             main_unsynchronized()
+            if (build_context.master && _state.exists_ready) {
+              build_send(Build_Process.private_data.channel_ready)
+            }
           }
-          sleep()
+          while(!build_action()) {}
         }
       }
       finally {
--- a/src/Pure/Build/database_progress.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/Build/database_progress.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -151,7 +151,6 @@
   db: SQL.Database,
   base_progress: Progress,
   input_messages: Boolean = false,
-  output_stopped: Boolean = false,
   kind: String = "progress",
   hostname: String = Isabelle_System.hostname(),
   context_uuid: String = UUID.random_string(),
@@ -171,7 +170,6 @@
   private var _agent_uuid: String = ""
   private var _context: Long = -1
   private var _serial: Long = 0
-  private var _stopped_db: Boolean = false
   private var _consumer: Consumer_Thread[Progress.Output] = null
 
   def agent_uuid: String = synchronized { _agent_uuid }
@@ -218,7 +216,7 @@
       val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
       val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
       val ok =
-        bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+        bulk_output.nonEmpty || expired || base_progress.stopped ||
         received.isEmpty ||
         received.get.contains(Database_Progress.private_data.channel_ping) ||
         input_messages && received.get.contains(Database_Progress.private_data.channel_output)
@@ -249,7 +247,8 @@
     }
 
     _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")(
-      bulk = _ => true, timeout = timeout,
+      bulk = _ => true,
+      timeout = timeout,
       consume = { bulk_output =>
         val results =
           if (bulk_output.isEmpty) consume(Nil)
@@ -279,10 +278,10 @@
 
   private def sync_database[A](body: => A): A = synchronized {
     Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
-      _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
+      val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
 
-      if (_stopped_db && !base_progress.stopped) base_progress.stop()
-      if (!_stopped_db && base_progress.stopped && output_stopped) {
+      if (stopped_db && !base_progress.stopped) base_progress.stop()
+      if (!stopped_db && base_progress.stopped) {
         Database_Progress.private_data.write_progress_stopped(db, _context, true)
         db.send(Database_Progress.private_data.channel_ping)
       }
@@ -319,7 +318,6 @@
 
   override def stop(): Unit = sync_context { base_progress.stop(); sync() }
   override def stopped: Boolean = sync_context { base_progress.stopped }
-  override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
 
   override def toString: String = super.toString + ": database " + db
 
--- a/src/Pure/General/sql.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/General/sql.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -406,7 +406,7 @@
   /* notifications: IPC via database server */
 
   sealed case class Notification(channel: String, payload: String = "") {
-    override def toString =
+    override def toString: String =
       "Notification(" + channel + if_proper(payload, "," + payload) + ")"
   }
 
--- a/src/Pure/System/progress.scala	Wed Mar 13 11:54:06 2024 +0100
+++ b/src/Pure/System/progress.scala	Wed Mar 13 23:27:44 2024 +0100
@@ -88,7 +88,6 @@
     if (Thread.interrupted()) is_stopped = true
     is_stopped
   }
-  def stopped_local: Boolean = false
 
   final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
   final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()