# HG changeset patch # User wenzelm # Date 1692189763 -7200 # Node ID 0e79fa88cab6f488381a0a629daf029eed74a71d # Parent 3d6dbf215559bc597a94c9745dd4e071010cc36a build_worker is stopped independently from master build_process; diff -r 3d6dbf215559 -r 0e79fa88cab6 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Sun Aug 13 19:27:58 2023 +0200 +++ b/src/Pure/System/progress.scala Wed Aug 16 14:42:43 2023 +0200 @@ -217,6 +217,7 @@ if (Thread.interrupted()) is_stopped = true is_stopped } + def stopped_local: Boolean = false final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e } final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt() @@ -264,6 +265,7 @@ class Database_Progress( val db: SQL.Database, val base_progress: Progress, + val output_stopped: Boolean = false, val kind: String = "progress", val hostname: String = Isabelle_System.hostname(), val context_uuid: String = UUID.random().toString) @@ -273,6 +275,7 @@ private var _agent_uuid: String = "" private var _context: Long = -1 private var _serial: Long = 0 + private var _stopped_db: Boolean = false def agent_uuid: String = synchronized { _agent_uuid } @@ -327,11 +330,12 @@ if (_context == 0) throw new IllegalStateException("Database_Progress after exit") Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") { - val stopped_db = Progress.private_data.read_progress_stopped(db, _context) - val stopped = base_progress.stopped + _stopped_db = Progress.private_data.read_progress_stopped(db, _context) - if (stopped_db && !stopped) base_progress.stop() - if (stopped && !stopped_db) Progress.private_data.write_progress_stopped(db, _context, true) + if (_stopped_db && !base_progress.stopped) base_progress.stop() + if (!_stopped_db && base_progress.stopped && output_stopped) { + Progress.private_data.write_progress_stopped(db, _context, true) + } val messages = Progress.private_data.read_messages(db, _context, seen = _serial) for ((message_serial, message) <- messages) { @@ -371,6 +375,7 @@ override def stop(): Unit = synchronized { base_progress.stop(); sync() } override def stopped: Boolean = sync_database { base_progress.stopped } + override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db } override def toString: String = super.toString + ": database " + db diff -r 3d6dbf215559 -r 0e79fa88cab6 src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Sun Aug 13 19:27:58 2023 +0200 +++ b/src/Pure/Tools/build_job.scala Wed Aug 16 14:42:43 2023 +0200 @@ -13,7 +13,7 @@ trait Build_Job { def cancel(): Unit = () def is_finished: Boolean = false - def join: (Process_Result, SHA1.Shasum) = (Process_Result.undefined, SHA1.no_shasum) + def join: Option[(Process_Result, SHA1.Shasum)] = None } object Build_Job { @@ -111,7 +111,7 @@ ) extends Build_Job { def session_name: String = session_background.session_name - private val future_result: Future[(Process_Result, SHA1.Shasum)] = + private val future_result: Future[Option[(Process_Result, SHA1.Shasum)]] = Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = build_context.engine.process_options(info.options, node_info) @@ -502,10 +502,16 @@ output_heap = output_shasum, process_result.rc, build_context.build_uuid)) - database_server match { - case Some(db) => write_info(db) - case None => using(store.open_database(session_name, output = true))(write_info) - } + + val valid = + if (progress.stopped_local) false + else { + database_server match { + case Some(db) => write_info(db) + case None => using(store.open_database(session_name, output = true))(write_info) + } + true + } // messages process_result.err_lines.foreach(progress.echo(_)) @@ -531,12 +537,12 @@ } } - (process_result.copy(out_lines = log_lines), output_shasum) + if (valid) Some((process_result.copy(out_lines = log_lines), output_shasum)) else None } } override def cancel(): Unit = future_result.cancel() override def is_finished: Boolean = future_result.is_finished - override def join: (Process_Result, SHA1.Shasum) = future_result.join + override def join: Option[(Process_Result, SHA1.Shasum)] = future_result.join } } diff -r 3d6dbf215559 -r 0e79fa88cab6 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sun Aug 13 19:27:58 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Wed Aug 16 14:42:43 2023 +0200 @@ -55,6 +55,7 @@ build: Option[Build_Job] ) extends Library.Named { def no_build: Job = copy(build = None) + def join_build: Option[(Process_Result, SHA1.Shasum)] = build.flatMap(_.join) } sealed case class Result( @@ -217,8 +218,6 @@ copy(serial = i) } - def finished: Boolean = pending.isEmpty - def remove_pending(name: String): State = copy(pending = pending.flatMap( entry => if (entry.name == name) None else Some(entry.resolve(name)))) @@ -228,10 +227,13 @@ def stop_running(): Unit = for (job <- running.valuesIterator; build <- job.build) build.cancel() + def build_running: List[Build_Job] = + List.from(for (job <- running.valuesIterator; build <- job.build) yield build) + def finished_running(): List[Job] = List.from( for (job <- running.valuesIterator; build <- job.build if build.is_finished) - yield job) + yield job) def add_running(job: Job): State = copy(running = running + (job.name -> job)) @@ -880,6 +882,7 @@ val progress_db = store.open_build_database(Progress.private_data.database, server = server) val progress = new Database_Progress(progress_db, build_progress, + output_stopped = build_context.master, hostname = hostname, context_uuid = build_uuid, kind = "build_process") @@ -957,8 +960,10 @@ } protected def next_jobs(state: Build_Process.State): List[String] = { - val running = List.from(state.running.valuesIterator.filter(_.worker_uuid == worker_uuid)) - val limit = if (progress.stopped) Int.MaxValue else build_context.max_jobs - running.length + val limit = { + if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 } + else build_context.max_jobs - state.build_running.length + } if (limit > 0) { state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name)) .sortBy(_.name)(state.sessions.ordering) @@ -1013,10 +1018,13 @@ make_result(result_name, Process_Result.error, output_shasum) } else if (cancelled) { - progress.echo(session_name + " CANCELLED") - state - .remove_pending(session_name) - .make_result(result_name, Process_Result.undefined, output_shasum) + if (build_context.master) { + progress.echo(session_name + " CANCELLED") + state + .remove_pending(session_name) + .make_result(result_name, Process_Result.undefined, output_shasum) + } + else state } else { def used_nodes: Set[Int] = @@ -1085,7 +1093,10 @@ synchronized_database("Build_Process.init") { _state = init_state(_state) } } - def finished(): Boolean = synchronized_database("Build_Process.test") { _state.finished } + def finished(): Boolean = synchronized_database("Build_Process.test") { + if (!build_context.master && progress.stopped) _state.build_running.isEmpty + else _state.pending.isEmpty + } def sleep(): Unit = Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() } @@ -1128,12 +1139,17 @@ if (progress.stopped) _state.stop_running() for (job <- _state.finished_running()) { - val result_name = (job.name, worker_uuid, build_uuid) - val (process_result, output_shasum) = job.build.get.join - _state = _state. - remove_pending(job.name). - remove_running(job.name). - make_result(result_name, process_result, output_shasum, node_info = job.node_info) + job.join_build match { + case None => + _state = _state.remove_running(job.name) + case Some((process_result, output_shasum)) => + val result_name = (job.name, worker_uuid, build_uuid) + _state = _state. + remove_pending(job.name). + remove_running(job.name). + make_result(result_name, process_result, output_shasum, + node_info = job.node_info) + } } }