# HG changeset patch # User wenzelm # Date 1710368864 -3600 # Node ID 7b4b524cdee27cf96909359233584704b8d570b6 # Parent a3d53f2bc41d6a8d12ab6588fddb5e3aa392b592# Parent 17220dc05991d5ee3aa0e42769c6aa64549a3ef9 merged diff -r a3d53f2bc41d -r 7b4b524cdee2 etc/options --- a/etc/options Wed Mar 13 11:54:06 2024 +0100 +++ b/etc/options Wed Mar 13 23:27:44 2024 +0100 @@ -204,8 +204,14 @@ option build_delay : real = 0.2 -- "delay build process main loop (local)" -option build_cluster_delay : real = 1.0 - -- "delay build process main loop (cluster)" +option build_delay_master : real = 1.0 + -- "delay build process main loop (cluster master)" + +option build_delay_worker : real = 0.5 + -- "delay build process main loop (cluster worker)" + +option build_cluster_expire : int = 50 + -- "enforce database synchronization after given number of delay loops" option build_cluster_root : string = "$USER_HOME/.isabelle/build_cluster" -- "root directory for remote build cluster sessions" diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/Build/build_benchmark.scala --- a/src/Pure/Build/build_benchmark.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/Build/build_benchmark.scala Wed Mar 13 23:27:44 2024 +0100 @@ -82,15 +82,13 @@ val local_build_context = build_context.copy(store = Store(local_options)) - val build = + val result = Build_Job.start_session(local_build_context, session, progress, new Logger, server, - background, session.sources_shasum, input_shasum, node_info, false) + background, session.sources_shasum, input_shasum, node_info, false).join val timing = - build.join match { - case Some(result) if result.process_result.ok => result.process_result.timing - case _ => error("Failed to build benchmark session") - } + if (result.process_result.ok) result.process_result.timing + else error("Failed to build benchmark session") val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms) progress.echo( diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/Build/build_job.scala --- a/src/Pure/Build/build_job.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/Build/build_job.scala Wed Mar 13 23:27:44 2024 +0100 @@ -13,11 +13,12 @@ trait Build_Job { def cancel(): Unit = () def is_finished: Boolean = false - def join: Option[Build_Job.Result] = None + def join: Build_Job.Result = Build_Job.no_result } object Build_Job { sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum) + val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum) /* build session */ @@ -114,7 +115,7 @@ ) extends Build_Job { def session_name: String = session_background.session_name - private val future_result: Future[Option[Result]] = + private val future_result: Future[Result] = Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = Host.node_options(info.options, node_info) @@ -508,15 +509,10 @@ process_result.rc, build_context.build_uuid)) - val valid = - if (progress.stopped_local) false - else { - database_server match { - case Some(db) => write_info(db) - case None => using(store.open_database(session_name, output = true))(write_info) - } - true - } + database_server match { + case Some(db) => write_info(db) + case None => using(store.open_database(session_name, output = true))(write_info) + } using_optional(store.maybe_open_heaps_database(database_server, server = server)) { heaps_database => @@ -554,13 +550,12 @@ } } - if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum)) - else None + Result(process_result.copy(out_lines = log_lines), output_shasum) } } override def cancel(): Unit = future_result.cancel() override def is_finished: Boolean = future_result.is_finished - override def join: Option[Result] = future_result.join + override def join: Result = future_result.join } } diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/Build/build_process.scala Wed Mar 13 23:27:44 2024 +0100 @@ -65,11 +65,7 @@ node_info: Host.Node_Info, start_date: Date, build: Option[Build_Job] - ) extends Library.Named { - def cancel(): Unit = build.foreach(_.cancel()) - def is_finished: Boolean = build.isDefined && build.get.is_finished - def join_build: Option[Build_Job.Result] = build.flatMap(_.join) - } + ) extends Library.Named sealed case class Result( name: String, @@ -233,7 +229,9 @@ def next_serial: Long = State.inc_serial(serial) def ready: List[Task] = pending.valuesIterator.filter(_.is_ready).toList.sortBy(_.name) - def next_ready: List[Task] = ready.filter(entry => !is_running(entry.name)) + def next_ready: List[Task] = ready.filter(task => !is_running(task.name)) + def exists_ready: Boolean = + pending.valuesIterator.exists(task => task.is_ready && !is_running(task.name)) def remove_pending(a: String): State = copy(pending = @@ -250,8 +248,14 @@ def is_running(name: String): Boolean = running.isDefinedAt(name) - def build_running: List[Job] = - List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job) + def build_running: List[Build_Job] = + running.valuesIterator.flatMap(_.build).toList + + def finished_running(): Boolean = + build_running.exists(_.is_finished) + + def busy_running(jobs: Int): Boolean = + jobs <= 0 || jobs <= build_running.length def add_running(job: Job): State = copy(running = running + (job.name -> job)) @@ -287,6 +291,9 @@ object private_data extends SQL.Data("isabelle_build") { val database: Path = Path.explode("$ISABELLE_HOME_USER/build.db") + + /* tables */ + override lazy val tables: SQL.Tables = SQL.Tables( Updates.table, @@ -301,6 +308,15 @@ private lazy val build_id_tables = tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t)) + + /* notifications */ + + lazy val channel: String = Base.table.name + lazy val channel_ready: SQL.Notification = SQL.Notification(channel, payload = "ready") + + + /* generic columns */ + object Generic { val build_id = SQL.Column.long("build_id") val build_uuid = SQL.Column.string("build_uuid") @@ -940,6 +956,7 @@ build_start: Date ): Long = private_data.transaction_lock(db, create = true, label = "Build_Process.init_build") { + db.listen(private_data.channel) val build_uuid = build_context.build_uuid val build_id = private_data.get_build_id(db, build_uuid) if (build_context.master) { @@ -1010,14 +1027,27 @@ } catch { case exn: Throwable => close(); throw exn } - protected val build_delay: Time = { - val option = - _build_database match { - case Some(db) if db.is_postgresql => "build_cluster_delay" - case _ => "build_delay" - } - build_options.seconds(option) - } + protected def build_receive(filter: SQL.Notification => Boolean): List[SQL.Notification] = + _build_database.flatMap(_.receive(filter)).getOrElse(Nil) + + protected def build_send(notification: SQL.Notification): Unit = + _build_database.foreach(_.send(notification)) + + protected def build_cluster: Boolean = + _build_database match { + case Some(db) => db.is_postgresql + case None => false + } + + protected val build_delay: Time = + build_options.seconds( + if (!build_cluster) "build_delay" + else if (build_context.master) "build_delay_master" + else "build_delay_worker") + + protected val build_expire: Int = + if (!build_cluster || build_context.master) 1 + else build_options.int("build_cluster_expire").max(1) protected val _host_database: SQL.Database = try { store.open_build_database(path = Host.private_data.database, server = server) } @@ -1031,11 +1061,11 @@ val progress = new Database_Progress(db, build_progress, input_messages = build_context.master, - output_stopped = build_context.master, hostname = hostname, context_uuid = build_uuid, kind = "build_process", - timeout = Some(build_delay)) + timeout = Some(build_delay), + tick_expire = build_expire) (progress, progress.agent_uuid) } catch { case exn: Throwable => close(); throw exn } @@ -1166,13 +1196,10 @@ make_result(result_name, Process_Result.error, output_shasum) } else if (cancelled) { - if (build_context.master) { - progress.echo(session_name + " CANCELLED") - state - .remove_pending(session_name) - .make_result(result_name, Process_Result.undefined, output_shasum) - } - else state + progress.echo(session_name + " CANCELLED") + state + .remove_pending(session_name) + .make_result(result_name, Process_Result.undefined, output_shasum) } else { val build_log_verbose = build_options.bool("build_log_verbose") @@ -1247,8 +1274,24 @@ else _state.pending.isEmpty } - protected def sleep(): Unit = - Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() } + private var _build_tick: Long = 0L + + protected def build_action(): Boolean = + Isabelle_Thread.interrupt_handler(_ => progress.stop()) { + val received = build_receive(n => n.channel == Build_Process.private_data.channel) + val ready = received.contains(Build_Process.private_data.channel_ready) + val reactive = ready && synchronized { !_state.busy_running(build_context.jobs) } + + val finished = synchronized { _state.finished_running() } + + def sleep: Boolean = { + build_delay.sleep() + val expired = synchronized { _build_tick += 1; _build_tick % build_expire == 0 } + expired || reactive || progress.stopped + } + + finished || sleep + } protected def init_unsynchronized(): Unit = { if (build_context.master) { @@ -1268,17 +1311,16 @@ } protected def main_unsynchronized(): Unit = { - for (job <- _state.build_running.filter(_.is_finished)) { - _state = _state.remove_running(job.name) - for (result <- job.join_build) { - val result_name = (job.name, worker_uuid, build_uuid) - _state = _state. - remove_pending(job.name). - make_result(result_name, - result.process_result, - result.output_shasum, - node_info = job.node_info) - } + for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) { + val result = build.join + val result_name = (job.name, worker_uuid, build_uuid) + _state = _state. + remove_pending(job.name). + remove_running(job.name). + make_result(result_name, + result.process_result, + result.output_shasum, + node_info = job.node_info) } for (name <- next_jobs(_state)) { @@ -1316,8 +1358,11 @@ synchronized_database("Build_Process.main") { if (progress.stopped) _state.build_running.foreach(_.cancel()) main_unsynchronized() + if (build_context.master && _state.exists_ready) { + build_send(Build_Process.private_data.channel_ready) + } } - sleep() + while(!build_action()) {} } } finally { diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/Build/database_progress.scala --- a/src/Pure/Build/database_progress.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/Build/database_progress.scala Wed Mar 13 23:27:44 2024 +0100 @@ -151,7 +151,6 @@ db: SQL.Database, base_progress: Progress, input_messages: Boolean = false, - output_stopped: Boolean = false, kind: String = "progress", hostname: String = Isabelle_System.hostname(), context_uuid: String = UUID.random_string(), @@ -171,7 +170,6 @@ private var _agent_uuid: String = "" private var _context: Long = -1 private var _serial: Long = 0 - private var _stopped_db: Boolean = false private var _consumer: Consumer_Thread[Progress.Output] = null def agent_uuid: String = synchronized { _agent_uuid } @@ -218,7 +216,7 @@ val expired = synchronized { _tick += 1; _tick % tick_expire == 0 } val received = db.receive(n => n.channel == Database_Progress.private_data.channel) val ok = - bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped || + bulk_output.nonEmpty || expired || base_progress.stopped || received.isEmpty || received.get.contains(Database_Progress.private_data.channel_ping) || input_messages && received.get.contains(Database_Progress.private_data.channel_output) @@ -249,7 +247,8 @@ } _consumer = Consumer_Thread.fork_bulk[Progress.Output](name = "Database_Progress.consumer")( - bulk = _ => true, timeout = timeout, + bulk = _ => true, + timeout = timeout, consume = { bulk_output => val results = if (bulk_output.isEmpty) consume(Nil) @@ -279,10 +278,10 @@ private def sync_database[A](body: => A): A = synchronized { Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") { - _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context) + val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context) - if (_stopped_db && !base_progress.stopped) base_progress.stop() - if (!_stopped_db && base_progress.stopped && output_stopped) { + if (stopped_db && !base_progress.stopped) base_progress.stop() + if (!stopped_db && base_progress.stopped) { Database_Progress.private_data.write_progress_stopped(db, _context, true) db.send(Database_Progress.private_data.channel_ping) } @@ -319,7 +318,6 @@ override def stop(): Unit = sync_context { base_progress.stop(); sync() } override def stopped: Boolean = sync_context { base_progress.stopped } - override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db } override def toString: String = super.toString + ": database " + db diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/General/sql.scala --- a/src/Pure/General/sql.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/General/sql.scala Wed Mar 13 23:27:44 2024 +0100 @@ -406,7 +406,7 @@ /* notifications: IPC via database server */ sealed case class Notification(channel: String, payload: String = "") { - override def toString = + override def toString: String = "Notification(" + channel + if_proper(payload, "," + payload) + ")" } diff -r a3d53f2bc41d -r 7b4b524cdee2 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Wed Mar 13 11:54:06 2024 +0100 +++ b/src/Pure/System/progress.scala Wed Mar 13 23:27:44 2024 +0100 @@ -88,7 +88,6 @@ if (Thread.interrupted()) is_stopped = true is_stopped } - def stopped_local: Boolean = false final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e } final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()