# HG changeset patch
# User paulson
# Date 1677583201 0
# Node ID 8fe30123aaab4c47b9ea767e4728431eee5b9d9c
# Parent 71f1abff827176b0c5616bc086455a0010c45c51
# Parent 02af8a1b97f6cbd660d2a81188681e59c083b716
merged

diff -r 02af8a1b97f6 -r 8fe30123aaab etc/options
--- a/etc/options Tue Feb 28 11:19:47 2023 +0000
+++ b/etc/options Tue Feb 28 11:20:01 2023 +0000
@@ -183,7 +183,7 @@
 option build_engine : string = ""
   -- "alternative session build engine"
 
-option build_database : bool = false
+option build_database_test : bool = false
   -- "expose state of build process via central database"
 
 
diff -r 02af8a1b97f6 -r 8fe30123aaab src/Pure/Admin/build_log.scala
--- a/src/Pure/Admin/build_log.scala Tue Feb 28 11:19:47 2023 +0000
+++ b/src/Pure/Admin/build_log.scala Tue Feb 28 11:20:01 2023 +0000
@@ -1058,23 +1058,17 @@
     db: SQL.Database,
     log_name: String,
     session_names: List[String] = Nil,
-    ml_statistics: Boolean = false): Build_Info = {
+    ml_statistics: Boolean = false
+  ): Build_Info = {
     val table1 = Data.sessions_table
     val table2 = Data.ml_statistics_table
 
-    val where =
-      SQL.where(
-        SQL.and(
-          Data.log_name(table1).where_equal(log_name),
-          Data.session_name(table1).ident + " <> ''",
-          if_proper(session_names, Data.session_name(table1).member(session_names))))
-
     val columns1 = table1.columns.tail.map(_.apply(table1))
     val (columns, from) =
       if (ml_statistics) {
         val columns = columns1 ::: List(Data.ml_statistics(table2))
         val join =
-          table1.toString + SQL.join_outer + table2 + " ON " +
+          table1.ident + SQL.join_outer + table2.ident + " ON " +
             SQL.and(
               Data.log_name(table1).ident + " = " + Data.log_name(table2).ident,
               Data.session_name(table1).ident + " = " + Data.session_name(table2).ident)
         (columns, join)
@@ -1082,8 +1076,15 @@
       }
       else (columns1, table1.ident)
 
+    val where =
+      SQL.where(
+        SQL.and(
+          Data.log_name(table1).equal(log_name),
+          Data.session_name(table1).ident + " <> ''",
+          if_proper(session_names, Data.session_name(table1).member(session_names))))
+
     val sessions =
-      db.using_statement(SQL.select(columns) + from + where) { stmt =>
+      db.using_statement(SQL.select(columns, sql = from + where)) { stmt =>
         stmt.execute_query().iterator({ res =>
           val session_name = res.string(Data.session_name)
           val session_entry =
diff -r 02af8a1b97f6 -r 8fe30123aaab src/Pure/General/sql.scala
--- a/src/Pure/General/sql.scala Tue Feb 28 11:19:47 2023 +0000
+++ b/src/Pure/General/sql.scala Tue Feb 28 11:20:01 2023 +0000
@@ -50,9 +50,9 @@
   def separate(sql: Source): Source =
     (if (sql.isEmpty || sql.startsWith(" ")) "" else " ") + sql
 
-  def select(columns: List[Column] = Nil, distinct: Boolean = false): Source =
+  def select(columns: List[Column] = Nil, distinct: Boolean = false, sql: Source = ""): Source =
     "SELECT " + (if (distinct) "DISTINCT " else "") +
-    (if (columns.isEmpty) "*" else commas(columns.map(_.ident))) + " FROM "
+    (if (columns.isEmpty) "*" else commas(columns.map(_.ident))) + " FROM " + sql
 
   val join_outer: Source = " LEFT OUTER JOIN "
   val join_inner: Source = " INNER JOIN "
@@ -203,7 +203,7 @@
       select_columns: List[Column] = Nil,
       distinct: Boolean = false,
       sql: Source = ""
-    ): Source = SQL.select(select_columns, distinct = distinct) + ident + SQL.separate(sql)
+    ): Source = SQL.select(select_columns, distinct = distinct, sql = ident + SQL.separate(sql))
 
     override def toString: Source = ident
   }
diff -r 02af8a1b97f6 -r 8fe30123aaab src/Pure/Tools/build_job.scala
--- a/src/Pure/Tools/build_job.scala Tue Feb 28 11:19:47 2023 +0000
+++ b/src/Pure/Tools/build_job.scala Tue Feb 28 11:20:01 2023 +0000
@@ -17,7 +17,7 @@
   def start(): Unit = ()
   def terminate(): Unit = ()
   def is_finished: Boolean = false
-  def join: Process_Result = Process_Result.undefined
+  def finish: (Process_Result, SHA1.Shasum) = (Process_Result.undefined, SHA1.no_shasum)
   def make_abstract: Build_Job.Abstract = Build_Job.Abstract(job_name, node_info)
 }
 
@@ -36,13 +36,19 @@
     override def make_abstract: Abstract = this
   }
 
