clarified build process roles: "worker" vs. "build";
authorwenzelm
Mon, 06 Mar 2023 17:29:00 +0100
changeset 77546 9b9179cda155
parent 77545 4af88aca2a4f
child 77547 1d8a12d1c2e9
clarified build process roles: "worker" vs. "build";
src/Pure/Tools/build.scala
src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build.scala	Mon Mar 06 16:20:12 2023 +0100
+++ b/src/Pure/Tools/build.scala	Mon Mar 06 17:29:00 2023 +0100
@@ -168,7 +168,7 @@
       Isabelle_Thread.uninterruptible {
         val engine = get_engine(build_options.string("build_engine"))
         using(engine.init(build_context, progress)) { build_process =>
-          val res = build_process.run()
+          val res = build_process.run(master = true)
           Results(build_context, res)
         }
       }
--- a/src/Pure/Tools/build_process.scala	Mon Mar 06 16:20:12 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Mon Mar 06 17:29:00 2023 +0100
@@ -428,33 +428,69 @@
     object Workers {
       val worker_uuid = Generic.worker_uuid.make_primary_key
       val build_uuid = Generic.build_uuid
+      val start = SQL.Column.date("start")
       val stamp = SQL.Column.date("stamp")
+      val stop = SQL.Column.date("stop")
       val serial = SQL.Column.long("serial")
 
-      val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial))
+      val table = make_table("workers", List(worker_uuid, build_uuid, start, stamp, stop, serial))
 
       val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")")
     }
 
-    def get_serial(db: SQL.Database, worker_uuid: String = ""): Long =
+    def serial_max(db: SQL.Database): Long =
       db.using_statement(
-        Workers.table.select(List(Workers.serial_max),
-          sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+        Workers.table.select(List(Workers.serial_max))
       )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L))
 
-    def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit =
-      if (get_serial(db, worker_uuid = worker_uuid) != serial) {
-        db.execute_statement(
-          Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))))
-        db.execute_statement(Workers.table.insert(), body =
-          { stmt =>
-            stmt.string(1) = worker_uuid
-            stmt.string(2) = build_uuid
-            stmt.date(3) = db.now()
-            stmt.long(4) = serial
-          })
+    def start_worker(db: SQL.Database, worker_uuid: String, build_uuid: String): Long = {
+      def err(msg: String): Nothing =
+        error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg))
+
+      val build_stop = {
+        val sql =
+          Base.table.select(List(Base.stop),
+            sql = SQL.where(Generic.sql(build_uuid = build_uuid)))
+        db.using_statement(sql)(_.execute_query().iterator(_.get_date(Base.stop)).nextOption)
+      }
+      build_stop match {
+        case Some(None) =>
+        case Some(Some(_)) => err("for already stopped build process " + build_uuid)
+        case None => err("for unknown build process " + build_uuid)
       }
 
+      val serial = serial_max(db)
+      db.execute_statement(Workers.table.insert(), body =
+        { stmt =>
+          val now = db.now()
+          stmt.string(1) = worker_uuid
+          stmt.string(2) = build_uuid
+          stmt.date(3) = now
+          stmt.date(4) = now
+          stmt.date(5) = None
+          stmt.long(6) = serial
+        })
+      serial
+    }
+
+    def stamp_worker(
+      db: SQL.Database,
+      worker_uuid: String,
+      serial: Long,
+      stop: Boolean = false
+    ): Unit = {
+      val sql =
+        Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial),
+          sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))
+      db.execute_statement(sql, body =
+        { stmt =>
+          val now = db.now()
+          stmt.date(1) = now
+          stmt.date(2) = if (stop) Some(now) else None
+          stmt.long(3) = serial
+        })
+    }
+
 
     /* pending jobs */
 
@@ -645,10 +681,10 @@
           update_results(db, state.results),
           Host.Data.update_numa_next(db, hostname, state.numa_next))
 
-      val serial0 = get_serial(db)
+      val serial0 = serial_max(db)
       val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
 
-      set_serial(db, worker_uuid, build_uuid, serial)
+      stamp_worker(db, worker_uuid, serial)
       state.set_serial(serial)
     }
   }
@@ -705,7 +741,7 @@
       val state1 = _state.inc_serial.progress_serial()
       for (db <- _database) {
         Build_Process.Data.write_progress(db, state1.serial, message, build_uuid)
-        Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial)
+        Build_Process.Data.stamp_worker(db, worker_uuid, state1.serial)
       }
       build_progress_output
       _state = state1
@@ -813,30 +849,46 @@
   }
 
 
+  /* build process roles */
+
+  final def start_build(): Unit = synchronized_database {
+    for (db <- _database) {
+      Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
+        store.options.make_prefs(Options.init(prefs = "")))
+    }
+  }
+
+  final def stop_build(): Unit = synchronized_database {
+    for (db <- _database) {
+      Build_Process.Data.stop_build(db, build_uuid)
+    }
+  }
+
+  final def start_worker(): Unit = synchronized_database {
+    for (db <- _database) {
+      val serial = Build_Process.Data.start_worker(db, worker_uuid, build_uuid)
+      _state = _state.set_serial(serial)
+    }
+  }
+
+  final def stop_worker(): Unit = synchronized_database {
+    for (db <- _database) {
+      Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
+    }
+  }
+
+
   /* run */
 
-  def run(): Map[String, Process_Result] = {
+  def run(master: Boolean = false): Map[String, Process_Result] = {
     def finished(): Boolean = synchronized_database { _state.finished }
 
-    def init(): Unit = synchronized_database {
-      for (db <- _database) {
-        Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
-          store.options.make_prefs(Options.init(prefs = "")))
-      }
-    }
-
-    def exit(): Unit = synchronized_database {
-      for (db <- _database) {
-        Build_Process.Data.stop_build(db, build_uuid)
-      }
-    }
-
     def sleep(): Unit =
       Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
         build_options.seconds("editor_input_delay").sleep()
       }
 
-    def start(): Boolean = synchronized_database {
+    def start_job(): Boolean = synchronized_database {
       next_job(_state) match {
         case Some(name) =>
           if (Build_Job.is_session_name(name)) {
@@ -853,7 +905,8 @@
       Map.empty[String, Process_Result]
     }
     else {
-      init()
+      if (master) start_build()
+      start_worker()
       try {
         while (!finished()) {
           if (progress.stopped) synchronized_database { _state.stop_running() }
@@ -869,13 +922,16 @@
             }
           }
 
-          if (!start()) {
+          if (!start_job()) {
             sync_database()
             sleep()
           }
         }
       }
-      finally exit()
+      finally {
+        stop_worker()
+        if (master) stop_build()
+      }
 
       synchronized_database {
         for ((name, result) <- _state.results) yield name -> result.process_result