diff -r bce98b5dfec6 -r c7a98469c0e7 src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Sat Jan 20 13:52:36 2024 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,552 +0,0 @@ -/* Title: Pure/Tools/build_job.scala - Author: Makarius - -Build job running prover process, with rudimentary PIDE session. -*/ - -package isabelle - - -import scala.collection.mutable - - -trait Build_Job { - def cancel(): Unit = () - def is_finished: Boolean = false - def join: Option[Build_Job.Result] = None -} - -object Build_Job { - sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum) - - - /* build session */ - - def start_session( - build_context: Build.Context, - session_context: Session_Context, - progress: Progress, - log: Logger, - server: SSH.Server, - session_background: Sessions.Background, - sources_shasum: SHA1.Shasum, - input_shasum: SHA1.Shasum, - node_info: Host.Node_Info, - store_heap: Boolean - ): Session_Job = { - new Session_Job(build_context, session_context, progress, log, server, - session_background, sources_shasum, input_shasum, node_info, store_heap) - } - - object Session_Context { - def load( - database_server: Option[SQL.Database], - build_uuid: String, - name: String, - deps: List[String], - ancestors: List[String], - session_prefs: String, - sources_shasum: SHA1.Shasum, - timeout: Time, - store: Store, - progress: Progress = new Progress - ): Session_Context = { - def default: Session_Context = - Session_Context( - name, deps, ancestors, session_prefs, sources_shasum, timeout, - Time.zero, Bytes.empty, build_uuid) - - def read(db: SQL.Database): Session_Context = { - def ignore_error(msg: String) = { - progress.echo_warning( - "Ignoring bad database " + db + " for session " + quote(name) + - if_proper(msg, ":\n" + msg)) - default - } - try { - val command_timings = store.read_command_timings(db, name) - val elapsed = - store.read_session_timing(db, name) match { - case Markup.Elapsed(s) => Time.seconds(s) - case _ => Time.zero - } - new Session_Context( - name, deps, ancestors, session_prefs, sources_shasum, timeout, - elapsed, command_timings, build_uuid) - } - catch { - case ERROR(msg) => ignore_error(msg) - case exn: java.lang.Error => ignore_error(Exn.message(exn)) - case _: XML.Error => ignore_error("XML.Error") - } - } - - database_server match { - case Some(db) => if (store.session_info_exists(db)) read(db) else default - case None => using_option(store.try_open_database(name))(read) getOrElse default - } - } - } - - sealed case class Session_Context( - name: String, - deps: List[String], - ancestors: List[String], - session_prefs: String, - sources_shasum: SHA1.Shasum, - timeout: Time, - old_time: Time, - old_command_timings_blob: Bytes, - build_uuid: String - ) extends Library.Named - - class Session_Job private[Build_Job]( - build_context: Build.Context, - session_context: Session_Context, - progress: Progress, - log: Logger, - server: SSH.Server, - session_background: Sessions.Background, - sources_shasum: SHA1.Shasum, - input_shasum: SHA1.Shasum, - node_info: Host.Node_Info, - store_heap: Boolean - ) extends Build_Job { - def session_name: String = session_background.session_name - - private val future_result: Future[Option[Result]] = - Future.thread("build", uninterruptible = true) { - val info = session_background.sessions_structure(session_name) - val options = Host.node_options(info.options, node_info) - - val store = build_context.store - - using_optional(store.maybe_open_database_server(server = server)) { database_server => - - store.clean_output(database_server, session_name, session_init = true) - - val session_sources = - Store.Sources.load(session_background.base, cache = store.cache.compress) - - val env = - Isabelle_System.settings( - List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) - - val session_heaps = - session_background.info.parent match { - case None => Nil - case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic) - } - - val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil - - val eval_store = - if (store_heap) { - (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: - List("ML_Heap.save_child " + - ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) - } - else Nil - - def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = - session_background.base.theory_load_commands.get(node_name.theory) match { - case None => Nil - case Some(spans) => - val syntax = session_background.base.theory_syntax(node_name) - val master_dir = Path.explode(node_name.master_dir) - for (span <- spans; file <- span.loaded_files(syntax).files) - yield { - val src_path = Path.explode(file) - val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) - - val bytes = session_sources(blob_name.node).bytes - val text = bytes.text - val chunk = Symbol.Text_Chunk(text) - - Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> - Document.Blobs.Item(bytes, text, chunk, changed = false) - } - } - - - /* session */ - - val resources = - new Resources(session_background, log = log, - command_timings = - Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)) - - val session = - new Session(options, resources) { - override val cache: Term.Cache = store.cache - - override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = - Command.Blobs_Info.make(session_blobs(node_name)) - - override def build_blobs(node_name: Document.Node.Name): Document.Blobs = - Document.Blobs.make(session_blobs(node_name)) - } - - object Build_Session_Errors { - private val promise: Promise[List[String]] = Future.promise - - def result: Exn.Result[List[String]] = promise.join_result - def cancel(): Unit = promise.cancel() - def apply(errs: List[String]): Unit = { - try { promise.fulfill(errs) } - catch { case _: IllegalStateException => } - } - } - - val export_consumer = - Export.consumer(store.open_database(session_name, output = true, server = server), - store.cache, progress = progress) - - val stdout = new StringBuilder(1000) - val stderr = new StringBuilder(1000) - val command_timings = new mutable.ListBuffer[Properties.T] - val theory_timings = new mutable.ListBuffer[Properties.T] - val session_timings = new mutable.ListBuffer[Properties.T] - val runtime_statistics = new mutable.ListBuffer[Properties.T] - val task_statistics = new mutable.ListBuffer[Properties.T] - - def fun( - name: String, - acc: mutable.ListBuffer[Properties.T], - unapply: Properties.T => Option[Properties.T] - ): (String, Session.Protocol_Function) = { - name -> ((msg: Prover.Protocol_Output) => - unapply(msg.properties) match { - case Some(props) => acc += props; true - case _ => false - }) - } - - session.init_protocol_handler(new Session.Protocol_Handler { - override def exit(): Unit = Build_Session_Errors.cancel() - - private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { - val (rc, errors) = - try { - val (rc, errs) = { - import XML.Decode._ - pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) - } - val errors = - for (err <- errs) yield { - val prt = Protocol_Message.expose_no_reports(err) - Pretty.string_of(prt, metric = Symbol.Metric) - } - (rc, errors) - } - catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } - - session.protocol_command("Prover.stop", rc.toString) - Build_Session_Errors(errors) - true - } - - private def loading_theory(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Loading_Theory(Markup.Name(name)) => - progress.theory(Progress.Theory(name, session = session_name)) - false - case _ => false - } - - private def export_(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Protocol.Export(args) => - export_consumer.make_entry(session_name, args, msg.chunk) - true - case _ => false - } - - override val functions: Session.Protocol_Functions = - List( - Markup.Build_Session_Finished.name -> build_session_finished, - Markup.Loading_Theory.name -> loading_theory, - Markup.EXPORT -> export_, - fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), - fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), - fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) - }) - - session.command_timings += Session.Consumer("command_timings") { - case Session.Command_Timing(props) => - for { - elapsed <- Markup.Elapsed.unapply(props) - elapsed_time = Time.seconds(elapsed) - if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") - } command_timings += props.filter(Markup.command_timing_property) - } - - session.runtime_statistics += Session.Consumer("ML_statistics") { - case Session.Runtime_Statistics(props) => runtime_statistics += props - } - - session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { - case snapshot => - if (!progress.stopped) { - def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { - if (!progress.stopped) { - val theory_name = snapshot.node_name.theory - val args = - Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress) - val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) - export_consumer.make_entry(session_name, args, body) - } - } - def export_text(name: String, text: String, compress: Boolean = true): Unit = - export_(name, List(XML.Text(text)), compress = compress) - - for (command <- snapshot.snippet_command) { - export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) - } - - export_text(Export.FILES, - cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), - compress = false) - - for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { - val xml = snapshot.switch(blob_name).xml_markup() - export_(Export.MARKUP + (i + 1), xml) - } - export_(Export.MARKUP, snapshot.xml_markup()) - export_(Export.MESSAGES, snapshot.messages.map(_._1)) - } - } - - session.all_messages += Session.Consumer[Any]("build_session_output") { - case msg: Prover.Output => - val message = msg.message - if (msg.is_system) resources.log(Protocol.message_text(message)) - - if (msg.is_stdout) { - stdout ++= Symbol.encode(XML.content(message)) - } - else if (msg.is_stderr) { - stderr ++= Symbol.encode(XML.content(message)) - } - else if (msg.is_exit) { - val err = - "Prover terminated" + - (msg.properties match { - case Markup.Process_Result(result) => ": " + result.print_rc - case _ => "" - }) - Build_Session_Errors(List(err)) - } - case _ => - } - - build_context.session_setup(session_name, session) - - val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) - - - /* process */ - - val process = - Isabelle_Process.start(options, session, session_background, session_heaps, - use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) - - val timeout_request: Option[Event_Timer.Request] = - if (info.timeout_ignored) None - else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() }) - - val build_errors = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { - Exn.capture { process.await_startup() } match { - case Exn.Res(_) => - val resources_yxml = resources.init_session_yxml - val encode_options: XML.Encode.T[Options] = - options => session.prover_options(options).encode - val args_yxml = - YXML.string_of_body( - { - import XML.Encode._ - pair(string, list(pair(encode_options, list(pair(string, properties)))))( - (session_name, info.theories)) - }) - session.protocol_command("build_session", resources_yxml, args_yxml) - Build_Session_Errors.result - case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) - } - } - - val result0 = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } - - val was_timeout = - timeout_request match { - case None => false - case Some(request) => !request.cancel() - } - - session.stop() - - val export_errors = - export_consumer.shutdown(close = true).map(Output.error_message_text) - - val (document_output, document_errors) = - try { - if (Exn.is_res(build_errors) && result0.ok && info.documents.nonEmpty) { - using(Export.open_database_context(store, server = server)) { database_context => - val documents = - using(database_context.open_session(session_background)) { - session_context => - Document_Build.build_documents( - Document_Build.context(session_context, progress = progress), - output_sources = info.document_output, - output_pdf = info.document_output) - } - using(database_context.open_database(session_name, output = true))(session_database => - documents.foreach(_.write(session_database.db, session_name))) - (documents.flatMap(_.log_lines), Nil) - } - } - else (Nil, Nil) - } - catch { - case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) - case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) - } - - - /* process result */ - - val result1 = { - val theory_timing = - theory_timings.iterator.flatMap( - { - case props @ Markup.Name(name) => Some(name -> props) - case _ => None - }).toMap - val used_theory_timings = - for { (name, _) <- session_background.base.used_theories } - yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) - - val more_output = - Library.trim_line(stdout.toString) :: - command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: - used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: - session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: - runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: - task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: - document_output - - result0.output(more_output) - .error(Library.trim_line(stderr.toString)) - .errors_rc(export_errors ::: document_errors) - } - - val result2 = - build_errors match { - case Exn.Res(build_errs) => - val errs = build_errs ::: document_errors - if (errs.nonEmpty) { - result1.error_rc.output( - errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: - errs.map(Protocol.Error_Message_Marker.apply)) - } - else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt) - else result1 - case Exn.Exn(Exn.Interrupt()) => - if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt) - else result1 - case Exn.Exn(exn) => throw exn - } - - val process_result = - if (result2.ok) result2 - else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc - else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt")) - else result2 - - - /* output heap */ - - val output_shasum = { - val heap = store.output_heap(session_name) - if (process_result.ok && store_heap && heap.is_file) { - val slice = Space.MiB(options.real("build_database_slice")).bytes - val digest = ML_Heap.store(database_server, session_name, heap, slice) - SHA1.shasum(digest, session_name) - } - else SHA1.no_shasum - } - - val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) - - val build_log = - Build_Log.Log_File(session_name, process_result.out_lines, cache = store.cache). - parse_session_info( - command_timings = true, - theory_timings = true, - ml_statistics = true, - task_statistics = true) - - // write log file - if (process_result.ok) { - File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) - } - else File.write(store.output_log(session_name), terminate_lines(log_lines)) - - // write database - def write_info(db: SQL.Database): Unit = - store.write_session_info(db, session_name, session_sources, - build_log = - if (process_result.timeout) build_log.error("Timeout") else build_log, - build = - Store.Build_Info( - sources = sources_shasum, - input_heaps = input_shasum, - output_heap = output_shasum, - process_result.rc, - build_context.build_uuid)) - - val valid = - if (progress.stopped_local) false - else { - database_server match { - case Some(db) => write_info(db) - case None => using(store.open_database(session_name, output = true))(write_info) - } - true - } - - // messages - process_result.err_lines.foreach(progress.echo(_)) - - if (process_result.ok) { - val props = build_log.session_timing - val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 - val timing = Markup.Timing_Properties.get(props) - progress.echo( - "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")", - verbose = true) - progress.echo( - "Finished " + session_name + " (" + process_result.timing.message_resources + ")") - } - else { - progress.echo( - session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")") - if (!process_result.interrupted) { - val tail = info.options.int("process_output_tail") - val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0) - val prefix = if (log_lines.length == suffix.length) Nil else List("...") - progress.echo(Library.trim_line(cat_lines(prefix ::: suffix))) - } - } - - if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum)) - else None - } - } - - override def cancel(): Unit = future_result.cancel() - override def is_finished: Boolean = future_result.is_finished - override def join: Option[Result] = future_result.join - } -}