-  class Build_Session(progress: Progress,
+
+  /* build session */
+
+  class Build_Session(
+    progress: Progress,
+    verbose: Boolean,
     session_background: Sessions.Background,
     store: Sessions.Store,
-    val do_store: Boolean,
+    do_store: Boolean,
     resources: Resources,
     session_setup: (String, Session) => Unit,
-    val input_heaps: SHA1.Shasum,
+    sources_shasum: SHA1.Shasum,
+    input_heaps: SHA1.Shasum,
     override val node_info: Node_Info
   ) extends Build_Job {
     def session_name: String = session_background.session_name
@@ -56,6 +62,8 @@
 
     private lazy val future_result: Future[Process_Result] =
       Future.thread("build", uninterruptible = true) {
+        Exn.Interrupt.expose()
+
         val parent = info.parent.getOrElse("")
 
         val env =
@@ -263,10 +271,12 @@
         val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
 
         val process =
-          Isabelle_Process.start(store, options, session, session_background,
-            logic = parent, raw_ml_system = is_pure,
-            use_prelude = use_prelude, eval_main = eval_main,
-            cwd = info.dir.file, env = env)
+          Isabelle_Thread.uninterruptible {
+            Isabelle_Process.start(store, options, session, session_background,
+              logic = parent, raw_ml_system = is_pure,
+              use_prelude = use_prelude, eval_main = eval_main,
+              cwd = info.dir.file, env = env)
+          }
 
         val build_errors =
           Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
@@ -371,30 +381,83 @@
       else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
     }
 
-    override def join: Process_Result = {
-      val result = future_result.join
+    override lazy val finish: (Process_Result, SHA1.Shasum) = {
+      val process_result = {
+        val result = future_result.join
+
+        val was_timeout =
+          timeout_request match {
+            case None => false
+            case Some(request) => !request.cancel()
+          }
+
+        if (result.ok) result
+        else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
+        else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
+        else result
+      }
 
-      val was_timeout =
-        timeout_request match {
-          case None => false
-          case Some(request) => !request.cancel()
+      val output_heap =
+        if (process_result.ok && do_store && store.output_heap(session_name).is_file) {
+          SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
         }
+        else SHA1.no_shasum
+
+      val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
+
+      val build_log =
+        Build_Log.Log_File(session_name, process_result.out_lines).
+          parse_session_info(
+            command_timings = true,
+            theory_timings = true,
+            ml_statistics = true,
+            task_statistics = true)
 
-      if (result.ok) result
-      else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
-      else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
-      else result
-    }
+      // write log file
+      if (process_result.ok) {
+        File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
+      }
+      else File.write(store.output_log(session_name), terminate_lines(log_lines))
+
+      // write database
+      using(store.open_database(session_name, output = true))(db =>
+        store.write_session_info(db, session_name, session_sources,
+          build_log =
+            if (process_result.timeout) build_log.error("Timeout") else build_log,
+          build =
+            Sessions.Build_Info(sources_shasum, input_heaps, output_heap,
+              process_result.rc, UUID.random().toString)))
+
+      // messages
+      process_result.err_lines.foreach(progress.echo)
 
-    lazy val finish: SHA1.Shasum = {
-      require(is_finished, "Build job not finished: " + quote(session_name))
-      if (join.ok && do_store && store.output_heap(session_name).is_file) {
-        SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
+      if (process_result.ok) {
+        if (verbose) {
+          val props = build_log.session_timing
+          val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
+          val timing = Markup.Timing_Properties.get(props)
+          progress.echo(
+            "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")")
+        }
+        progress.echo(
+          "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
       }
-      else SHA1.no_shasum
+      else {
+        progress.echo(
+          session_name + " FAILED (see also \"isabelle log -H Error " + session_name + "\")")
+        if (!process_result.interrupted) {
+          val tail = info.options.int("process_output_tail")
+          val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
+          val prefix = if (log_lines.length == suffix.length) Nil else List("...")
+          progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
+        }
+      }
+
+      (process_result.copy(out_lines = log_lines), output_heap)
     }
   }
 
+
   /* theory markup/messages from session database */
 
   def read_theory(
diff -r 02af8a1b97f6 -r 8fe30123aaab src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_process.scala Tue Feb 28 11:19:47 2023 +0000
+++ b/src/Pure/Tools/build_process.scala Tue Feb 28 11:20:01 2023 +0000
@@ -201,12 +201,8 @@
 
     def stop_running(): Unit = running.valuesIterator.foreach(_.terminate())
 
-    def finished_running(): List[Build_Job.Build_Session] =
-      List.from(
-        running.valuesIterator.flatMap {
-          case job: Build_Job.Build_Session if job.is_finished => Some(job)
-          case _ => None
-        })
+    def finished_running(): List[Build_Job] =
+      List.from(running.valuesIterator.filter(_.is_finished))
 
     def add_running(name: String, job: Build_Job): State =
       copy(running = running + (name -> job))
@@ -485,20 +481,10 @@
       state.copy(serial = serial)
     }
   }
+}
 
 
-  /* main process */
-
-  def session_finished(session_name: String, process_result: Process_Result): String =
-    "Finished " + session_name + " (" + process_result.timing.message_resources + ")"
-
-  def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = {
-    val props = build_log.session_timing
-    val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-    val timing = Markup.Timing_Properties.get(props)
-    "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
-  }
-}
+/* main process */
 
 class Build_Process(protected val build_context: Build_Process.Context) extends AutoCloseable {
   protected val store: Sessions.Store = build_context.store
@@ -515,7 +501,7 @@
   }
 
   protected val database: Option[SQL.Database] =
-    if (!build_options.bool("build_database") || true /*FIXME*/) None
+    if (!build_options.bool("build_database_test")) None
     else if (store.database_server) Some(store.open_database_server())
     else {
       val db = SQLite.open_database(Build_Process.Data.database)
@@ -534,8 +520,6 @@
       (for ((name, (_, (preds, _))) <- build_context.sessions_structure.build_graph.iterator)
         yield Build_Process.Entry(name, preds.toList)).toList)
 
-  protected def finished(): Boolean = synchronized { _state.finished }
-
   protected def next_pending(): Option[String] = synchronized {
     if (_state.running.size < (build_context.max_jobs max 1)) {
       _state.pending.filter(entry => entry.is_ready && !_state.is_running(entry.name))
@@ -545,69 +529,6 @@
     else None
   }
 
-  protected def stop_running(): Unit = synchronized { _state.stop_running() }
-
-  protected def finished_running(): List[Build_Job.Build_Session] = synchronized {
-    _state.finished_running()
-  }
-
-  protected def finish_job(job: Build_Job.Build_Session): Unit = {
-    val session_name = job.session_name
-    val process_result = job.join
-    val output_heap = job.finish
-
-    val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
-    val process_result_tail = {
-      val tail = job.info.options.int("process_output_tail")
-      process_result.copy(
-        out_lines =
-          "(more details via \"isabelle log -H Error " + session_name + "\")" ::
-          (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)))
-    }
-
-    val build_log =
-      Build_Log.Log_File(session_name, process_result.out_lines).
-        parse_session_info(
-          command_timings = true,
-          theory_timings = true,
-          ml_statistics = true,
-          task_statistics = true)
-
-    // write log file
-    if (process_result.ok) {
-      File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
-    }
-    else File.write(store.output_log(session_name), terminate_lines(log_lines))
-
-    // write database
-    using(store.open_database(session_name, output = true))(db =>
-      store.write_session_info(db, session_name, job.session_sources,
-        build_log =
-          if (process_result.timeout) build_log.error("Timeout") else build_log,
-        build =
-          Sessions.Build_Info(build_deps.sources_shasum(session_name), job.input_heaps,
-            output_heap, process_result.rc, UUID.random().toString)))
-
-    // messages
-    process_result.err_lines.foreach(progress.echo)
-
-    if (process_result.ok) {
-      if (verbose) progress.echo(Build_Process.session_timing(session_name, build_log))
-      progress.echo(Build_Process.session_finished(session_name, process_result))
-    }
-    else {
-      progress.echo(session_name + " FAILED")
-      if (!process_result.interrupted) progress.echo(process_result_tail.out)
-    }
-
-    synchronized {
-      _state = _state.
-        remove_pending(session_name).
-        remove_running(session_name).
-        make_result(session_name, false, output_heap, process_result_tail, node_info = job.node_info)
-    }
-  }
-
   protected def start_job(session_name: String): Unit = {
     val ancestor_results = synchronized {
       _state.get_results(
@@ -674,8 +595,9 @@
       val (numa_node, state1) = _state.numa_next(build_context.numa_nodes)
       val node_info = Build_Job.Node_Info(build_context.hostname, numa_node)
       val job =
-        new Build_Job.Build_Session(progress, session_background, store, do_store,
-          resources, build_context.session_setup, input_heaps, node_info)
+        new Build_Job.Build_Session(progress, verbose, session_background, store, do_store,
+          resources, build_context.session_setup, build_deps.sources_shasum(session_name),
+          input_heaps, node_info)
       _state = state1.add_running(session_name, job)
       job
     }
@@ -715,6 +637,8 @@
   }
 
   def run(): Map[String, Process_Result] = {
+    def finished(): Boolean = synchronized { _state.finished }
+
     if (finished()) {
       progress.echo_warning("Nothing to build")
       Map.empty[String, Process_Result]
@@ -722,9 +646,18 @@
     else {
       setup_database()
       while (!finished()) {
-        if (progress.stopped) stop_running()
+        if (progress.stopped) synchronized { _state.stop_running() }
 
-        for (job <- finished_running()) finish_job(job)
+        for (job <- synchronized { _state.finished_running() }) {
+          val job_name = job.job_name
+          val (process_result, output_heap) = job.finish
+          synchronized {
+            _state = _state.
+              remove_pending(job_name).
+              remove_running(job_name).
+              make_result(job_name, false, output_heap, process_result, node_info = job.node_info)
+          }
+        }
 
         next_pending() match {
          case Some(name) =>