# HG changeset patch # User wenzelm # Date 1676811109 -3600 # Node ID 632a92fcb6730b72a802d793829d69410b2fcf94 # Parent 98dab34ed72d87e25826ddbfd7cba001cf44ecc1# Parent c6d7246926036dfcffadd813c5b1513862d9d0df merged diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/General/logger.scala --- a/src/Pure/General/logger.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/General/logger.scala Sun Feb 19 13:51:49 2023 +0100 @@ -35,7 +35,8 @@ } class System_Logger extends Logger { - def apply(msg: => String): Unit = + def apply(msg: => String): Unit = synchronized { if (Platform.is_windows) System.out.println(msg) else System.console.writer.println(msg) + } } diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/PIDE/document.scala --- a/src/Pure/PIDE/document.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/PIDE/document.scala Sun Feb 19 13:51:49 2023 +0100 @@ -371,14 +371,16 @@ object Nodes { val empty: Nodes = new Nodes(Graph.empty(Node.Name.Ordering)) + + private def init(graph: Graph[Node.Name, Node], name: Node.Name): Graph[Node.Name, Node] = + graph.default_node(name, Node.empty) } final class Nodes private(graph: Graph[Node.Name, Node]) { - def apply(name: Node.Name): Node = - graph.default_node(name, Node.empty).get_node(name) + def apply(name: Node.Name): Node = Nodes.init(graph, name).get_node(name) def is_suppressed(name: Node.Name): Boolean = { - val graph1 = graph.default_node(name, Node.empty) + val graph1 = Nodes.init(graph, name) graph1.is_maximal(name) && graph1.get_node(name).is_empty } @@ -391,10 +393,7 @@ def + (entry: (Node.Name, Node)): Nodes = { val (name, node) = entry val imports = node.header.imports - val graph1 = - imports.foldLeft(graph.default_node(name, Node.empty)) { - case (g, p) => g.default_node(p, Node.empty) - } + val graph1 = (name :: imports).foldLeft(graph)(Nodes.init) val graph2 = graph1.imm_preds(name).foldLeft(graph1) { case (g, dep) => g.del_edge(dep, name) } val graph3 = imports.foldLeft(graph2) { case (g, dep) => g.add_edge(dep, name) } @@ -417,8 +416,8 @@ if name == file_name } yield cmd).toList - def descendants(names: List[Node.Name]): List[Node.Name] = graph.all_succs(names) - def requirements(names: List[Node.Name]): List[Node.Name] = graph.all_preds_rev(names) + def descendants(names: List[Node.Name]): List[Node.Name] = + names.foldLeft(graph)(Nodes.init).all_succs(names) def topological_order: List[Node.Name] = graph.topological_order override def toString: String = topological_order.mkString("Nodes(", ",", ")") diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/Thy/sessions.scala --- a/src/Pure/Thy/sessions.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/Thy/sessions.scala Sun Feb 19 13:51:49 2023 +0100 @@ -1331,6 +1331,18 @@ /** persistent store **/ + /** auxiliary **/ + + sealed case class Build_Info( + sources: SHA1.Shasum, + input_heaps: SHA1.Shasum, + output_heap: SHA1.Shasum, + return_code: Int, + uuid: String + ) { + def ok: Boolean = return_code == 0 + } + object Session_Info { val session_name = SQL.Column.string("session_name").make_primary_key @@ -1345,7 +1357,7 @@ List(session_name, session_timing, command_timings, theory_timings, ml_statistics, task_statistics, errors) - // Build.Session_Info + // Build_Info val sources = SQL.Column.string("sources") val input_heaps = SQL.Column.string("input_heaps") val output_heap = SQL.Column.string("output_heap") @@ -1553,7 +1565,7 @@ session_name: String, sources: Sources, build_log: Build_Log.Session_Info, - build: Build.Session_Info + build: Build_Info ): Unit = { db.transaction { write_sources(db, session_name, sources) @@ -1596,7 +1608,7 @@ def read_errors(db: SQL.Database, name: String): List[String] = Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache) - def read_build(db: SQL.Database, name: String): Option[Build.Session_Info] = { + def read_build(db: SQL.Database, name: String): Option[Build_Info] = { if (db.tables.contains(Session_Info.table.name)) { db.using_statement(Session_Info.table.select(Nil, Session_Info.session_name.where_equal(name))) { stmt => @@ -1607,7 +1619,7 @@ try { Option(res.string(Session_Info.uuid)).getOrElse("") } catch { case _: SQLException => "" } Some( - Build.Session_Info( + Build_Info( SHA1.fake_shasum(res.string(Session_Info.sources)), SHA1.fake_shasum(res.string(Session_Info.input_heaps)), SHA1.fake_shasum(res.string(Session_Info.output_heap)), diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/Tools/build.scala Sun Feb 19 13:51:49 2023 +0100 @@ -9,22 +9,6 @@ object Build { - /** auxiliary **/ - - /* persistent build info */ - - sealed case class Session_Info( - sources: SHA1.Shasum, - input_heaps: SHA1.Shasum, - output_heap: SHA1.Shasum, - return_code: Int, - uuid: String - ) { - def ok: Boolean = return_code == 0 - } - - - /** build with results **/ class Results private[Build]( diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/Tools/build_job.scala Sun Feb 19 13:51:49 2023 +0100 @@ -11,7 +11,373 @@ import scala.util.matching.Regex +trait Build_Job { + def session_name: String + def numa_node: Option[Int] = None + def start(): Unit = () + def terminate(): Unit = () + def is_finished: Boolean = false + def join: Process_Result = Process_Result.undefined +} + object Build_Job { + class Build_Session(progress: Progress, + session_background: Sessions.Background, + store: Sessions.Store, + val do_store: Boolean, + resources: Resources, + session_setup: (String, Session) => Unit, + val input_heaps: SHA1.Shasum, + override val numa_node: Option[Int] + ) extends Build_Job { + def session_name: String = session_background.session_name + val info: Sessions.Info = session_background.sessions_structure(session_name) + val options: Options = NUMA.policy_options(info.options, numa_node) + + val session_sources: Sessions.Sources = + Sessions.Sources.load(session_background.base, cache = store.cache.compress) + + private lazy val future_result: Future[Process_Result] = + Future.thread("build", uninterruptible = true) { + val parent = info.parent.getOrElse("") + + val env = + Isabelle_System.settings( + List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) + + val is_pure = Sessions.is_pure(session_name) + + val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil + + val eval_store = + if (do_store) { + (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: + List("ML_Heap.save_child " + + ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) + } + else Nil + + def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = + session_background.base.theory_load_commands.get(node_name.theory) match { + case None => Nil + case Some(spans) => + val syntax = session_background.base.theory_syntax(node_name) + val master_dir = Path.explode(node_name.master_dir) + for (span <- spans; file <- span.loaded_files(syntax).files) + yield { + val src_path = Path.explode(file) + val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) + + val bytes = session_sources(blob_name.node).bytes + val text = bytes.text + val chunk = Symbol.Text_Chunk(text) + + Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> + Document.Blobs.Item(bytes, text, chunk, changed = false) + } + } + + val session = + new Session(options, resources) { + override val cache: Term.Cache = store.cache + + override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = + Command.Blobs_Info.make(session_blobs(node_name)) + + override def build_blobs(node_name: Document.Node.Name): Document.Blobs = + Document.Blobs.make(session_blobs(node_name)) + } + + object Build_Session_Errors { + private val promise: Promise[List[String]] = Future.promise + + def result: Exn.Result[List[String]] = promise.join_result + def cancel(): Unit = promise.cancel() + def apply(errs: List[String]): Unit = { + try { promise.fulfill(errs) } + catch { case _: IllegalStateException => } + } + } + + val export_consumer = + Export.consumer(store.open_database(session_name, output = true), store.cache, + progress = progress) + + val stdout = new StringBuilder(1000) + val stderr = new StringBuilder(1000) + val command_timings = new mutable.ListBuffer[Properties.T] + val theory_timings = new mutable.ListBuffer[Properties.T] + val session_timings = new mutable.ListBuffer[Properties.T] + val runtime_statistics = new mutable.ListBuffer[Properties.T] + val task_statistics = new mutable.ListBuffer[Properties.T] + + def fun( + name: String, + acc: mutable.ListBuffer[Properties.T], + unapply: Properties.T => Option[Properties.T] + ): (String, Session.Protocol_Function) = { + name -> ((msg: Prover.Protocol_Output) => + unapply(msg.properties) match { + case Some(props) => acc += props; true + case _ => false + }) + } + + session.init_protocol_handler(new Session.Protocol_Handler { + override def exit(): Unit = Build_Session_Errors.cancel() + + private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { + val (rc, errors) = + try { + val (rc, errs) = { + import XML.Decode._ + pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) + } + val errors = + for (err <- errs) yield { + val prt = Protocol_Message.expose_no_reports(err) + Pretty.string_of(prt, metric = Symbol.Metric) + } + (rc, errors) + } + catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } + + session.protocol_command("Prover.stop", rc.toString) + Build_Session_Errors(errors) + true + } + + private def loading_theory(msg: Prover.Protocol_Output): Boolean = + msg.properties match { + case Markup.Loading_Theory(Markup.Name(name)) => + progress.theory(Progress.Theory(name, session = session_name)) + false + case _ => false + } + + private def export_(msg: Prover.Protocol_Output): Boolean = + msg.properties match { + case Protocol.Export(args) => + export_consumer.make_entry(session_name, args, msg.chunk) + true + case _ => false + } + + override val functions: Session.Protocol_Functions = + List( + Markup.Build_Session_Finished.name -> build_session_finished, + Markup.Loading_Theory.name -> loading_theory, + Markup.EXPORT -> export_, + fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), + fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), + fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) + }) + + session.command_timings += Session.Consumer("command_timings") { + case Session.Command_Timing(props) => + for { + elapsed <- Markup.Elapsed.unapply(props) + elapsed_time = Time.seconds(elapsed) + if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") + } command_timings += props.filter(Markup.command_timing_property) + } + + session.runtime_statistics += Session.Consumer("ML_statistics") { + case Session.Runtime_Statistics(props) => runtime_statistics += props + } + + session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { + case snapshot => + if (!progress.stopped) { + def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { + if (!progress.stopped) { + val theory_name = snapshot.node_name.theory + val args = + Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress) + val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) + export_consumer.make_entry(session_name, args, body) + } + } + def export_text(name: String, text: String, compress: Boolean = true): Unit = + export_(name, List(XML.Text(text)), compress = compress) + + for (command <- snapshot.snippet_command) { + export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) + } + + export_text(Export.FILES, + cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), + compress = false) + + for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { + val xml = snapshot.switch(blob_name).xml_markup() + export_(Export.MARKUP + (i + 1), xml) + } + export_(Export.MARKUP, snapshot.xml_markup()) + export_(Export.MESSAGES, snapshot.messages.map(_._1)) + } + } + + session.all_messages += Session.Consumer[Any]("build_session_output") { + case msg: Prover.Output => + val message = msg.message + if (msg.is_system) resources.log(Protocol.message_text(message)) + + if (msg.is_stdout) { + stdout ++= Symbol.encode(XML.content(message)) + } + else if (msg.is_stderr) { + stderr ++= Symbol.encode(XML.content(message)) + } + else if (msg.is_exit) { + val err = + "Prover terminated" + + (msg.properties match { + case Markup.Process_Result(result) => ": " + result.print_rc + case _ => "" + }) + Build_Session_Errors(List(err)) + } + case _ => + } + + session_setup(session_name, session) + + val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) + + val process = + Isabelle_Process.start(store, options, session, session_background, + logic = parent, raw_ml_system = is_pure, + use_prelude = use_prelude, eval_main = eval_main, + cwd = info.dir.file, env = env) + + val build_errors = + Isabelle_Thread.interrupt_handler(_ => process.terminate()) { + Exn.capture { process.await_startup() } match { + case Exn.Res(_) => + val resources_yxml = resources.init_session_yxml + val encode_options: XML.Encode.T[Options] = + options => session.prover_options(options).encode + val args_yxml = + YXML.string_of_body( + { + import XML.Encode._ + pair(string, list(pair(encode_options, list(pair(string, properties)))))( + (session_name, info.theories)) + }) + session.protocol_command("build_session", resources_yxml, args_yxml) + Build_Session_Errors.result + case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) + } + } + + val process_result = + Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } + + session.stop() + + val export_errors = + export_consumer.shutdown(close = true).map(Output.error_message_text) + + val (document_output, document_errors) = + try { + if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) { + using(Export.open_database_context(store)) { database_context => + val documents = + using(database_context.open_session(session_background)) { + session_context => + Document_Build.build_documents( + Document_Build.context(session_context, progress = progress), + output_sources = info.document_output, + output_pdf = info.document_output) + } + using(database_context.open_database(session_name, output = true))(session_database => + documents.foreach(_.write(session_database.db, session_name))) + (documents.flatMap(_.log_lines), Nil) + } + } + else (Nil, Nil) + } + catch { + case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) + case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) + } + + val result = { + val theory_timing = + theory_timings.iterator.flatMap( + { + case props @ Markup.Name(name) => Some(name -> props) + case _ => None + }).toMap + val used_theory_timings = + for { (name, _) <- session_background.base.used_theories } + yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) + + val more_output = + Library.trim_line(stdout.toString) :: + command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: + used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: + session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: + runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: + task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: + document_output + + process_result.output(more_output) + .error(Library.trim_line(stderr.toString)) + .errors_rc(export_errors ::: document_errors) + } + + build_errors match { + case Exn.Res(build_errs) => + val errs = build_errs ::: document_errors + if (errs.nonEmpty) { + result.error_rc.output( + errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: + errs.map(Protocol.Error_Message_Marker.apply)) + } + else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt) + else result + case Exn.Exn(Exn.Interrupt()) => + if (result.ok) result.copy(rc = Process_Result.RC.interrupt) + else result + case Exn.Exn(exn) => throw exn + } + } + + override def start(): Unit = future_result + override def terminate(): Unit = future_result.cancel() + override def is_finished: Boolean = future_result.is_finished + + private val timeout_request: Option[Event_Timer.Request] = { + if (info.timeout_ignored) None + else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() }) + } + + override def join: Process_Result = { + val result = future_result.join + + val was_timeout = + timeout_request match { + case None => false + case Some(request) => !request.cancel() + } + + if (result.ok) result + else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc + else if (result.interrupted) result.error(Output.error_message_text("Interrupt")) + else result + } + + lazy val finish: SHA1.Shasum = { + require(is_finished, "Build job not finished: " + quote(session_name)) + if (join.ok && do_store && store.output_heap(session_name).is_file) { + SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name) + } + else SHA1.no_shasum + } + } + /* theory markup/messages from session database */ def read_theory( @@ -234,350 +600,3 @@ } }) } - -class Build_Job(progress: Progress, - session_background: Sessions.Background, - store: Sessions.Store, - val do_store: Boolean, - resources: Resources, - session_setup: (String, Session) => Unit, - val numa_node: Option[Int] -) { - def session_name: String = session_background.session_name - val info: Sessions.Info = session_background.sessions_structure(session_name) - val options: Options = NUMA.policy_options(info.options, numa_node) - - val session_sources: Sessions.Sources = - Sessions.Sources.load(session_background.base, cache = store.cache.compress) - - private val future_result: Future[Process_Result] = - Future.thread("build", uninterruptible = true) { - val parent = info.parent.getOrElse("") - - val env = - Isabelle_System.settings( - List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString)) - - val is_pure = Sessions.is_pure(session_name) - - val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil - - val eval_store = - if (do_store) { - (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) ::: - List("ML_Heap.save_child " + - ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name)))) - } - else Nil - - def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] = - session_background.base.theory_load_commands.get(node_name.theory) match { - case None => Nil - case Some(spans) => - val syntax = session_background.base.theory_syntax(node_name) - val master_dir = Path.explode(node_name.master_dir) - for (span <- spans; file <- span.loaded_files(syntax).files) - yield { - val src_path = Path.explode(file) - val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path)) - - val bytes = session_sources(blob_name.node).bytes - val text = bytes.text - val chunk = Symbol.Text_Chunk(text) - - Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) -> - Document.Blobs.Item(bytes, text, chunk, changed = false) - } - } - - val session = - new Session(options, resources) { - override val cache: Term.Cache = store.cache - - override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info = - Command.Blobs_Info.make(session_blobs(node_name)) - - override def build_blobs(node_name: Document.Node.Name): Document.Blobs = - Document.Blobs.make(session_blobs(node_name)) - } - - object Build_Session_Errors { - private val promise: Promise[List[String]] = Future.promise - - def result: Exn.Result[List[String]] = promise.join_result - def cancel(): Unit = promise.cancel() - def apply(errs: List[String]): Unit = { - try { promise.fulfill(errs) } - catch { case _: IllegalStateException => } - } - } - - val export_consumer = - Export.consumer(store.open_database(session_name, output = true), store.cache, - progress = progress) - - val stdout = new StringBuilder(1000) - val stderr = new StringBuilder(1000) - val command_timings = new mutable.ListBuffer[Properties.T] - val theory_timings = new mutable.ListBuffer[Properties.T] - val session_timings = new mutable.ListBuffer[Properties.T] - val runtime_statistics = new mutable.ListBuffer[Properties.T] - val task_statistics = new mutable.ListBuffer[Properties.T] - - def fun( - name: String, - acc: mutable.ListBuffer[Properties.T], - unapply: Properties.T => Option[Properties.T] - ): (String, Session.Protocol_Function) = { - name -> ((msg: Prover.Protocol_Output) => - unapply(msg.properties) match { - case Some(props) => acc += props; true - case _ => false - }) - } - - session.init_protocol_handler(new Session.Protocol_Handler { - override def exit(): Unit = Build_Session_Errors.cancel() - - private def build_session_finished(msg: Prover.Protocol_Output): Boolean = { - val (rc, errors) = - try { - val (rc, errs) = { - import XML.Decode._ - pair(int, list(x => x))(Symbol.decode_yxml(msg.text)) - } - val errors = - for (err <- errs) yield { - val prt = Protocol_Message.expose_no_reports(err) - Pretty.string_of(prt, metric = Symbol.Metric) - } - (rc, errors) - } - catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) } - - session.protocol_command("Prover.stop", rc.toString) - Build_Session_Errors(errors) - true - } - - private def loading_theory(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Markup.Loading_Theory(Markup.Name(name)) => - progress.theory(Progress.Theory(name, session = session_name)) - false - case _ => false - } - - private def export_(msg: Prover.Protocol_Output): Boolean = - msg.properties match { - case Protocol.Export(args) => - export_consumer.make_entry(session_name, args, msg.chunk) - true - case _ => false - } - - override val functions: Session.Protocol_Functions = - List( - Markup.Build_Session_Finished.name -> build_session_finished, - Markup.Loading_Theory.name -> loading_theory, - Markup.EXPORT -> export_, - fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply), - fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply), - fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply)) - }) - - session.command_timings += Session.Consumer("command_timings") { - case Session.Command_Timing(props) => - for { - elapsed <- Markup.Elapsed.unapply(props) - elapsed_time = Time.seconds(elapsed) - if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold") - } command_timings += props.filter(Markup.command_timing_property) - } - - session.runtime_statistics += Session.Consumer("ML_statistics") { - case Session.Runtime_Statistics(props) => runtime_statistics += props - } - - session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") { - case snapshot => - if (!progress.stopped) { - def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = { - if (!progress.stopped) { - val theory_name = snapshot.node_name.theory - val args = - Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress) - val body = Bytes(Symbol.encode(YXML.string_of_body(xml))) - export_consumer.make_entry(session_name, args, body) - } - } - def export_text(name: String, text: String, compress: Boolean = true): Unit = - export_(name, List(XML.Text(text)), compress = compress) - - for (command <- snapshot.snippet_command) { - export_text(Export.DOCUMENT_ID, command.id.toString, compress = false) - } - - export_text(Export.FILES, - cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))), - compress = false) - - for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) { - val xml = snapshot.switch(blob_name).xml_markup() - export_(Export.MARKUP + (i + 1), xml) - } - export_(Export.MARKUP, snapshot.xml_markup()) - export_(Export.MESSAGES, snapshot.messages.map(_._1)) - } - } - - session.all_messages += Session.Consumer[Any]("build_session_output") { - case msg: Prover.Output => - val message = msg.message - if (msg.is_system) resources.log(Protocol.message_text(message)) - - if (msg.is_stdout) { - stdout ++= Symbol.encode(XML.content(message)) - } - else if (msg.is_stderr) { - stderr ++= Symbol.encode(XML.content(message)) - } - else if (msg.is_exit) { - val err = - "Prover terminated" + - (msg.properties match { - case Markup.Process_Result(result) => ": " + result.print_rc - case _ => "" - }) - Build_Session_Errors(List(err)) - } - case _ => - } - - session_setup(session_name, session) - - val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store) - - val process = - Isabelle_Process.start(store, options, session, session_background, - logic = parent, raw_ml_system = is_pure, - use_prelude = use_prelude, eval_main = eval_main, - cwd = info.dir.file, env = env) - - val build_errors = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { - Exn.capture { process.await_startup() } match { - case Exn.Res(_) => - val resources_yxml = resources.init_session_yxml - val encode_options: XML.Encode.T[Options] = - options => session.prover_options(options).encode - val args_yxml = - YXML.string_of_body( - { - import XML.Encode._ - pair(string, list(pair(encode_options, list(pair(string, properties)))))( - (session_name, info.theories)) - }) - session.protocol_command("build_session", resources_yxml, args_yxml) - Build_Session_Errors.result - case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn))) - } - } - - val process_result = - Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() } - - session.stop() - - val export_errors = - export_consumer.shutdown(close = true).map(Output.error_message_text) - - val (document_output, document_errors) = - try { - if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) { - using(Export.open_database_context(store)) { database_context => - val documents = - using(database_context.open_session(session_background)) { - session_context => - Document_Build.build_documents( - Document_Build.context(session_context, progress = progress), - output_sources = info.document_output, - output_pdf = info.document_output) - } - using(database_context.open_database(session_name, output = true))(session_database => - documents.foreach(_.write(session_database.db, session_name))) - (documents.flatMap(_.log_lines), Nil) - } - } - else (Nil, Nil) - } - catch { - case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors) - case Exn.Interrupt.ERROR(msg) => (Nil, List(msg)) - } - - val result = { - val theory_timing = - theory_timings.iterator.flatMap( - { - case props @ Markup.Name(name) => Some(name -> props) - case _ => None - }).toMap - val used_theory_timings = - for { (name, _) <- session_background.base.used_theories } - yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory)) - - val more_output = - Library.trim_line(stdout.toString) :: - command_timings.toList.map(Protocol.Command_Timing_Marker.apply) ::: - used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) ::: - session_timings.toList.map(Protocol.Session_Timing_Marker.apply) ::: - runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) ::: - task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) ::: - document_output - - process_result.output(more_output) - .error(Library.trim_line(stderr.toString)) - .errors_rc(export_errors ::: document_errors) - } - - build_errors match { - case Exn.Res(build_errs) => - val errs = build_errs ::: document_errors - if (errs.nonEmpty) { - result.error_rc.output( - errs.flatMap(s => split_lines(Output.error_message_text(s))) ::: - errs.map(Protocol.Error_Message_Marker.apply)) - } - else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt) - else result - case Exn.Exn(Exn.Interrupt()) => - if (result.ok) result.copy(rc = Process_Result.RC.interrupt) - else result - case Exn.Exn(exn) => throw exn - } - } - - def terminate(): Unit = future_result.cancel() - def is_finished: Boolean = future_result.is_finished - - private val timeout_request: Option[Event_Timer.Request] = { - if (info.timeout_ignored) None - else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() }) - } - - def join: Process_Result = { - val result = future_result.join - - val was_timeout = - timeout_request match { - case None => false - case Some(request) => !request.cancel() - } - - if (result.ok) result - else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc - else if (result.interrupted) result.error(Output.error_message_text("Interrupt")) - else result - } -} diff -r 98dab34ed72d -r 632a92fcb673 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sun Feb 19 09:55:37 2023 +0000 +++ b/src/Pure/Tools/build_process.scala Sun Feb 19 13:51:49 2023 +0100 @@ -160,13 +160,6 @@ private val build_deps = build_context.deps private val progress = build_context.progress - // global state - private val numa_nodes = new NUMA.Nodes(numa_shuffling) - private var build_graph = build_context.sessions_structure.build_graph - private var build_order = SortedSet.from(build_graph.keys)(build_context.ordering) - private var running = Map.empty[String, (SHA1.Shasum, Build_Job)] - private var results = Map.empty[String, Build_Process.Result] - private val log = build_options.string("system_log") match { case "" => No_Logger @@ -174,19 +167,65 @@ case log_file => Logger.make(Some(Path.explode(log_file))) } - private def remove_pending(name: String): Unit = { - build_graph = build_graph.del_node(name) - build_order = build_order - name + // global state + private val _numa_nodes = new NUMA.Nodes(numa_shuffling) + private var _build_graph = build_context.sessions_structure.build_graph + private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering) + private var _running = Map.empty[String, Build_Job] + private var _results = Map.empty[String, Build_Process.Result] + + private def remove_pending(name: String): Unit = synchronized { + _build_graph = _build_graph.del_node(name) + _build_order = _build_order - name + } + + private def next_pending(): Option[String] = synchronized { + if (_running.size < (max_jobs max 1)) { + _build_order.iterator + .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name)) + .nextOption() + } + else None + } + + private def next_numa_node(): Option[Int] = synchronized { + _numa_nodes.next(used = + Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i)) } - private def next_pending(): Option[String] = - build_order.iterator - .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name)) - .nextOption() + private def test_running(): Boolean = synchronized { !_build_graph.is_empty } + + private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) } + + private def finished_running(): List[Build_Job.Build_Session] = synchronized { + List.from( + _running.valuesIterator.flatMap { + case job: Build_Job.Build_Session if job.is_finished => Some(job) + case _ => None + }) + } + + private def job_running(name: String, job: Build_Job): Build_Job = synchronized { + _running += (name -> job) + job + } - private def used_node(i: Int): Boolean = - running.iterator.exists( - { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) + private def remove_running(name: String): Unit = synchronized { + _running -= name + } + + private def add_result( + name: String, + current: Boolean, + output_heap: SHA1.Shasum, + process_result: Process_Result + ): Unit = synchronized { + _results += (name -> Build_Process.Result(current, output_heap, process_result)) + } + + private def get_results(names: List[String]): List[Build_Process.Result] = synchronized { + names.map(_results.apply) + } private def session_finished(session_name: String, process_result: Process_Result): String = "Finished " + session_name + " (" + process_result.timing.message_resources + ")" @@ -198,14 +237,10 @@ "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" } - private def finish_job(session_name: String, input_heaps: SHA1.Shasum, job: Build_Job): Unit = { + private def finish_job(job: Build_Job.Build_Session): Unit = { + val session_name = job.session_name val process_result = job.join - - val output_heap = - if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) { - SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name) - } - else SHA1.no_shasum + val output_heap = job.finish val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) val process_result_tail = { @@ -236,7 +271,7 @@ build_log = if (process_result.timeout) build_log.error("Timeout") else build_log, build = - Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps, + Sessions.Build_Info(build_deps.sources_shasum(session_name), job.input_heaps, output_heap, process_result.rc, UUID.random().toString))) // messages @@ -251,15 +286,18 @@ if (!process_result.interrupted) progress.echo(process_result_tail.out) } - remove_pending(session_name) - running -= session_name - results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail)) + synchronized { + remove_pending(session_name) + remove_running(session_name) + add_result(session_name, false, output_heap, process_result_tail) + } } private def start_job(session_name: String): Unit = { val ancestor_results = - build_deps.sessions_structure.build_requirements(List(session_name)). - filterNot(_ == session_name).map(results(_)) + get_results( + build_deps.sessions_structure.build_requirements(List(session_name)). + filterNot(_ == session_name)) val input_heaps = if (ancestor_results.isEmpty) { SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) @@ -289,13 +327,17 @@ val all_current = current && ancestor_results.forall(_.current) if (all_current) { - remove_pending(session_name) - results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok)) + synchronized { + remove_pending(session_name) + add_result(session_name, true, output_heap, Process_Result.ok) + } } else if (no_build) { progress.echo_if(verbose, "Skipping " + session_name + " ...") - remove_pending(session_name) - results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error)) + synchronized { + remove_pending(session_name) + add_result(session_name, false, output_heap, Process_Result.error) + } } else if (ancestor_results.forall(_.ok) && !progress.stopped) { progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") @@ -309,16 +351,21 @@ new Resources(session_background, log = log, command_timings = build_context(session_name).old_command_timings) - val numa_node = numa_nodes.next(used_node) val job = - new Build_Job(progress, session_background, store, do_store, - resources, session_setup, numa_node) - running += (session_name -> (input_heaps, job)) + synchronized { + val numa_node = next_numa_node() + job_running(session_name, + new Build_Job.Build_Session(progress, session_background, store, do_store, + resources, session_setup, input_heaps, numa_node)) + } + job.start() } else { progress.echo(session_name + " CANCELLED") - remove_pending(session_name) - results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined)) + synchronized { + remove_pending(session_name) + add_result(session_name, false, output_heap, Process_Result.undefined) + } } } @@ -328,23 +375,17 @@ } def run(): Map[String, Build_Process.Result] = { - while (!build_graph.is_empty) { - if (progress.stopped) { - for ((_, (_, job)) <- running) job.terminate() - } + while (test_running()) { + if (progress.stopped) stop_running() - running.find({ case (_, (_, job)) => job.is_finished }) match { - case Some((session_name, (input_heaps, job))) => - finish_job(session_name, input_heaps, job) - case None if running.size < (max_jobs max 1) => - next_pending() match { - case Some(session_name) => start_job(session_name) - case None => sleep() - } + for (job <- finished_running()) finish_job(job) + + next_pending() match { + case Some(session_name) => start_job(session_name) case None => sleep() } } - results + synchronized { _results } } }