# HG changeset patch # User wenzelm # Date 1710368790 -3600 # Node ID 17220dc05991d5ee3aa0e42769c6aa64549a3ef9 # Parent 7ae25372ab043eaca7b9675273025d4b0a389e36 revert most parts of 0e79fa88cab6: somewhat ambitious attempt to move towards "editing" builds via added/canceled workers; retain notion of next_jobs.limit and finished() from 0e79fa88cab6; clarified Job vs. optional Build_Job; diff -r 7ae25372ab04 -r 17220dc05991 src/Pure/Build/build_benchmark.scala --- a/src/Pure/Build/build_benchmark.scala Wed Mar 13 17:36:35 2024 +0100 +++ b/src/Pure/Build/build_benchmark.scala Wed Mar 13 23:26:30 2024 +0100 @@ -82,15 +82,13 @@ val local_build_context = build_context.copy(store = Store(local_options)) - val build = + val result = Build_Job.start_session(local_build_context, session, progress, new Logger, server, - background, session.sources_shasum, input_shasum, node_info, false) + background, session.sources_shasum, input_shasum, node_info, false).join val timing = - build.join match { - case Some(result) if result.process_result.ok => result.process_result.timing - case _ => error("Failed to build benchmark session") - } + if (result.process_result.ok) result.process_result.timing + else error("Failed to build benchmark session") val score = Time.seconds(1000).ms.toDouble / (1 + timing.elapsed.ms) progress.echo( diff -r 7ae25372ab04 -r 17220dc05991 src/Pure/Build/build_job.scala --- a/src/Pure/Build/build_job.scala Wed Mar 13 17:36:35 2024 +0100 +++ b/src/Pure/Build/build_job.scala Wed Mar 13 23:26:30 2024 +0100 @@ -13,11 +13,12 @@ trait Build_Job { def cancel(): Unit = () def is_finished: Boolean = false - def join: Option[Build_Job.Result] = None + def join: Build_Job.Result = Build_Job.no_result } object Build_Job { sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum) + val no_result: Result = Result(Process_Result.undefined, SHA1.no_shasum) /* build session */ @@ -114,7 +115,7 @@ ) extends Build_Job { def session_name: String = session_background.session_name - private val future_result: Future[Option[Result]] = + private val future_result: Future[Result] = Future.thread("build", uninterruptible = true) { val info = session_background.sessions_structure(session_name) val options = Host.node_options(info.options, node_info) @@ -508,15 +509,10 @@ process_result.rc, build_context.build_uuid)) - val valid = - if (progress.stopped_local) false - else { - database_server match { - case Some(db) => write_info(db) - case None => using(store.open_database(session_name, output = true))(write_info) - } - true - } + database_server match { + case Some(db) => write_info(db) + case None => using(store.open_database(session_name, output = true))(write_info) + } using_optional(store.maybe_open_heaps_database(database_server, server = server)) { heaps_database => @@ -554,13 +550,12 @@ } } - if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum)) - else None + Result(process_result.copy(out_lines = log_lines), output_shasum) } } override def cancel(): Unit = future_result.cancel() override def is_finished: Boolean = future_result.is_finished - override def join: Option[Result] = future_result.join + override def join: Result = future_result.join } } diff -r 7ae25372ab04 -r 17220dc05991 src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala Wed Mar 13 17:36:35 2024 +0100 +++ b/src/Pure/Build/build_process.scala Wed Mar 13 23:26:30 2024 +0100 @@ -65,11 +65,7 @@ node_info: Host.Node_Info, start_date: Date, build: Option[Build_Job] - ) extends Library.Named { - def cancel(): Unit = build.foreach(_.cancel()) - def is_finished: Boolean = build.isDefined && build.get.is_finished - def join_build: Option[Build_Job.Result] = build.flatMap(_.join) - } + ) extends Library.Named sealed case class Result( name: String, @@ -252,13 +248,14 @@ def is_running(name: String): Boolean = running.isDefinedAt(name) - def finished_running(): Boolean = running.valuesIterator.exists(_.is_finished) + def build_running: List[Build_Job] = + running.valuesIterator.flatMap(_.build).toList + + def finished_running(): Boolean = + build_running.exists(_.is_finished) def busy_running(jobs: Int): Boolean = - jobs <= 0 || jobs <= running.valuesIterator.flatMap(_.build).length - - def build_running: List[Job] = - List.from(for (job <- running.valuesIterator if job.build.isDefined) yield job) + jobs <= 0 || jobs <= build_running.length def add_running(job: Job): State = copy(running = running + (job.name -> job)) @@ -1064,7 +1061,6 @@ val progress = new Database_Progress(db, build_progress, input_messages = build_context.master, - output_stopped = build_context.master, hostname = hostname, context_uuid = build_uuid, kind = "build_process", @@ -1200,13 +1196,10 @@ make_result(result_name, Process_Result.error, output_shasum) } else if (cancelled) { - if (build_context.master) { - progress.echo(session_name + " CANCELLED") - state - .remove_pending(session_name) - .make_result(result_name, Process_Result.undefined, output_shasum) - } - else state + progress.echo(session_name + " CANCELLED") + state + .remove_pending(session_name) + .make_result(result_name, Process_Result.undefined, output_shasum) } else { val build_log_verbose = build_options.bool("build_log_verbose") @@ -1318,17 +1311,16 @@ } protected def main_unsynchronized(): Unit = { - for (job <- _state.build_running.filter(_.is_finished)) { - _state = _state.remove_running(job.name) - for (result <- job.join_build) { - val result_name = (job.name, worker_uuid, build_uuid) - _state = _state. - remove_pending(job.name). - make_result(result_name, - result.process_result, - result.output_shasum, - node_info = job.node_info) - } + for (job <- _state.running.valuesIterator; build <- job.build if build.is_finished) { + val result = build.join + val result_name = (job.name, worker_uuid, build_uuid) + _state = _state. + remove_pending(job.name). + remove_running(job.name). + make_result(result_name, + result.process_result, + result.output_shasum, + node_info = job.node_info) } for (name <- next_jobs(_state)) { diff -r 7ae25372ab04 -r 17220dc05991 src/Pure/Build/database_progress.scala --- a/src/Pure/Build/database_progress.scala Wed Mar 13 17:36:35 2024 +0100 +++ b/src/Pure/Build/database_progress.scala Wed Mar 13 23:26:30 2024 +0100 @@ -151,7 +151,6 @@ db: SQL.Database, base_progress: Progress, input_messages: Boolean = false, - output_stopped: Boolean = false, kind: String = "progress", hostname: String = Isabelle_System.hostname(), context_uuid: String = UUID.random_string(), @@ -171,7 +170,6 @@ private var _agent_uuid: String = "" private var _context: Long = -1 private var _serial: Long = 0 - private var _stopped_db: Boolean = false private var _consumer: Consumer_Thread[Progress.Output] = null def agent_uuid: String = synchronized { _agent_uuid } @@ -218,7 +216,7 @@ val expired = synchronized { _tick += 1; _tick % tick_expire == 0 } val received = db.receive(n => n.channel == Database_Progress.private_data.channel) val ok = - bulk_output.nonEmpty || expired || base_progress.stopped && output_stopped || + bulk_output.nonEmpty || expired || base_progress.stopped || received.isEmpty || received.get.contains(Database_Progress.private_data.channel_ping) || input_messages && received.get.contains(Database_Progress.private_data.channel_output) @@ -280,10 +278,10 @@ private def sync_database[A](body: => A): A = synchronized { Database_Progress.private_data.transaction_lock(db, label = "Database_Progress.sync_database") { - _stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context) + val stopped_db = Database_Progress.private_data.read_progress_stopped(db, _context) - if (_stopped_db && !base_progress.stopped) base_progress.stop() - if (!_stopped_db && base_progress.stopped && output_stopped) { + if (stopped_db && !base_progress.stopped) base_progress.stop() + if (!stopped_db && base_progress.stopped) { Database_Progress.private_data.write_progress_stopped(db, _context, true) db.send(Database_Progress.private_data.channel_ping) } @@ -320,7 +318,6 @@ override def stop(): Unit = sync_context { base_progress.stop(); sync() } override def stopped: Boolean = sync_context { base_progress.stopped } - override def stopped_local: Boolean = sync_context { base_progress.stopped && !_stopped_db } override def toString: String = super.toString + ": database " + db diff -r 7ae25372ab04 -r 17220dc05991 src/Pure/System/progress.scala --- a/src/Pure/System/progress.scala Wed Mar 13 17:36:35 2024 +0100 +++ b/src/Pure/System/progress.scala Wed Mar 13 23:26:30 2024 +0100 @@ -88,7 +88,6 @@ if (Thread.interrupted()) is_stopped = true is_stopped } - def stopped_local: Boolean = false final def interrupt_handler[A](e: => A): A = POSIX_Interrupt.handler { stop() } { e } final def expose_interrupt(): Unit = if (stopped) throw Exn.Interrupt()