# HG changeset patch # User wenzelm # Date 1678120140 -3600 # Node ID 9b9179cda1551e91c9f0fe5100eceaca7b8a11c1 # Parent 4af88aca2a4ff813f70169c7d57d35ba4fdd9bf0 clarified build process roles: "worker" vs. "build"; diff -r 4af88aca2a4f -r 9b9179cda155 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Mon Mar 06 16:20:12 2023 +0100 +++ b/src/Pure/Tools/build.scala Mon Mar 06 17:29:00 2023 +0100 @@ -168,7 +168,7 @@ Isabelle_Thread.uninterruptible { val engine = get_engine(build_options.string("build_engine")) using(engine.init(build_context, progress)) { build_process => - val res = build_process.run() + val res = build_process.run(master = true) Results(build_context, res) } } diff -r 4af88aca2a4f -r 9b9179cda155 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Mon Mar 06 16:20:12 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Mon Mar 06 17:29:00 2023 +0100 @@ -428,33 +428,69 @@ object Workers { val worker_uuid = Generic.worker_uuid.make_primary_key val build_uuid = Generic.build_uuid + val start = SQL.Column.date("start") val stamp = SQL.Column.date("stamp") + val stop = SQL.Column.date("stop") val serial = SQL.Column.long("serial") - val table = make_table("workers", List(worker_uuid, build_uuid, stamp, serial)) + val table = make_table("workers", List(worker_uuid, build_uuid, start, stamp, stop, serial)) val serial_max = serial.copy(expr = "MAX(" + serial.ident + ")") } - def get_serial(db: SQL.Database, worker_uuid: String = ""): Long = + def serial_max(db: SQL.Database): Long = db.using_statement( - Workers.table.select(List(Workers.serial_max), - sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))) + Workers.table.select(List(Workers.serial_max)) )(stmt => stmt.execute_query().iterator(_.long(Workers.serial)).nextOption.getOrElse(0L)) - def set_serial(db: SQL.Database, worker_uuid: String, build_uuid: String, serial: Long): Unit = - if (get_serial(db, worker_uuid = worker_uuid) != serial) { - db.execute_statement( - Workers.table.delete(sql = SQL.where(Generic.sql(worker_uuid = worker_uuid)))) - db.execute_statement(Workers.table.insert(), body = - { stmt => - stmt.string(1) = worker_uuid - stmt.string(2) = build_uuid - stmt.date(3) = db.now() - stmt.long(4) = serial - }) + def start_worker(db: SQL.Database, worker_uuid: String, build_uuid: String): Long = { + def err(msg: String): Nothing = + error("Cannot start worker " + worker_uuid + if_proper(msg, "\n" + msg)) + + val build_stop = { + val sql = + Base.table.select(List(Base.stop), + sql = SQL.where(Generic.sql(build_uuid = build_uuid))) + db.using_statement(sql)(_.execute_query().iterator(_.get_date(Base.stop)).nextOption) + } + build_stop match { + case Some(None) => + case Some(Some(_)) => err("for already stopped build process " + build_uuid) + case None => err("for unknown build process " + build_uuid) } + val serial = serial_max(db) + db.execute_statement(Workers.table.insert(), body = + { stmt => + val now = db.now() + stmt.string(1) = worker_uuid + stmt.string(2) = build_uuid + stmt.date(3) = now + stmt.date(4) = now + stmt.date(5) = None + stmt.long(6) = serial + }) + serial + } + + def stamp_worker( + db: SQL.Database, + worker_uuid: String, + serial: Long, + stop: Boolean = false + ): Unit = { + val sql = + Workers.table.update(List(Workers.stamp, Workers.stop, Workers.serial), + sql = SQL.where(Generic.sql(worker_uuid = worker_uuid))) + db.execute_statement(sql, body = + { stmt => + val now = db.now() + stmt.date(1) = now + stmt.date(2) = if (stop) Some(now) else None + stmt.long(3) = serial + }) + } + /* pending jobs */ @@ -645,10 +681,10 @@ update_results(db, state.results), Host.Data.update_numa_next(db, hostname, state.numa_next)) - val serial0 = get_serial(db) + val serial0 = serial_max(db) val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 - set_serial(db, worker_uuid, build_uuid, serial) + stamp_worker(db, worker_uuid, serial) state.set_serial(serial) } } @@ -705,7 +741,7 @@ val state1 = _state.inc_serial.progress_serial() for (db <- _database) { Build_Process.Data.write_progress(db, state1.serial, message, build_uuid) - Build_Process.Data.set_serial(db, worker_uuid, build_uuid, state1.serial) + Build_Process.Data.stamp_worker(db, worker_uuid, state1.serial) } build_progress_output _state = state1 @@ -813,30 +849,46 @@ } + /* build process roles */ + + final def start_build(): Unit = synchronized_database { + for (db <- _database) { + Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform, + store.options.make_prefs(Options.init(prefs = ""))) + } + } + + final def stop_build(): Unit = synchronized_database { + for (db <- _database) { + Build_Process.Data.stop_build(db, build_uuid) + } + } + + final def start_worker(): Unit = synchronized_database { + for (db <- _database) { + val serial = Build_Process.Data.start_worker(db, worker_uuid, build_uuid) + _state = _state.set_serial(serial) + } + } + + final def stop_worker(): Unit = synchronized_database { + for (db <- _database) { + Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true) + } + } + + /* run */ - def run(): Map[String, Process_Result] = { + def run(master: Boolean = false): Map[String, Process_Result] = { def finished(): Boolean = synchronized_database { _state.finished } - def init(): Unit = synchronized_database { - for (db <- _database) { - Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform, - store.options.make_prefs(Options.init(prefs = ""))) - } - } - - def exit(): Unit = synchronized_database { - for (db <- _database) { - Build_Process.Data.stop_build(db, build_uuid) - } - } - def sleep(): Unit = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_options.seconds("editor_input_delay").sleep() } - def start(): Boolean = synchronized_database { + def start_job(): Boolean = synchronized_database { next_job(_state) match { case Some(name) => if (Build_Job.is_session_name(name)) { @@ -853,7 +905,8 @@ Map.empty[String, Process_Result] } else { - init() + if (master) start_build() + start_worker() try { while (!finished()) { if (progress.stopped) synchronized_database { _state.stop_running() } @@ -869,13 +922,16 @@ } } - if (!start()) { + if (!start_job()) { sync_database() sleep() } } } - finally exit() + finally { + stop_worker() + if (master) stop_build() + } synchronized_database { for ((name, result) <- _state.results) yield name -> result.process_result