--- a/src/Pure/System/progress.scala Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/System/progress.scala Wed Aug 16 14:42:43 2023 +0200
@@ -217,6 +217,7 @@
if (Thread.interrupted()) is_stopped = true
is_stopped
}
+ def stopped_local: Boolean = false
final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()
@@ -264,6 +265,7 @@
class Database_Progress(
val db: SQL.Database,
val base_progress: Progress,
+ val output_stopped: Boolean = false,
val kind: String = "progress",
val hostname: String = Isabelle_System.hostname(),
val context_uuid: String = UUID.random().toString)
@@ -273,6 +275,7 @@
private var _agent_uuid: String = ""
private var _context: Long = -1
private var _serial: Long = 0
+ private var _stopped_db: Boolean = false
def agent_uuid: String = synchronized { _agent_uuid }
@@ -327,11 +330,12 @@
if (_context == 0) throw new IllegalStateException("Database_Progress after exit")
Progress.private_data.transaction_lock(db, label = "Database_Progress.sync") {
- val stopped_db = Progress.private_data.read_progress_stopped(db, _context)
- val stopped = base_progress.stopped
+ _stopped_db = Progress.private_data.read_progress_stopped(db, _context)
- if (stopped_db && !stopped) base_progress.stop()
- if (stopped && !stopped_db) Progress.private_data.write_progress_stopped(db, _context, true)
+ if (_stopped_db && !base_progress.stopped) base_progress.stop()
+ if (!_stopped_db && base_progress.stopped && output_stopped) {
+ Progress.private_data.write_progress_stopped(db, _context, true)
+ }
val messages = Progress.private_data.read_messages(db, _context, seen = _serial)
for ((message_serial, message) <- messages) {
@@ -371,6 +375,7 @@
override def stop(): Unit = synchronized { base_progress.stop(); sync() }
override def stopped: Boolean = sync_database { base_progress.stopped }
+ override def stopped_local: Boolean = sync_database { base_progress.stopped && !_stopped_db }
override def toString: String = super.toString + ": database " + db
--- a/src/Pure/Tools/build_job.scala Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/Tools/build_job.scala Wed Aug 16 14:42:43 2023 +0200
@@ -13,7 +13,7 @@
trait Build_Job {
def cancel(): Unit = ()
def is_finished: Boolean = false
- def join: (Process_Result, SHA1.Shasum) = (Process_Result.undefined, SHA1.no_shasum)
+ def join: Option[(Process_Result, SHA1.Shasum)] = None
}
object Build_Job {
@@ -111,7 +111,7 @@
) extends Build_Job {
def session_name: String = session_background.session_name
- private val future_result: Future[(Process_Result, SHA1.Shasum)] =
+ private val future_result: Future[Option[(Process_Result, SHA1.Shasum)]] =
Future.thread("build", uninterruptible = true) {
val info = session_background.sessions_structure(session_name)
val options = build_context.engine.process_options(info.options, node_info)
@@ -502,10 +502,16 @@
output_heap = output_shasum,
process_result.rc,
build_context.build_uuid))
- database_server match {
- case Some(db) => write_info(db)
- case None => using(store.open_database(session_name, output = true))(write_info)
- }
+
+ val valid =
+ if (progress.stopped_local) false
+ else {
+ database_server match {
+ case Some(db) => write_info(db)
+ case None => using(store.open_database(session_name, output = true))(write_info)
+ }
+ true
+ }
// messages
process_result.err_lines.foreach(progress.echo(_))
@@ -531,12 +537,12 @@
}
}
- (process_result.copy(out_lines = log_lines), output_shasum)
+ if (valid) Some((process_result.copy(out_lines = log_lines), output_shasum)) else None
}
}
override def cancel(): Unit = future_result.cancel()
override def is_finished: Boolean = future_result.is_finished
- override def join: (Process_Result, SHA1.Shasum) = future_result.join
+ override def join: Option[(Process_Result, SHA1.Shasum)] = future_result.join
}
}
--- a/src/Pure/Tools/build_process.scala Sun Aug 13 19:27:58 2023 +0200
+++ b/src/Pure/Tools/build_process.scala Wed Aug 16 14:42:43 2023 +0200
@@ -55,6 +55,7 @@
build: Option[Build_Job]
) extends Library.Named {
def no_build: Job = copy(build = None)
+ def join_build: Option[(Process_Result, SHA1.Shasum)] = build.flatMap(_.join)
}
sealed case class Result(
@@ -217,8 +218,6 @@
copy(serial = i)
}
- def finished: Boolean = pending.isEmpty
-
def remove_pending(name: String): State =
copy(pending = pending.flatMap(
entry => if (entry.name == name) None else Some(entry.resolve(name))))
@@ -228,10 +227,13 @@
def stop_running(): Unit =
for (job <- running.valuesIterator; build <- job.build) build.cancel()
+ def build_running: List[Build_Job] =
+ List.from(for (job <- running.valuesIterator; build <- job.build) yield build)
+
def finished_running(): List[Job] =
List.from(
for (job <- running.valuesIterator; build <- job.build if build.is_finished)
- yield job)
+ yield job)
def add_running(job: Job): State =
copy(running = running + (job.name -> job))
@@ -880,6 +882,7 @@
val progress_db = store.open_build_database(Progress.private_data.database, server = server)
val progress =
new Database_Progress(progress_db, build_progress,
+ output_stopped = build_context.master,
hostname = hostname,
context_uuid = build_uuid,
kind = "build_process")
@@ -957,8 +960,10 @@
}
protected def next_jobs(state: Build_Process.State): List[String] = {
- val running = List.from(state.running.valuesIterator.filter(_.worker_uuid == worker_uuid))
- val limit = if (progress.stopped) Int.MaxValue else build_context.max_jobs - running.length
+ val limit = {
+ if (progress.stopped) { if (build_context.master) Int.MaxValue else 0 }
+ else build_context.max_jobs - state.build_running.length
+ }
if (limit > 0) {
state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name))
.sortBy(_.name)(state.sessions.ordering)
@@ -1013,10 +1018,13 @@
make_result(result_name, Process_Result.error, output_shasum)
}
else if (cancelled) {
- progress.echo(session_name + " CANCELLED")
- state
- .remove_pending(session_name)
- .make_result(result_name, Process_Result.undefined, output_shasum)
+ if (build_context.master) {
+ progress.echo(session_name + " CANCELLED")
+ state
+ .remove_pending(session_name)
+ .make_result(result_name, Process_Result.undefined, output_shasum)
+ }
+ else state
}
else {
def used_nodes: Set[Int] =
@@ -1085,7 +1093,10 @@
synchronized_database("Build_Process.init") { _state = init_state(_state) }
}
- def finished(): Boolean = synchronized_database("Build_Process.test") { _state.finished }
+ def finished(): Boolean = synchronized_database("Build_Process.test") {
+ if (!build_context.master && progress.stopped) _state.build_running.isEmpty
+ else _state.pending.isEmpty
+ }
def sleep(): Unit =
Isabelle_Thread.interrupt_handler(_ => progress.stop()) { build_delay.sleep() }
@@ -1128,12 +1139,17 @@
if (progress.stopped) _state.stop_running()
for (job <- _state.finished_running()) {
- val result_name = (job.name, worker_uuid, build_uuid)
- val (process_result, output_shasum) = job.build.get.join
- _state = _state.
- remove_pending(job.name).
- remove_running(job.name).
- make_result(result_name, process_result, output_shasum, node_info = job.node_info)
+ job.join_build match {
+ case None =>
+ _state = _state.remove_running(job.name)
+ case Some((process_result, output_shasum)) =>
+ val result_name = (job.name, worker_uuid, build_uuid)
+ _state = _state.
+ remove_pending(job.name).
+ remove_running(job.name).
+ make_result(result_name, process_result, output_shasum,
+ node_info = job.node_info)
+ }
}
}