# HG changeset patch # User wenzelm # Date 1689534093 -7200 # Node ID 30d3faa6c24564aee9a61a9e4b20ef07897806e0 # Parent 928e031b7c52879a05f524bf9f6b02e65072686d reuse SSH.Server connection database server; diff -r 928e031b7c52 -r 30d3faa6c245 src/Pure/Thy/store.scala --- a/src/Pure/Thy/store.scala Sun Jul 16 19:38:12 2023 +0200 +++ b/src/Pure/Thy/store.scala Sun Jul 16 21:01:33 2023 +0200 @@ -313,26 +313,30 @@ ssh_port = options.int("build_database_ssh_port"), ssh_user = options.string("build_database_ssh_user")) - def maybe_open_database_server(): Option[SQL.Database] = - if (build_database_server) Some(open_database_server()) else None + def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] = + if (build_database_server) Some(open_database_server(server = server)) else None - def open_build_database(path: Path): SQL.Database = - if (build_database_server) open_database_server() + def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database = + if (build_database_server) open_database_server(server = server) else SQLite.open_database(path, restrict = true) def maybe_open_build_database( - path: Path = Path.explode("$ISABELLE_HOME_USER/build.db") - ): Option[SQL.Database] = if (build_database_test) Some(open_build_database(path)) else None + path: Path = Path.explode("$ISABELLE_HOME_USER/build.db"), + server: SSH.Server = SSH.no_server + ): Option[SQL.Database] = { + if (build_database_test) Some(open_build_database(path, server = server)) else None + } def try_open_database( name: String, output: Boolean = false, + server: SSH.Server = SSH.no_server, server_mode: Boolean = build_database_server ): Option[SQL.Database] = { def check(db: SQL.Database): Option[SQL.Database] = if (output || session_info_exists(db)) Some(db) else { db.close(); None } - if (server_mode) check(open_database_server()) + if (server_mode) check(open_database_server(server = server)) else if (output) Some(SQLite.open_database(output_database(name))) else { (for { @@ -346,8 +350,13 @@ def error_database(name: String): Nothing = error("Missing build database for session " + quote(name)) - def open_database(name: String, output: Boolean = false): SQL.Database = - try_open_database(name, output = output) getOrElse error_database(name) + def open_database( + name: String, + output: Boolean = false, + server: SSH.Server = SSH.no_server + ): SQL.Database = { + try_open_database(name, output = output, server = server) getOrElse error_database(name) + } def clean_output( database_server: Option[SQL.Database], @@ -378,6 +387,7 @@ } def check_output( + server: SSH.Server, name: String, session_options: Options, sources_shasum: SHA1.Shasum, @@ -385,7 +395,7 @@ fresh_build: Boolean, store_heap: Boolean ): (Boolean, SHA1.Shasum) = { - try_open_database(name) match { + try_open_database(name, server = server) match { case Some(db) => using(db) { _ => read_build(db, name) match { diff -r 928e031b7c52 -r 30d3faa6c245 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Sun Jul 16 19:38:12 2023 +0200 +++ b/src/Pure/Tools/build.scala Sun Jul 16 21:01:33 2023 +0200 @@ -72,8 +72,11 @@ def build_options(options: Options): Options = options + "completion_limit=0" + "editor_tracing_messages=0" - def build_process(build_context: Build_Process.Context, build_progress: Progress): Build_Process = - new Build_Process(build_context, build_progress) + def build_process( + build_context: Build_Process.Context, + build_progress: Progress, + server: SSH.Server + ): Build_Process = new Build_Process(build_context, build_progress, server) final def build_store(options: Options, cache: Term.Cache = Term.Cache.make()): Store = { val store = Store(build_options(options), cache = cache) @@ -85,9 +88,13 @@ store } - final def run(context: Build_Process.Context, progress: Progress): Results = + final def run( + context: Build_Process.Context, + progress: Progress, + server: SSH.Server + ): Results = Isabelle_Thread.uninterruptible { - using(build_process(context, progress)) { build_process => + using(build_process(context, progress, server)) { build_process => Results(context, build_process.run()) } } @@ -126,108 +133,111 @@ val store = build_engine.build_store(options, cache = cache) val build_options = store.options + using(store.open_server()) { server => + using_optional(store.maybe_open_database_server(server = server)) { database_server => - /* session selection and dependencies */ + + /* session selection and dependencies */ - val full_sessions = - Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos, - augment_options = augment_options) - val full_sessions_selection = full_sessions.imports_selection(selection) + val full_sessions = + Sessions.load_structure(build_options, dirs = dirs, select_dirs = select_dirs, infos = infos, + augment_options = augment_options) + val full_sessions_selection = full_sessions.imports_selection(selection) - val build_deps = { - val deps0 = - Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true, - list_files = list_files, check_keywords = check_keywords).check_errors + val build_deps = { + val deps0 = + Sessions.deps(full_sessions.selection(selection), progress = progress, inlined_files = true, + list_files = list_files, check_keywords = check_keywords).check_errors - if (soft_build && !fresh_build) { - val outdated = - deps0.sessions_structure.build_topological_order.flatMap(name => - store.try_open_database(name) match { - case Some(db) => - using(db)(store.read_build(_, name)) match { - case Some(build) if build.ok => - val session_options = deps0.sessions_structure(name).options - val session_sources = deps0.sources_shasum(name) - if (Sessions.eq_sources(session_options, build.sources, session_sources)) None - else Some(name) - case _ => Some(name) - } - case None => Some(name) - }) + if (soft_build && !fresh_build) { + val outdated = + deps0.sessions_structure.build_topological_order.flatMap(name => + store.try_open_database(name, server = server) match { + case Some(db) => + using(db)(store.read_build(_, name)) match { + case Some(build) if build.ok => + val session_options = deps0.sessions_structure(name).options + val session_sources = deps0.sources_shasum(name) + if (Sessions.eq_sources(session_options, build.sources, session_sources)) None + else Some(name) + case _ => Some(name) + } + case None => Some(name) + }) - Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), - progress = progress, inlined_files = true).check_errors - } - else deps0 - } + Sessions.deps(full_sessions.selection(Sessions.Selection(sessions = outdated)), + progress = progress, inlined_files = true).check_errors + } + else deps0 + } - /* check unknown files */ + /* check unknown files */ + + if (check_unknown_files) { + val source_files = + (for { + (_, base) <- build_deps.session_bases.iterator + (path, _) <- base.session_sources.iterator + } yield path).toList + Mercurial.check_files(source_files)._2 match { + case Nil => + case unknown_files => + progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" + + unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) + } + } + + + /* build process and results */ + + val build_context = + Build_Process.Context(store, build_deps, hostname = hostname(build_options), + build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs, + fresh_build = fresh_build, no_build = no_build, session_setup = session_setup, + master = true) - if (check_unknown_files) { - val source_files = - (for { - (_, base) <- build_deps.session_bases.iterator - (path, _) <- base.session_sources.iterator - } yield path).toList - Mercurial.check_files(source_files)._2 match { - case Nil => - case unknown_files => - progress.echo_warning("Unknown files (not part of the underlying Mercurial repository):" + - unknown_files.map(File.standard_path).sorted.mkString("\n ", "\n ", "")) + if (clean_build) { + for (name <- full_sessions.imports_descendants(full_sessions_selection)) { + store.clean_output(database_server, name) match { + case None => + case Some(true) => progress.echo("Cleaned " + name) + case Some(false) => progress.echo(name + " FAILED to clean") + } + } + } + + val results = build_engine.run(build_context, progress, server) + + if (export_files) { + for (name <- full_sessions_selection.iterator if results(name).ok) { + val info = results.info(name) + if (info.export_files.nonEmpty) { + progress.echo("Exporting " + info.name + " ...") + for ((dir, prune, pats) <- info.export_files) { + Export.export_files(store, name, info.dir + dir, + progress = if (progress.verbose) progress else new Progress, + export_prune = prune, + export_patterns = pats) + } + } + } + } + + val presentation_sessions = + results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) + if (presentation_sessions.nonEmpty && !progress.stopped) { + Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, + progress = progress) + } + + if (!results.ok && (progress.verbose || !no_build)) { + progress.echo("Unfinished session(s): " + commas(results.unfinished)) + } + + results } } - - - /* build process and results */ - - val build_context = - Build_Process.Context(store, build_deps, hostname = hostname(build_options), - build_heap = build_heap, numa_shuffling = numa_shuffling, max_jobs = max_jobs, - fresh_build = fresh_build, no_build = no_build, session_setup = session_setup, - master = true) - - if (clean_build) { - using_optional(store.maybe_open_database_server()) { database_server => - for (name <- full_sessions.imports_descendants(full_sessions_selection)) { - store.clean_output(database_server, name) match { - case None => - case Some(true) => progress.echo("Cleaned " + name) - case Some(false) => progress.echo(name + " FAILED to clean") - } - } - } - } - - val results = build_engine.run(build_context, progress) - - if (export_files) { - for (name <- full_sessions_selection.iterator if results(name).ok) { - val info = results.info(name) - if (info.export_files.nonEmpty) { - progress.echo("Exporting " + info.name + " ...") - for ((dir, prune, pats) <- info.export_files) { - Export.export_files(store, name, info.dir + dir, - progress = if (progress.verbose) progress else new Progress, - export_prune = prune, - export_patterns = pats) - } - } - } - } - - val presentation_sessions = - results.sessions_ok.filter(name => browser_info.enabled(results.info(name))) - if (presentation_sessions.nonEmpty && !progress.stopped) { - Browser_Info.build(browser_info, results.store, results.deps, presentation_sessions, - progress = progress) - } - - if (!results.ok && (progress.verbose || !no_build)) { - progress.echo("Unfinished session(s): " + commas(results.unfinished)) - } - - results } @@ -450,29 +460,31 @@ val store = build_engine.build_store(options) val build_options = store.options - using_optional(store.maybe_open_build_database()) { build_database => - val builds = read_builds(build_database) + using(store.open_server()) { server => + using_optional(store.maybe_open_build_database(server = server)) { build_database => + val builds = read_builds(build_database) - if (list_builds) progress.echo(print_builds(build_database, builds)) + if (list_builds) progress.echo(print_builds(build_database, builds)) - if (!list_builds || build_id.nonEmpty) { - val build_master = id_builds(build_database, build_id, builds) + if (!list_builds || build_id.nonEmpty) { + val build_master = id_builds(build_database, build_id, builds) - val sessions_structure = - Sessions.load_structure(build_options, dirs = dirs). - selection(Sessions.Selection(sessions = build_master.sessions)) + val sessions_structure = + Sessions.load_structure(build_options, dirs = dirs). + selection(Sessions.Selection(sessions = build_master.sessions)) - val build_deps = - Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors + val build_deps = + Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors - val build_context = - Build_Process.Context(store, build_deps, hostname = hostname(build_options), - numa_shuffling = numa_shuffling, max_jobs = max_jobs, - build_uuid = build_master.build_uuid) + val build_context = + Build_Process.Context(store, build_deps, hostname = hostname(build_options), + numa_shuffling = numa_shuffling, max_jobs = max_jobs, + build_uuid = build_master.build_uuid) - Some(build_engine.run(build_context, progress)) + Some(build_engine.run(build_context, progress, server)) + } + else None } - else None } } diff -r 928e031b7c52 -r 30d3faa6c245 src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Sun Jul 16 19:38:12 2023 +0200 +++ b/src/Pure/Tools/build_job.scala Sun Jul 16 21:01:33 2023 +0200 @@ -24,14 +24,14 @@ session_context: Session_Context, progress: Progress, log: Logger, - database_server: Option[SQL.Database], + server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, node_info: Host.Node_Info, store_heap: Boolean ): Session_Job = { - new Session_Job(build_context, session_context, progress, log, database_server, + new Session_Job(build_context, session_context, progress, log, server, session_background, sources_shasum, input_shasum, node_info, store_heap) } @@ -45,14 +45,15 @@ sources_shasum: SHA1.Shasum, timeout: Time, store: Store, - progress: Progress = new Progress + progress: Progress = new Progress, + server: SSH.Server = SSH.no_server ): Session_Context = { def default: Session_Context = Session_Context( name, deps, ancestors, session_prefs, sources_shasum, timeout, Time.zero, Bytes.empty, build_uuid) - store.try_open_database(name) match { + store.try_open_database(name, server = server) match { case None => default case Some(db) => def ignore_error(msg: String) = { @@ -99,7 +100,7 @@ session_context: Session_Context, progress: Progress, log: Logger, - database_server: Option[SQL.Database], + server: SSH.Server, session_background: Sessions.Background, sources_shasum: SHA1.Shasum, input_shasum: SHA1.Shasum, @@ -115,414 +116,417 @@ val store = build_context.store - store.clean_output(database_server, session_name, session_init = true) - - val session_sources = - Store.Sources.load(session_background.base, cache = store.cache.compress) - - val env = - Isabelle_System.settings( - List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) - - val session_heaps = - session_background.info.parent match { - case None => Nil - case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic) - } - - val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil - - val eval_store = - if (store_heap) { - (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: - List("ML_Heap.save_child " + - ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) - } - else Nil + using_optional(store.maybe_open_database_server(server = server)) { database_server => - def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = - session_background.base.theory_load_commands.get(node_name.theory) match { - case None => Nil - case Some(spans) => - val syntax = session_background.base.theory_syntax(node_name) - val master_dir = Path.explode(node_name.master_dir) - for (span <- spans; file <- span.loaded_files(syntax).files) - yield { - val src_path = Path.explode(file) - val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) - - val bytes = session_sources(blob_name.node).bytes - val text = bytes.text - val chunk = Symbol.Text_Chunk(text) + store.clean_output(database_server, session_name, session_init = true) - Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> - Document.Blobs.Item(bytes, text, chunk, changed = false) - } - } - - - /* session */ - - val resources = - new Resources(session_background, log = log, - command_timings = - Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)) - - val session = - new Session(options, resources) { - override val cache: Term.Cache = store.cache - - override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = - Command.Blobs_Info.make(session_blobs(node_name)) + val session_sources = + Store.Sources.load(session_background.base, cache = store.cache.compress) - override def build_blobs(node_name: Document.Node.Name): Document.Blobs = - Document.Blobs.make(session_blobs(node_name)) - } - - object Build_Session_Errors { - private val promise: Promise[List[String]] = Future.promise - - def result: Exn.Result[List[String]] = promise.join_result - def cancel(): Unit = promise.cancel() - def apply(errs: List[String]): Unit = { - try { promise.fulfill(errs) } - catch { case _: IllegalStateException => } - } - } - - val export_consumer = - Export.consumer(store.open_database(session_name, output = true), store.cache, - progress = progress) - - val stdout = new StringBuilder(1000) - val stderr = new StringBuilder(1000) - val command_timings = new mutable.ListBuffer[Properties.T] - val theory_timings = new mutable.ListBuffer[Properties.T] - val session_timings = new mutable.ListBuffer[Properties.T] - val runtime_statistics = new mutable.ListBuffer[Properties.T] - val task_statistics = new mutable.ListBuffer[Properties.T] + val env = + Isabelle_System.settings( + List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) - def fun( - name: String, - acc: mutable.ListBuffer[Properties.T], - unapply: Properties.T => Option[Properties.T] - ): (String, Session.Protocol_Function) = { - name -> ((msg: Prover.Protocol_Output) => - unapply(msg.properties) match { - case Some(props) => acc += props; true - case _ => false - }) - } - - session.init_protocol_handler(new Session.Protocol_Handler { - override def exit(): Unit = Build_Session_Errors.cancel() - - private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { - val (rc, errors) = - try { - val (rc, errs) = { - import XML.Decode._ - pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) - } - val errors = - for (err <- errs) yield { - val prt = Protocol_Message.expose_no_reports(err) - Pretty.string_of(prt, metric = Symbol.Metric) - } - (rc, errors) - } - catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } - - session.protocol_command("Prover.stop", rc.toString) - Build_Session_Errors(errors) - true + val session_heaps = + session_background.info.parent match { + case None => Nil + case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic) } - private def loading_theory(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Loading_Theory(Markup.Name(name)) => - progress.theory(Progress.Theory(name, session = session_name)) - false - case _ => false - } - - private def export_(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Protocol.Export(args) => - export_consumer.make_entry(session_name, args, msg.chunk) - true - case _ => false - } + val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil - override val functions: Session.Protocol_Functions = - List( - Markup.Build_Session_Finished.name -> build_session_finished, - Markup.Loading_Theory.name -> loading_theory, - Markup.EXPORT -> export_, - fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), - fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), - fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) - }) - - session.command_timings += Session.Consumer("command_timings") { - case Session.Command_Timing(props) => - for { - elapsed <- Markup.Elapsed.unapply(props) - elapsed_time = Time.seconds(elapsed) - if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") - } command_timings += props.filter(Markup.command_timing_property) - } - - session.runtime_statistics += Session.Consumer("ML_statistics") { - case Session.Runtime_Statistics(props) => runtime_statistics += props - } + val eval_store = + if (store_heap) { + (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: + List("ML_Heap.save_child " + + ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) + } + else Nil - session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { - case snapshot => - if (!progress.stopped) { - def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { - if (!progress.stopped) { - val theory_name = snapshot.node_name.theory - val args = - Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress) - val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) - export_consumer.make_entry(session_name, args, body) - } - } - def export_text(name: String, text: String, compress: Boolean = true): Unit = - export_(name, List(XML.Text(text)), compress = compress) - - for (command <- snapshot.snippet_command) { - export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) - } - - export_text(Export.FILES, - cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), - compress = false) + def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = + session_background.base.theory_load_commands.get(node_name.theory) match { + case None => Nil + case Some(spans) => + val syntax = session_background.base.theory_syntax(node_name) + val master_dir = Path.explode(node_name.master_dir) + for (span <- spans; file <- span.loaded_files(syntax).files) + yield { + val src_path = Path.explode(file) + val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) - for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { - val xml = snapshot.switch(blob_name).xml_markup() - export_(Export.MARKUP + (i + 1), xml) - } - export_(Export.MARKUP, snapshot.xml_markup()) - export_(Export.MESSAGES, snapshot.messages.map(_._1)) - } - } + val bytes = session_sources(blob_name.node).bytes + val text = bytes.text + val chunk = Symbol.Text_Chunk(text) - session.all_messages += Session.Consumer[Any]("build_session_output") { - case msg: Prover.Output => - val message = msg.message - if (msg.is_system) resources.log(Protocol.message_text(message)) - - if (msg.is_stdout) { - stdout ++= Symbol.encode(XML.content(message)) + Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> + Document.Blobs.Item(bytes, text, chunk, changed = false) + } } - else if (msg.is_stderr) { - stderr ++= Symbol.encode(XML.content(message)) - } - else if (msg.is_exit) { - val err = - "Prover terminated" + - (msg.properties match { - case Markup.Process_Result(result) => ": " + result.print_rc - case _ => "" - }) - Build_Session_Errors(List(err)) - } - case _ => - } - - build_context.session_setup(session_name, session) - - val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) - /* process */ + /* session */ - val process = - Isabelle_Process.start(options, session, session_background, session_heaps, - use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) + val resources = + new Resources(session_background, log = log, + command_timings = + Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache)) - val timeout_request: Option[Event_Timer.Request] = - if (info.timeout_ignored) None - else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() }) + val session = + new Session(options, resources) { + override val cache: Term.Cache = store.cache - val build_errors = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { - Exn.capture { process.await_startup() } match { - case Exn.Res(_) => - val resources_yxml = resources.init_session_yxml - val encode_options: XML.Encode.T[Options] = - options => session.prover_options(options).encode - val args_yxml = - YXML.string_of_body( - { - import XML.Encode._ - pair(string, list(pair(encode_options, list(pair(string, properties)))))( - (session_name, info.theories)) - }) - session.protocol_command("build_session", resources_yxml, args_yxml) - Build_Session_Errors.result - case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) + override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = + Command.Blobs_Info.make(session_blobs(node_name)) + + override def build_blobs(node_name: Document.Node.Name): Document.Blobs = + Document.Blobs.make(session_blobs(node_name)) + } + + object Build_Session_Errors { + private val promise: Promise[List[String]] = Future.promise + + def result: Exn.Result[List[String]] = promise.join_result + def cancel(): Unit = promise.cancel() + def apply(errs: List[String]): Unit = { + try { promise.fulfill(errs) } + catch { case _: IllegalStateException => } } } - val result0 = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } + val export_consumer = + Export.consumer(store.open_database(session_name, output = true, server = server), + store.cache, progress = progress) - val was_timeout = - timeout_request match { - case None => false - case Some(request) => !request.cancel() + val stdout = new StringBuilder(1000) + val stderr = new StringBuilder(1000) + val command_timings = new mutable.ListBuffer[Properties.T] + val theory_timings = new mutable.ListBuffer[Properties.T] + val session_timings = new mutable.ListBuffer[Properties.T] + val runtime_statistics = new mutable.ListBuffer[Properties.T] + val task_statistics = new mutable.ListBuffer[Properties.T] + + def fun( + name: String, + acc: mutable.ListBuffer[Properties.T], + unapply: Properties.T => Option[Properties.T] + ): (String, Session.Protocol_Function) = { + name -> ((msg: Prover.Protocol_Output) => + unapply(msg.properties) match { + case Some(props) => acc += props; true + case _ => false + }) } - session.stop() + session.init_protocol_handler(new Session.Protocol_Handler { + override def exit(): Unit = Build_Session_Errors.cancel() + + private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { + val (rc, errors) = + try { + val (rc, errs) = { + import XML.Decode._ + pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) + } + val errors = + for (err <- errs) yield { + val prt = Protocol_Message.expose_no_reports(err) + Pretty.string_of(prt, metric = Symbol.Metric) + } + (rc, errors) + } + catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } + + session.protocol_command("Prover.stop", rc.toString) + Build_Session_Errors(errors) + true + } - val export_errors = - export_consumer.shutdown(close = true).map(Output.error_message_text) + private def loading_theory(msg: Prover.Protocol_Output): Boolean = + msg.properties match { + case Markup.Loading_Theory(Markup.Name(name)) => + progress.theory(Progress.Theory(name, session = session_name)) + false + case _ => false + } + + private def export_(msg: Prover.Protocol_Output): Boolean = + msg.properties match { + case Protocol.Export(args) => + export_consumer.make_entry(session_name, args, msg.chunk) + true + case _ => false + } + + override val functions: Session.Protocol_Functions = + List( + Markup.Build_Session_Finished.name -> build_session_finished, + Markup.Loading_Theory.name -> loading_theory, + Markup.EXPORT -> export_, + fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), + fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), + fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) + }) + + session.command_timings += Session.Consumer("command_timings") { + case Session.Command_Timing(props) => + for { + elapsed <- Markup.Elapsed.unapply(props) + elapsed_time = Time.seconds(elapsed) + if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") + } command_timings += props.filter(Markup.command_timing_property) + } - val (document_output, document_errors) = - try { - if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) { - using(Export.open_database_context(store)) { database_context => - val documents = - using(database_context.open_session(session_background)) { - session_context => - Document_Build.build_documents( - Document_Build.context(session_context, progress = progress), - output_sources = info.document_output, - output_pdf = info.document_output) + session.runtime_statistics += Session.Consumer("ML_statistics") { + case Session.Runtime_Statistics(props) => runtime_statistics += props + } + + session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { + case snapshot => + if (!progress.stopped) { + def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { + if (!progress.stopped) { + val theory_name = snapshot.node_name.theory + val args = + Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress) + val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) + export_consumer.make_entry(session_name, args, body) } - using(database_context.open_database(session_name, output = true))(session_database => - documents.foreach(_.write(session_database.db, session_name))) - (documents.flatMap(_.log_lines), Nil) + } + def export_text(name: String, text: String, compress: Boolean = true): Unit = + export_(name, List(XML.Text(text)), compress = compress) + + for (command <- snapshot.snippet_command) { + export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) + } + + export_text(Export.FILES, + cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), + compress = false) + + for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { + val xml = snapshot.switch(blob_name).xml_markup() + export_(Export.MARKUP + (i + 1), xml) + } + export_(Export.MARKUP, snapshot.xml_markup()) + export_(Export.MESSAGES, snapshot.messages.map(_._1)) } - } - else (Nil, Nil) } - catch { - case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) - case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) + + session.all_messages += Session.Consumer[Any]("build_session_output") { + case msg: Prover.Output => + val message = msg.message + if (msg.is_system) resources.log(Protocol.message_text(message)) + + if (msg.is_stdout) { + stdout ++= Symbol.encode(XML.content(message)) + } + else if (msg.is_stderr) { + stderr ++= Symbol.encode(XML.content(message)) + } + else if (msg.is_exit) { + val err = + "Prover terminated" + + (msg.properties match { + case Markup.Process_Result(result) => ": " + result.print_rc + case _ => "" + }) + Build_Session_Errors(List(err)) + } + case _ => } + build_context.session_setup(session_name, session) + + val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) + - /* process result */ + /* process */ + + val process = + Isabelle_Process.start(options, session, session_background, session_heaps, + use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env) + + val timeout_request: Option[Event_Timer.Request] = + if (info.timeout_ignored) None + else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() }) - val result1 = { - val theory_timing = - theory_timings.iterator.flatMap( - { - case props @ Markup.Name(name) => Some(name -> props) - case _ => None - }).toMap - val used_theory_timings = - for { (name, _) <- session_background.base.used_theories } - yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) + val build_errors = + Isabelle_Thread.interrupt_handler(_ => process.terminate()) { + Exn.capture { process.await_startup() } match { + case Exn.Res(_) => + val resources_yxml = resources.init_session_yxml + val encode_options: XML.Encode.T[Options] = + options => session.prover_options(options).encode + val args_yxml = + YXML.string_of_body( + { + import XML.Encode._ + pair(string, list(pair(encode_options, list(pair(string, properties)))))( + (session_name, info.theories)) + }) + session.protocol_command("build_session", resources_yxml, args_yxml) + Build_Session_Errors.result + case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) + } + } - val more_output = - Library.trim_line(stdout.toString) :: - command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: - used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: - session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: - runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: - task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: - document_output + val result0 = + Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } - result0.output(more_output) - .error(Library.trim_line(stderr.toString)) - .errors_rc(export_errors ::: document_errors) - } + val was_timeout = + timeout_request match { + case None => false + case Some(request) => !request.cancel() + } + + session.stop() + + val export_errors = + export_consumer.shutdown(close = true).map(Output.error_message_text) - val result2 = - build_errors match { - case Exn.Res(build_errs) => - val errs = build_errs ::: document_errors - if (errs.nonEmpty) { - result1.error_rc.output( - errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: - errs.map(Protocol.Error_Message_Marker.apply)) + val (document_output, document_errors) = + try { + if (build_errors.isInstanceOf[Exn.Res[_]] && result0.ok && info.documents.nonEmpty) { + using(Export.open_database_context(store)) { database_context => + val documents = + using(database_context.open_session(session_background)) { + session_context => + Document_Build.build_documents( + Document_Build.context(session_context, progress = progress), + output_sources = info.document_output, + output_pdf = info.document_output) + } + using(database_context.open_database(session_name, output = true))(session_database => + documents.foreach(_.write(session_database.db, session_name))) + (documents.flatMap(_.log_lines), Nil) + } } - else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt) - else result1 - case Exn.Exn(Exn.Interrupt()) => - if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt) - else result1 - case Exn.Exn(exn) => throw exn - } - - val process_result = - if (result2.ok) result2 - else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc - else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt")) - else result2 + else (Nil, Nil) + } + catch { + case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) + case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) + } - /* output heap */ + /* process result */ - val output_shasum = { - val heap = store.output_heap(session_name) - if (process_result.ok && store_heap && heap.is_file) { - val slice = Space.MiB(options.real("build_database_slice")).bytes - val digest = ML_Heap.store(database_server, session_name, heap, slice) - SHA1.shasum(digest, session_name) - } - else SHA1.no_shasum - } + val result1 = { + val theory_timing = + theory_timings.iterator.flatMap( + { + case props @ Markup.Name(name) => Some(name -> props) + case _ => None + }).toMap + val used_theory_timings = + for { (name, _) <- session_background.base.used_theories } + yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) - val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) + val more_output = + Library.trim_line(stdout.toString) :: + command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: + used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: + session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: + runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: + task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: + document_output - val build_log = - Build_Log.Log_File(session_name, process_result.out_lines). - parse_session_info( - command_timings = true, - theory_timings = true, - ml_statistics = true, - task_statistics = true) + result0.output(more_output) + .error(Library.trim_line(stderr.toString)) + .errors_rc(export_errors ::: document_errors) + } - // write log file - if (process_result.ok) { - File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) - } - else File.write(store.output_log(session_name), terminate_lines(log_lines)) + val result2 = + build_errors match { + case Exn.Res(build_errs) => + val errs = build_errs ::: document_errors + if (errs.nonEmpty) { + result1.error_rc.output( + errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: + errs.map(Protocol.Error_Message_Marker.apply)) + } + else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt) + else result1 + case Exn.Exn(Exn.Interrupt()) => + if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt) + else result1 + case Exn.Exn(exn) => throw exn + } + + val process_result = + if (result2.ok) result2 + else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc + else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt")) + else result2 + + + /* output heap */ - // write database - using(store.open_database(session_name, output = true))(db => - store.write_session_info(db, session_name, session_sources, - build_log = - if (process_result.timeout) build_log.error("Timeout") else build_log, - build = - Store.Build_Info( - sources = sources_shasum, - input_heaps = input_shasum, - output_heap = output_shasum, - process_result.rc, - build_context.build_uuid))) + val output_shasum = { + val heap = store.output_heap(session_name) + if (process_result.ok && store_heap && heap.is_file) { + val slice = Space.MiB(options.real("build_database_slice")).bytes + val digest = ML_Heap.store(database_server, session_name, heap, slice) + SHA1.shasum(digest, session_name) + } + else SHA1.no_shasum + } + + val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) - // messages - process_result.err_lines.foreach(progress.echo(_)) + val build_log = + Build_Log.Log_File(session_name, process_result.out_lines). + parse_session_info( + command_timings = true, + theory_timings = true, + ml_statistics = true, + task_statistics = true) + + // write log file + if (process_result.ok) { + File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) + } + else File.write(store.output_log(session_name), terminate_lines(log_lines)) - if (process_result.ok) { - val props = build_log.session_timing - val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 - val timing = Markup.Timing_Properties.get(props) - progress.echo( - "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")", - verbose = true) - progress.echo( - "Finished " + session_name + " (" + process_result.timing.message_resources + ")") + // write database + using(store.open_database(session_name, output = true, server = server))(db => + store.write_session_info(db, session_name, session_sources, + build_log = + if (process_result.timeout) build_log.error("Timeout") else build_log, + build = + Store.Build_Info( + sources = sources_shasum, + input_heaps = input_shasum, + output_heap = output_shasum, + process_result.rc, + build_context.build_uuid))) + + // messages + process_result.err_lines.foreach(progress.echo(_)) + + if (process_result.ok) { + val props = build_log.session_timing + val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 + val timing = Markup.Timing_Properties.get(props) + progress.echo( + "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")", + verbose = true) + progress.echo( + "Finished " + session_name + " (" + process_result.timing.message_resources + ")") + } + else { + progress.echo( + session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")") + if (!process_result.interrupted) { + val tail = info.options.int("process_output_tail") + val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0) + val prefix = if (log_lines.length == suffix.length) Nil else List("...") + progress.echo(Library.trim_line(cat_lines(prefix ::: suffix))) + } + } + + (process_result.copy(out_lines = log_lines), output_shasum) } - else { - progress.echo( - session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")") - if (!process_result.interrupted) { - val tail = info.options.int("process_output_tail") - val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0) - val prefix = if (log_lines.length == suffix.length) Nil else List("...") - progress.echo(Library.trim_line(cat_lines(prefix ::: suffix))) - } - } - - (process_result.copy(out_lines = log_lines), output_shasum) } override def cancel(): Unit = future_result.cancel() diff -r 928e031b7c52 -r 30d3faa6c245 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sun Jul 16 19:38:12 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Sun Jul 16 21:01:33 2023 +0200 @@ -128,7 +128,11 @@ make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) }) } - def init(build_context: Context, progress: Progress = new Progress): Sessions = { + def init( + build_context: Context, + progress: Progress = new Progress, + server: SSH.Server = SSH.no_server + ): Sessions = { val sessions_structure = build_context.sessions_structure make( sessions_structure.build_graph.iterator.foldLeft(graph) { @@ -167,7 +171,7 @@ val session = Build_Job.Session_Context.load( build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum, - info.timeout, build_context.store, progress = progress) + info.timeout, build_context.store, progress = progress, server = server) graph0.new_node(name, session) } } @@ -835,7 +839,8 @@ class Build_Process( protected final val build_context: Build_Process.Context, - protected final val build_progress: Progress + protected final val build_progress: Progress, + protected final val server: SSH.Server ) extends AutoCloseable { /* context */ @@ -850,12 +855,12 @@ /* progress backed by database */ private val _database_server: Option[SQL.Database] = - try { store.maybe_open_database_server() } + try { store.maybe_open_database_server(server = server) } catch { case exn: Throwable => close(); throw exn } private val _build_database: Option[SQL.Database] = try { - for (db <- store.maybe_open_build_database()) yield { + for (db <- store.maybe_open_build_database(server = server)) yield { val store_tables = if (db.is_postgresql) Store.Data.tables else SQL.Tables.empty Build_Process.Data.transaction_lock(db, create = true, @@ -871,7 +876,7 @@ catch { case exn: Throwable => close(); throw exn } private val _host_database: Option[SQL.Database] = - try { store.maybe_open_build_database(path = Host.Data.database) } + try { store.maybe_open_build_database(path = Host.Data.database, server = server) } catch { case exn: Throwable => close(); throw exn } protected val (progress, worker_uuid) = synchronized { @@ -879,7 +884,7 @@ case None => (build_progress, UUID.random().toString) case Some(db) => try { - val progress_db = store.open_build_database(Progress.Data.database) + val progress_db = store.open_build_database(Progress.Data.database, server = server) val progress = new Database_Progress(progress_db, build_progress, hostname = hostname, @@ -966,7 +971,7 @@ state.sessions.iterator.exists(_.ancestors.contains(session_name)) val (current, output_shasum) = - store.check_output(session_name, + store.check_output(server, session_name, session_options = build_context.sessions_structure(session_name).options, sources_shasum = sources_shasum, input_shasum = input_shasum, @@ -1018,7 +1023,7 @@ val session = state.sessions(session_name) val build = - Build_Job.start_session(build_context, session, progress, log, _database_server, + Build_Job.start_session(build_context, session, progress, log, server, build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap) val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))