revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers;
retain notion of next_jobs.limit and finished() from 0e79fa88cab6;
clarified Job vs. optional Build_Job;
--- a/src/Pure/Build/build_benchmark.scala Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala Wed Mar 13 23:26:30 2024 +0100
@@ -82,15 +82,13 @@
val local_build_context = build_context.copy(store = Store(local_options))
- val build =
+ val result =
Build_Job.start_session(local_build_context, session, progress, new Logger, server,
- background, session.sources_shasum, input_shasum, node_info, false)
+ background, session.sources_shasum, input_shasum, node_info, false).join
val timing =
- build.join match {
- case Some(result) if result.process_result.ok => result.process_result.timing
- case _ => error("Failed to build benchmark session")
- }
+ if (result.process_result.ok) result.process_result.timing
+ else error("Failed to build benchmark session")
val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms)
progress.echo(
--- a/src/Pure/Build/build_job.scala Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_job.scala Wed Mar 13 23:26:30 2024 +0100
@@ -13,11 +13,12 @@
trait Build_Job {
def cancel(): Unit = ()
def is_finished: Boolean = false
- def join: Option[Build_Job.Result] = None
+ def join: Build_Job.Result = Build_Job.no_result
}
object Build_Job {
sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum)
+ val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum)
/* build session */
@@ -114,7 +115,7 @@
) extends Build_Job {
def session_name: String = session_background.session_name
- private val future_result: Future[Option[Result]] =
+ private val future_result: Future[Result] =
Future.thread("build", uninterruptible = true) {
val info = session_background.sessions_structure(session_name)
val options = Host.node_options(info.options, node_info)
@@ -508,15 +509,10 @@
process_result.rc,
build_context.build_uuid))
- val valid =
- if (progress.stopped_local) false
- else {
- database_server match {
- case Some(db) => write_info(db)
- case None => using(store.open_database(session_name, output = true))(write_info)
- }
- true
- }
+ database_server match {
+ case Some(db) => write_info(db)
+ case None => using(store.open_database(session_name, output = true))(write_info)
+ }
using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
heaps_database =>
@@ -554,13 +550,12 @@
}
}
- if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum))
- else None
+ Result(process_result.copy(out_lines = log_lines), output_shasum)
}
}
override def cancel(): Unit = future_result.cancel()
override def is_finished: Boolean = future_result.is_finished
- override def join: Option[Result] = future_result.join
+ override def join: Result = future_result.join
}
}
--- a/src/Pure/Build/build_process.scala Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/build_process.scala Wed Mar 13 23:26:30 2024 +0100
@@ -65,11 +65,7 @@
node_info: Host.Node_Info,
start_date: Date,
build: Option[Build_Job]
- ) extends Library.Named {
- def cancel(): Unit = build.foreach(_.cancel())
- def is_finished: Boolean = build.isDefined && build.get.is_finished
- def join_build: Option[Build_Job.Result] = build.flatMap(_.join)
- }
+ ) extends Library.Named
sealed case class Result(
name: String,
@@ -252,13 +248,14 @@
def is_running(name: String): Boolean = running.isDefinedAt(name)
- def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished)
+ def build_running: List[Build_Job] =
+ running.valuesIterator.flatMap(_.build).toList
+
+ def finished_running(): Boolean =
+ build_running.exists(_.is_finished)
def busy_running(jobs: Int): Boolean =
- jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length
-
- def build_running: List[Job] =
- List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job)
+ jobs <= 0 || jobs <= build_running.length
def add_running(job: Job): State =
copy(running = running + (job.name -> job))
@@ -1064,7 +1061,6 @@
val progress =
new Database_Progress(db, build_progress,
input_messages = build_context.master,
- output_stopped = build_context.master,
hostname = hostname,
context_uuid = build_uuid,
kind = "build_process",
@@ -1200,13 +1196,10 @@
make_result(result_name, Process_Result.error, output_shasum)
}
else if (cancelled) {
- if (build_context.master) {
- progress.echo(session_name + " CANCELLED")
- state
- .remove_pending(session_name)
- .make_result(result_name, Process_Result.undefined, output_shasum)
- }
- else state
+ progress.echo(session_name + " CANCELLED")
+ state
+ .remove_pending(session_name)
+ .make_result(result_name, Process_Result.undefined, output_shasum)
}
else {
val build_log_verbose = build_options.bool("build_log_verbose")
@@ -1318,17 +1311,16 @@
}
protected def main_unsynchronized(): Unit = {
- for (job <- _state.build_running.filter(_.is_finished)) {
- _state = _state.remove_running(job.name)
- for (result <- job.join_build) {
- val result_name = (job.name, worker_uuid, build_uuid)
- _state = _state.
- remove_pending(job.name).
- make_result(result_name,
- result.process_result,
- result.output_shasum,
- node_info = job.node_info)
- }
+ for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) {
+ val result = build.join
+ val result_name = (job.name, worker_uuid, build_uuid)
+ _state = _state.
+ remove_pending(job.name).
+ remove_running(job.name).
+ make_result(result_name,
+ result.process_result,
+ result.output_shasum,
+ node_info = job.node_info)
}
for (name <- next_jobs(_state)) {
--- a/src/Pure/Build/database_progress.scala Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/Build/database_progress.scala Wed Mar 13 23:26:30 2024 +0100
@@ -151,7 +151,6 @@
db: SQL.Database,
base_progress: Progress,
input_messages: Boolean = false,
- output_stopped: Boolean = false,
kind: String = "progress",
hostname: String = Isabelle_System.hostname(),
context_uuid: String = UUID.random_string(),
@@ -171,7 +170,6 @@
private var _agent_uuid: String = ""
private var _context: Long = -1
private var _serial: Long = 0
- private var _stopped_db: Boolean = false
private var _consumer: Consumer_Thread[Progress.Output] = null
def agent_uuid: String = synchronized { _agent_uuid }
@@ -218,7 +216,7 @@
val expired = synchronized { _tick += 1; _tick % tick_expire == 0 }
val received = db.receive(n => n.channel == Database_Progress.private_data.channel)
val ok =
- bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped ||
+ bulk_output.nonEmpty || expired || base_progress.stopped ||
received.isEmpty ||
received.get.contains(Database_Progress.private_data.channel_ping) ||
input_messages && received.get.contains(Database_Progress.private_data.channel_output)
@@ -280,10 +278,10 @@
private def sync_database[A](body: => A): A = synchronized {
Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") {
- _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
+ val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context)
- if (_stopped_db && !base_progress.stopped) base_progress.stop()
- if (!_stopped_db && base_progress.stopped && output_stopped) {
+ if (stopped_db && !base_progress.stopped) base_progress.stop()
+ if (!stopped_db && base_progress.stopped) {
Database_Progress.private_data.write_progress_stopped(db, _context, true)
db.send(Database_Progress.private_data.channel_ping)
}
@@ -320,7 +318,6 @@
override def stop(): Unit = sync_context { base_progress.stop(); sync() }
override def stopped: Boolean = sync_context { base_progress.stopped }
- override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db }
override def toString: String = super.toString + ": database " + db
--- a/src/Pure/System/progress.scala Wed Mar 13 17:36:35 2024 +0100
+++ b/src/Pure/System/progress.scala Wed Mar 13 23:26:30 2024 +0100
@@ -88,7 +88,6 @@
if (Thread.interrupted()) is_stopped = true
is_stopped
}
- def stopped_local: Boolean = false
final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e }
final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()