--- a/src/Pure/General/logger.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/General/logger.scala Sun Feb 19 13:51:49 2023 +0100
@@ -35,7 +35,8 @@
}
class System_Logger extends Logger {
- def apply(msg: => String): Unit =
+ def apply(msg: => String): Unit = synchronized {
if (Platform.is_windows) System.out.println(msg)
else System.console.writer.println(msg)
+ }
}
--- a/src/Pure/PIDE/document.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/PIDE/document.scala Sun Feb 19 13:51:49 2023 +0100
@@ -371,14 +371,16 @@
object Nodes {
val empty: Nodes = new Nodes(Graph.empty(Node.Name.Ordering))
+
+ private def init(graph: Graph[Node.Name, Node], name: Node.Name): Graph[Node.Name, Node] =
+ graph.default_node(name, Node.empty)
}
final class Nodes private(graph: Graph[Node.Name, Node]) {
- def apply(name: Node.Name): Node =
- graph.default_node(name, Node.empty).get_node(name)
+ def apply(name: Node.Name): Node = Nodes.init(graph, name).get_node(name)
def is_suppressed(name: Node.Name): Boolean = {
- val graph1 = graph.default_node(name, Node.empty)
+ val graph1 = Nodes.init(graph, name)
graph1.is_maximal(name) && graph1.get_node(name).is_empty
}
@@ -391,10 +393,7 @@
def + (entry: (Node.Name, Node)): Nodes = {
val (name, node) = entry
val imports = node.header.imports
- val graph1 =
- imports.foldLeft(graph.default_node(name, Node.empty)) {
- case (g, p) => g.default_node(p, Node.empty)
- }
+ val graph1 = (name :: imports).foldLeft(graph)(Nodes.init)
val graph2 =
graph1.imm_preds(name).foldLeft(graph1) { case (g, dep) => g.del_edge(dep, name) }
val graph3 = imports.foldLeft(graph2) { case (g, dep) => g.add_edge(dep, name) }
@@ -417,8 +416,8 @@
if name == file_name
} yield cmd).toList
- def descendants(names: List[Node.Name]): List[Node.Name] = graph.all_succs(names)
- def requirements(names: List[Node.Name]): List[Node.Name] = graph.all_preds_rev(names)
+ def descendants(names: List[Node.Name]): List[Node.Name] =
+ names.foldLeft(graph)(Nodes.init).all_succs(names)
def topological_order: List[Node.Name] = graph.topological_order
override def toString: String = topological_order.mkString("Nodes(", ",", ")")
--- a/src/Pure/Thy/sessions.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Thy/sessions.scala Sun Feb 19 13:51:49 2023 +0100
@@ -1331,6 +1331,18 @@
/** persistent store **/
+ /** auxiliary **/
+
+ sealed case class Build_Info(
+ sources: SHA1.Shasum,
+ input_heaps: SHA1.Shasum,
+ output_heap: SHA1.Shasum,
+ return_code: Int,
+ uuid: String
+ ) {
+ def ok: Boolean = return_code == 0
+ }
+
object Session_Info {
val session_name = SQL.Column.string("session_name").make_primary_key
@@ -1345,7 +1357,7 @@
List(session_name, session_timing, command_timings, theory_timings,
ml_statistics, task_statistics, errors)
- // Build.Session_Info
+ // Build_Info
val sources = SQL.Column.string("sources")
val input_heaps = SQL.Column.string("input_heaps")
val output_heap = SQL.Column.string("output_heap")
@@ -1553,7 +1565,7 @@
session_name: String,
sources: Sources,
build_log: Build_Log.Session_Info,
- build: Build.Session_Info
+ build: Build_Info
): Unit = {
db.transaction {
write_sources(db, session_name, sources)
@@ -1596,7 +1608,7 @@
def read_errors(db: SQL.Database, name: String): List[String] =
Build_Log.uncompress_errors(read_bytes(db, name, Session_Info.errors), cache = cache)
- def read_build(db: SQL.Database, name: String): Option[Build.Session_Info] = {
+ def read_build(db: SQL.Database, name: String): Option[Build_Info] = {
if (db.tables.contains(Session_Info.table.name)) {
db.using_statement(Session_Info.table.select(Nil,
Session_Info.session_name.where_equal(name))) { stmt =>
@@ -1607,7 +1619,7 @@
try { Option(res.string(Session_Info.uuid)).getOrElse("") }
catch { case _: SQLException => "" }
Some(
- Build.Session_Info(
+ Build_Info(
SHA1.fake_shasum(res.string(Session_Info.sources)),
SHA1.fake_shasum(res.string(Session_Info.input_heaps)),
SHA1.fake_shasum(res.string(Session_Info.output_heap)),
--- a/src/Pure/Tools/build.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build.scala Sun Feb 19 13:51:49 2023 +0100
@@ -9,22 +9,6 @@
object Build {
- /** auxiliary **/
-
- /* persistent build info */
-
- sealed case class Session_Info(
- sources: SHA1.Shasum,
- input_heaps: SHA1.Shasum,
- output_heap: SHA1.Shasum,
- return_code: Int,
- uuid: String
- ) {
- def ok: Boolean = return_code == 0
- }
-
-
-
/** build with results **/
class Results private[Build](
--- a/src/Pure/Tools/build_job.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build_job.scala Sun Feb 19 13:51:49 2023 +0100
@@ -11,7 +11,373 @@
import scala.util.matching.Regex
+trait Build_Job {
+ def session_name: String
+ def numa_node: Option[Int] = None
+ def start(): Unit = ()
+ def terminate(): Unit = ()
+ def is_finished: Boolean = false
+ def join: Process_Result = Process_Result.undefined
+}
+
object Build_Job {
+ class Build_Session(progress: Progress,
+ session_background: Sessions.Background,
+ store: Sessions.Store,
+ val do_store: Boolean,
+ resources: Resources,
+ session_setup: (String, Session) => Unit,
+ val input_heaps: SHA1.Shasum,
+ override val numa_node: Option[Int]
+ ) extends Build_Job {
+ def session_name: String = session_background.session_name
+ val info: Sessions.Info = session_background.sessions_structure(session_name)
+ val options: Options = NUMA.policy_options(info.options, numa_node)
+
+ val session_sources: Sessions.Sources =
+ Sessions.Sources.load(session_background.base, cache = store.cache.compress)
+
+ private lazy val future_result: Future[Process_Result] =
+ Future.thread("build", uninterruptible = true) {
+ val parent = info.parent.getOrElse("")
+
+ val env =
+ Isabelle_System.settings(
+ List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
+
+ val is_pure = Sessions.is_pure(session_name)
+
+ val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil
+
+ val eval_store =
+ if (do_store) {
+ (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
+ List("ML_Heap.save_child " +
+ ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
+ }
+ else Nil
+
+ def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
+ session_background.base.theory_load_commands.get(node_name.theory) match {
+ case None => Nil
+ case Some(spans) =>
+ val syntax = session_background.base.theory_syntax(node_name)
+ val master_dir = Path.explode(node_name.master_dir)
+ for (span <- spans; file <- span.loaded_files(syntax).files)
+ yield {
+ val src_path = Path.explode(file)
+ val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
+
+ val bytes = session_sources(blob_name.node).bytes
+ val text = bytes.text
+ val chunk = Symbol.Text_Chunk(text)
+
+ Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
+ Document.Blobs.Item(bytes, text, chunk, changed = false)
+ }
+ }
+
+ val session =
+ new Session(options, resources) {
+ override val cache: Term.Cache = store.cache
+
+ override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
+ Command.Blobs_Info.make(session_blobs(node_name))
+
+ override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
+ Document.Blobs.make(session_blobs(node_name))
+ }
+
+ object Build_Session_Errors {
+ private val promise: Promise[List[String]] = Future.promise
+
+ def result: Exn.Result[List[String]] = promise.join_result
+ def cancel(): Unit = promise.cancel()
+ def apply(errs: List[String]): Unit = {
+ try { promise.fulfill(errs) }
+ catch { case _: IllegalStateException => }
+ }
+ }
+
+ val export_consumer =
+ Export.consumer(store.open_database(session_name, output = true), store.cache,
+ progress = progress)
+
+ val stdout = new StringBuilder(1000)
+ val stderr = new StringBuilder(1000)
+ val command_timings = new mutable.ListBuffer[Properties.T]
+ val theory_timings = new mutable.ListBuffer[Properties.T]
+ val session_timings = new mutable.ListBuffer[Properties.T]
+ val runtime_statistics = new mutable.ListBuffer[Properties.T]
+ val task_statistics = new mutable.ListBuffer[Properties.T]
+
+ def fun(
+ name: String,
+ acc: mutable.ListBuffer[Properties.T],
+ unapply: Properties.T => Option[Properties.T]
+ ): (String, Session.Protocol_Function) = {
+ name -> ((msg: Prover.Protocol_Output) =>
+ unapply(msg.properties) match {
+ case Some(props) => acc += props; true
+ case _ => false
+ })
+ }
+
+ session.init_protocol_handler(new Session.Protocol_Handler {
+ override def exit(): Unit = Build_Session_Errors.cancel()
+
+ private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
+ val (rc, errors) =
+ try {
+ val (rc, errs) = {
+ import XML.Decode._
+ pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
+ }
+ val errors =
+ for (err <- errs) yield {
+ val prt = Protocol_Message.expose_no_reports(err)
+ Pretty.string_of(prt, metric = Symbol.Metric)
+ }
+ (rc, errors)
+ }
+ catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
+
+ session.protocol_command("Prover.stop", rc.toString)
+ Build_Session_Errors(errors)
+ true
+ }
+
+ private def loading_theory(msg: Prover.Protocol_Output): Boolean =
+ msg.properties match {
+ case Markup.Loading_Theory(Markup.Name(name)) =>
+ progress.theory(Progress.Theory(name, session = session_name))
+ false
+ case _ => false
+ }
+
+ private def export_(msg: Prover.Protocol_Output): Boolean =
+ msg.properties match {
+ case Protocol.Export(args) =>
+ export_consumer.make_entry(session_name, args, msg.chunk)
+ true
+ case _ => false
+ }
+
+ override val functions: Session.Protocol_Functions =
+ List(
+ Markup.Build_Session_Finished.name -> build_session_finished,
+ Markup.Loading_Theory.name -> loading_theory,
+ Markup.EXPORT -> export_,
+ fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
+ fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
+ fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
+ })
+
+ session.command_timings += Session.Consumer("command_timings") {
+ case Session.Command_Timing(props) =>
+ for {
+ elapsed <- Markup.Elapsed.unapply(props)
+ elapsed_time = Time.seconds(elapsed)
+ if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+ } command_timings += props.filter(Markup.command_timing_property)
+ }
+
+ session.runtime_statistics += Session.Consumer("ML_statistics") {
+ case Session.Runtime_Statistics(props) => runtime_statistics += props
+ }
+
+ session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
+ case snapshot =>
+ if (!progress.stopped) {
+ def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
+ if (!progress.stopped) {
+ val theory_name = snapshot.node_name.theory
+ val args =
+ Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+ val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
+ export_consumer.make_entry(session_name, args, body)
+ }
+ }
+ def export_text(name: String, text: String, compress: Boolean = true): Unit =
+ export_(name, List(XML.Text(text)), compress = compress)
+
+ for (command <- snapshot.snippet_command) {
+ export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
+ }
+
+ export_text(Export.FILES,
+ cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
+ compress = false)
+
+ for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
+ val xml = snapshot.switch(blob_name).xml_markup()
+ export_(Export.MARKUP + (i + 1), xml)
+ }
+ export_(Export.MARKUP, snapshot.xml_markup())
+ export_(Export.MESSAGES, snapshot.messages.map(_._1))
+ }
+ }
+
+ session.all_messages += Session.Consumer[Any]("build_session_output") {
+ case msg: Prover.Output =>
+ val message = msg.message
+ if (msg.is_system) resources.log(Protocol.message_text(message))
+
+ if (msg.is_stdout) {
+ stdout ++= Symbol.encode(XML.content(message))
+ }
+ else if (msg.is_stderr) {
+ stderr ++= Symbol.encode(XML.content(message))
+ }
+ else if (msg.is_exit) {
+ val err =
+ "Prover terminated" +
+ (msg.properties match {
+ case Markup.Process_Result(result) => ": " + result.print_rc
+ case _ => ""
+ })
+ Build_Session_Errors(List(err))
+ }
+ case _ =>
+ }
+
+ session_setup(session_name, session)
+
+ val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
+
+ val process =
+ Isabelle_Process.start(store, options, session, session_background,
+ logic = parent, raw_ml_system = is_pure,
+ use_prelude = use_prelude, eval_main = eval_main,
+ cwd = info.dir.file, env = env)
+
+ val build_errors =
+ Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
+ Exn.capture { process.await_startup() } match {
+ case Exn.Res(_) =>
+ val resources_yxml = resources.init_session_yxml
+ val encode_options: XML.Encode.T[Options] =
+ options => session.prover_options(options).encode
+ val args_yxml =
+ YXML.string_of_body(
+ {
+ import XML.Encode._
+ pair(string, list(pair(encode_options, list(pair(string, properties)))))(
+ (session_name, info.theories))
+ })
+ session.protocol_command("build_session", resources_yxml, args_yxml)
+ Build_Session_Errors.result
+ case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
+ }
+ }
+
+ val process_result =
+ Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
+
+ session.stop()
+
+ val export_errors =
+ export_consumer.shutdown(close = true).map(Output.error_message_text)
+
+ val (document_output, document_errors) =
+ try {
+ if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) {
+ using(Export.open_database_context(store)) { database_context =>
+ val documents =
+ using(database_context.open_session(session_background)) {
+ session_context =>
+ Document_Build.build_documents(
+ Document_Build.context(session_context, progress = progress),
+ output_sources = info.document_output,
+ output_pdf = info.document_output)
+ }
+ using(database_context.open_database(session_name, output = true))(session_database =>
+ documents.foreach(_.write(session_database.db, session_name)))
+ (documents.flatMap(_.log_lines), Nil)
+ }
+ }
+ else (Nil, Nil)
+ }
+ catch {
+ case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
+ case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
+ }
+
+ val result = {
+ val theory_timing =
+ theory_timings.iterator.flatMap(
+ {
+ case props @ Markup.Name(name) => Some(name -> props)
+ case _ => None
+ }).toMap
+ val used_theory_timings =
+ for { (name, _) <- session_background.base.used_theories }
+ yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
+
+ val more_output =
+ Library.trim_line(stdout.toString) ::
+ command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
+ used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
+ session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
+ runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
+ task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
+ document_output
+
+ process_result.output(more_output)
+ .error(Library.trim_line(stderr.toString))
+ .errors_rc(export_errors ::: document_errors)
+ }
+
+ build_errors match {
+ case Exn.Res(build_errs) =>
+ val errs = build_errs ::: document_errors
+ if (errs.nonEmpty) {
+ result.error_rc.output(
+ errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
+ errs.map(Protocol.Error_Message_Marker.apply))
+ }
+ else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt)
+ else result
+ case Exn.Exn(Exn.Interrupt()) =>
+ if (result.ok) result.copy(rc = Process_Result.RC.interrupt)
+ else result
+ case Exn.Exn(exn) => throw exn
+ }
+ }
+
+ override def start(): Unit = future_result
+ override def terminate(): Unit = future_result.cancel()
+ override def is_finished: Boolean = future_result.is_finished
+
+ private val timeout_request: Option[Event_Timer.Request] = {
+ if (info.timeout_ignored) None
+ else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
+ }
+
+ override def join: Process_Result = {
+ val result = future_result.join
+
+ val was_timeout =
+ timeout_request match {
+ case None => false
+ case Some(request) => !request.cancel()
+ }
+
+ if (result.ok) result
+ else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
+ else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
+ else result
+ }
+
+ lazy val finish: SHA1.Shasum = {
+ require(is_finished, "Build job not finished: " + quote(session_name))
+ if (join.ok && do_store && store.output_heap(session_name).is_file) {
+ SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
+ }
+ else SHA1.no_shasum
+ }
+ }
+
/* theory markup/messages from session database */
def read_theory(
@@ -234,350 +600,3 @@
}
})
}
-
-class Build_Job(progress: Progress,
- session_background: Sessions.Background,
- store: Sessions.Store,
- val do_store: Boolean,
- resources: Resources,
- session_setup: (String, Session) => Unit,
- val numa_node: Option[Int]
-) {
- def session_name: String = session_background.session_name
- val info: Sessions.Info = session_background.sessions_structure(session_name)
- val options: Options = NUMA.policy_options(info.options, numa_node)
-
- val session_sources: Sessions.Sources =
- Sessions.Sources.load(session_background.base, cache = store.cache.compress)
-
- private val future_result: Future[Process_Result] =
- Future.thread("build", uninterruptible = true) {
- val parent = info.parent.getOrElse("")
-
- val env =
- Isabelle_System.settings(
- List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
-
- val is_pure = Sessions.is_pure(session_name)
-
- val use_prelude = if (is_pure) Thy_Header.ml_roots.map(_._1) else Nil
-
- val eval_store =
- if (do_store) {
- (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
- List("ML_Heap.save_child " +
- ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
- }
- else Nil
-
- def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
- session_background.base.theory_load_commands.get(node_name.theory) match {
- case None => Nil
- case Some(spans) =>
- val syntax = session_background.base.theory_syntax(node_name)
- val master_dir = Path.explode(node_name.master_dir)
- for (span <- spans; file <- span.loaded_files(syntax).files)
- yield {
- val src_path = Path.explode(file)
- val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
-
- val bytes = session_sources(blob_name.node).bytes
- val text = bytes.text
- val chunk = Symbol.Text_Chunk(text)
-
- Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
- Document.Blobs.Item(bytes, text, chunk, changed = false)
- }
- }
-
- val session =
- new Session(options, resources) {
- override val cache: Term.Cache = store.cache
-
- override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
- Command.Blobs_Info.make(session_blobs(node_name))
-
- override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
- Document.Blobs.make(session_blobs(node_name))
- }
-
- object Build_Session_Errors {
- private val promise: Promise[List[String]] = Future.promise
-
- def result: Exn.Result[List[String]] = promise.join_result
- def cancel(): Unit = promise.cancel()
- def apply(errs: List[String]): Unit = {
- try { promise.fulfill(errs) }
- catch { case _: IllegalStateException => }
- }
- }
-
- val export_consumer =
- Export.consumer(store.open_database(session_name, output = true), store.cache,
- progress = progress)
-
- val stdout = new StringBuilder(1000)
- val stderr = new StringBuilder(1000)
- val command_timings = new mutable.ListBuffer[Properties.T]
- val theory_timings = new mutable.ListBuffer[Properties.T]
- val session_timings = new mutable.ListBuffer[Properties.T]
- val runtime_statistics = new mutable.ListBuffer[Properties.T]
- val task_statistics = new mutable.ListBuffer[Properties.T]
-
- def fun(
- name: String,
- acc: mutable.ListBuffer[Properties.T],
- unapply: Properties.T => Option[Properties.T]
- ): (String, Session.Protocol_Function) = {
- name -> ((msg: Prover.Protocol_Output) =>
- unapply(msg.properties) match {
- case Some(props) => acc += props; true
- case _ => false
- })
- }
-
- session.init_protocol_handler(new Session.Protocol_Handler {
- override def exit(): Unit = Build_Session_Errors.cancel()
-
- private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
- val (rc, errors) =
- try {
- val (rc, errs) = {
- import XML.Decode._
- pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
- }
- val errors =
- for (err <- errs) yield {
- val prt = Protocol_Message.expose_no_reports(err)
- Pretty.string_of(prt, metric = Symbol.Metric)
- }
- (rc, errors)
- }
- catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
-
- session.protocol_command("Prover.stop", rc.toString)
- Build_Session_Errors(errors)
- true
- }
-
- private def loading_theory(msg: Prover.Protocol_Output): Boolean =
- msg.properties match {
- case Markup.Loading_Theory(Markup.Name(name)) =>
- progress.theory(Progress.Theory(name, session = session_name))
- false
- case _ => false
- }
-
- private def export_(msg: Prover.Protocol_Output): Boolean =
- msg.properties match {
- case Protocol.Export(args) =>
- export_consumer.make_entry(session_name, args, msg.chunk)
- true
- case _ => false
- }
-
- override val functions: Session.Protocol_Functions =
- List(
- Markup.Build_Session_Finished.name -> build_session_finished,
- Markup.Loading_Theory.name -> loading_theory,
- Markup.EXPORT -> export_,
- fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
- fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
- fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
- })
-
- session.command_timings += Session.Consumer("command_timings") {
- case Session.Command_Timing(props) =>
- for {
- elapsed <- Markup.Elapsed.unapply(props)
- elapsed_time = Time.seconds(elapsed)
- if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
- } command_timings += props.filter(Markup.command_timing_property)
- }
-
- session.runtime_statistics += Session.Consumer("ML_statistics") {
- case Session.Runtime_Statistics(props) => runtime_statistics += props
- }
-
- session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
- case snapshot =>
- if (!progress.stopped) {
- def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
- if (!progress.stopped) {
- val theory_name = snapshot.node_name.theory
- val args =
- Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
- val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
- export_consumer.make_entry(session_name, args, body)
- }
- }
- def export_text(name: String, text: String, compress: Boolean = true): Unit =
- export_(name, List(XML.Text(text)), compress = compress)
-
- for (command <- snapshot.snippet_command) {
- export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
- }
-
- export_text(Export.FILES,
- cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
- compress = false)
-
- for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
- val xml = snapshot.switch(blob_name).xml_markup()
- export_(Export.MARKUP + (i + 1), xml)
- }
- export_(Export.MARKUP, snapshot.xml_markup())
- export_(Export.MESSAGES, snapshot.messages.map(_._1))
- }
- }
-
- session.all_messages += Session.Consumer[Any]("build_session_output") {
- case msg: Prover.Output =>
- val message = msg.message
- if (msg.is_system) resources.log(Protocol.message_text(message))
-
- if (msg.is_stdout) {
- stdout ++= Symbol.encode(XML.content(message))
- }
- else if (msg.is_stderr) {
- stderr ++= Symbol.encode(XML.content(message))
- }
- else if (msg.is_exit) {
- val err =
- "Prover terminated" +
- (msg.properties match {
- case Markup.Process_Result(result) => ": " + result.print_rc
- case _ => ""
- })
- Build_Session_Errors(List(err))
- }
- case _ =>
- }
-
- session_setup(session_name, session)
-
- val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
-
- val process =
- Isabelle_Process.start(store, options, session, session_background,
- logic = parent, raw_ml_system = is_pure,
- use_prelude = use_prelude, eval_main = eval_main,
- cwd = info.dir.file, env = env)
-
- val build_errors =
- Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
- Exn.capture { process.await_startup() } match {
- case Exn.Res(_) =>
- val resources_yxml = resources.init_session_yxml
- val encode_options: XML.Encode.T[Options] =
- options => session.prover_options(options).encode
- val args_yxml =
- YXML.string_of_body(
- {
- import XML.Encode._
- pair(string, list(pair(encode_options, list(pair(string, properties)))))(
- (session_name, info.theories))
- })
- session.protocol_command("build_session", resources_yxml, args_yxml)
- Build_Session_Errors.result
- case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
- }
- }
-
- val process_result =
- Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
-
- session.stop()
-
- val export_errors =
- export_consumer.shutdown(close = true).map(Output.error_message_text)
-
- val (document_output, document_errors) =
- try {
- if (build_errors.isInstanceOf[Exn.Res[_]] && process_result.ok && info.documents.nonEmpty) {
- using(Export.open_database_context(store)) { database_context =>
- val documents =
- using(database_context.open_session(session_background)) {
- session_context =>
- Document_Build.build_documents(
- Document_Build.context(session_context, progress = progress),
- output_sources = info.document_output,
- output_pdf = info.document_output)
- }
- using(database_context.open_database(session_name, output = true))(session_database =>
- documents.foreach(_.write(session_database.db, session_name)))
- (documents.flatMap(_.log_lines), Nil)
- }
- }
- else (Nil, Nil)
- }
- catch {
- case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
- case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
- }
-
- val result = {
- val theory_timing =
- theory_timings.iterator.flatMap(
- {
- case props @ Markup.Name(name) => Some(name -> props)
- case _ => None
- }).toMap
- val used_theory_timings =
- for { (name, _) <- session_background.base.used_theories }
- yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
-
- val more_output =
- Library.trim_line(stdout.toString) ::
- command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
- used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
- session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
- runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
- task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
- document_output
-
- process_result.output(more_output)
- .error(Library.trim_line(stderr.toString))
- .errors_rc(export_errors ::: document_errors)
- }
-
- build_errors match {
- case Exn.Res(build_errs) =>
- val errs = build_errs ::: document_errors
- if (errs.nonEmpty) {
- result.error_rc.output(
- errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
- errs.map(Protocol.Error_Message_Marker.apply))
- }
- else if (progress.stopped && result.ok) result.copy(rc = Process_Result.RC.interrupt)
- else result
- case Exn.Exn(Exn.Interrupt()) =>
- if (result.ok) result.copy(rc = Process_Result.RC.interrupt)
- else result
- case Exn.Exn(exn) => throw exn
- }
- }
-
- def terminate(): Unit = future_result.cancel()
- def is_finished: Boolean = future_result.is_finished
-
- private val timeout_request: Option[Event_Timer.Request] = {
- if (info.timeout_ignored) None
- else Some(Event_Timer.request(Time.now() + info.timeout) { terminate() })
- }
-
- def join: Process_Result = {
- val result = future_result.join
-
- val was_timeout =
- timeout_request match {
- case None => false
- case Some(request) => !request.cancel()
- }
-
- if (result.ok) result
- else if (was_timeout) result.error(Output.error_message_text("Timeout")).timeout_rc
- else if (result.interrupted) result.error(Output.error_message_text("Interrupt"))
- else result
- }
-}
--- a/src/Pure/Tools/build_process.scala Sun Feb 19 09:55:37 2023 +0000
+++ b/src/Pure/Tools/build_process.scala Sun Feb 19 13:51:49 2023 +0100
@@ -160,13 +160,6 @@
private val build_deps = build_context.deps
private val progress = build_context.progress
- // global state
- private val numa_nodes = new NUMA.Nodes(numa_shuffling)
- private var build_graph = build_context.sessions_structure.build_graph
- private var build_order = SortedSet.from(build_graph.keys)(build_context.ordering)
- private var running = Map.empty[String, (SHA1.Shasum, Build_Job)]
- private var results = Map.empty[String, Build_Process.Result]
-
private val log =
build_options.string("system_log") match {
case "" => No_Logger
@@ -174,19 +167,65 @@
case log_file => Logger.make(Some(Path.explode(log_file)))
}
- private def remove_pending(name: String): Unit = {
- build_graph = build_graph.del_node(name)
- build_order = build_order - name
+ // global state
+ private val _numa_nodes = new NUMA.Nodes(numa_shuffling)
+ private var _build_graph = build_context.sessions_structure.build_graph
+ private var _build_order = SortedSet.from(_build_graph.keys)(build_context.ordering)
+ private var _running = Map.empty[String, Build_Job]
+ private var _results = Map.empty[String, Build_Process.Result]
+
+ private def remove_pending(name: String): Unit = synchronized {
+ _build_graph = _build_graph.del_node(name)
+ _build_order = _build_order - name
+ }
+
+ private def next_pending(): Option[String] = synchronized {
+ if (_running.size < (max_jobs max 1)) {
+ _build_order.iterator
+ .dropWhile(name => _running.isDefinedAt(name) || !_build_graph.is_minimal(name))
+ .nextOption()
+ }
+ else None
+ }
+
+ private def next_numa_node(): Option[Int] = synchronized {
+ _numa_nodes.next(used =
+ Set.from(for { job <- _running.valuesIterator; i <- job.numa_node } yield i))
}
- private def next_pending(): Option[String] =
- build_order.iterator
- .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name))
- .nextOption()
+ private def test_running(): Boolean = synchronized { !_build_graph.is_empty }
+
+ private def stop_running(): Unit = synchronized { _running.valuesIterator.foreach(_.terminate()) }
+
+ private def finished_running(): List[Build_Job.Build_Session] = synchronized {
+ List.from(
+ _running.valuesIterator.flatMap {
+ case job: Build_Job.Build_Session if job.is_finished => Some(job)
+ case _ => None
+ })
+ }
+
+ private def job_running(name: String, job: Build_Job): Build_Job = synchronized {
+ _running += (name -> job)
+ job
+ }
- private def used_node(i: Int): Boolean =
- running.iterator.exists(
- { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i })
+ private def remove_running(name: String): Unit = synchronized {
+ _running -= name
+ }
+
+ private def add_result(
+ name: String,
+ current: Boolean,
+ output_heap: SHA1.Shasum,
+ process_result: Process_Result
+ ): Unit = synchronized {
+ _results += (name -> Build_Process.Result(current, output_heap, process_result))
+ }
+
+ private def get_results(names: List[String]): List[Build_Process.Result] = synchronized {
+ names.map(_results.apply)
+ }
private def session_finished(session_name: String, process_result: Process_Result): String =
"Finished " + session_name + " (" + process_result.timing.message_resources + ")"
@@ -198,14 +237,10 @@
"Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")"
}
- private def finish_job(session_name: String, input_heaps: SHA1.Shasum, job: Build_Job): Unit = {
+ private def finish_job(job: Build_Job.Build_Session): Unit = {
+ val session_name = job.session_name
val process_result = job.join
-
- val output_heap =
- if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) {
- SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name)
- }
- else SHA1.no_shasum
+ val output_heap = job.finish
val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
val process_result_tail = {
@@ -236,7 +271,7 @@
build_log =
if (process_result.timeout) build_log.error("Timeout") else build_log,
build =
- Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps,
+ Sessions.Build_Info(build_deps.sources_shasum(session_name), job.input_heaps,
output_heap, process_result.rc, UUID.random().toString)))
// messages
@@ -251,15 +286,18 @@
if (!process_result.interrupted) progress.echo(process_result_tail.out)
}
- remove_pending(session_name)
- running -= session_name
- results += (session_name -> Build_Process.Result(false, output_heap, process_result_tail))
+ synchronized {
+ remove_pending(session_name)
+ remove_running(session_name)
+ add_result(session_name, false, output_heap, process_result_tail)
+ }
}
private def start_job(session_name: String): Unit = {
val ancestor_results =
- build_deps.sessions_structure.build_requirements(List(session_name)).
- filterNot(_ == session_name).map(results(_))
+ get_results(
+ build_deps.sessions_structure.build_requirements(List(session_name)).
+ filterNot(_ == session_name))
val input_heaps =
if (ancestor_results.isEmpty) {
SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE")))
@@ -289,13 +327,17 @@
val all_current = current && ancestor_results.forall(_.current)
if (all_current) {
- remove_pending(session_name)
- results += (session_name -> Build_Process.Result(true, output_heap, Process_Result.ok))
+ synchronized {
+ remove_pending(session_name)
+ add_result(session_name, true, output_heap, Process_Result.ok)
+ }
}
else if (no_build) {
progress.echo_if(verbose, "Skipping " + session_name + " ...")
- remove_pending(session_name)
- results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.error))
+ synchronized {
+ remove_pending(session_name)
+ add_result(session_name, false, output_heap, Process_Result.error)
+ }
}
else if (ancestor_results.forall(_.ok) && !progress.stopped) {
progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...")
@@ -309,16 +351,21 @@
new Resources(session_background, log = log,
command_timings = build_context(session_name).old_command_timings)
- val numa_node = numa_nodes.next(used_node)
val job =
- new Build_Job(progress, session_background, store, do_store,
- resources, session_setup, numa_node)
- running += (session_name -> (input_heaps, job))
+ synchronized {
+ val numa_node = next_numa_node()
+ job_running(session_name,
+ new Build_Job.Build_Session(progress, session_background, store, do_store,
+ resources, session_setup, input_heaps, numa_node))
+ }
+ job.start()
}
else {
progress.echo(session_name + " CANCELLED")
- remove_pending(session_name)
- results += (session_name -> Build_Process.Result(false, output_heap, Process_Result.undefined))
+ synchronized {
+ remove_pending(session_name)
+ add_result(session_name, false, output_heap, Process_Result.undefined)
+ }
}
}
@@ -328,23 +375,17 @@
}
def run(): Map[String, Build_Process.Result] = {
- while (!build_graph.is_empty) {
- if (progress.stopped) {
- for ((_, (_, job)) <- running) job.terminate()
- }
+ while (test_running()) {
+ if (progress.stopped) stop_running()
- running.find({ case (_, (_, job)) => job.is_finished }) match {
- case Some((session_name, (input_heaps, job))) =>
- finish_job(session_name, input_heaps, job)
- case None if running.size < (max_jobs max 1) =>
- next_pending() match {
- case Some(session_name) => start_job(session_name)
- case None => sleep()
- }
+ for (job <- finished_running()) finish_job(job)
+
+ next_pending() match {
+ case Some(session_name) => start_job(session_name)
case None => sleep()
}
}
- results
+ synchronized { _results }
}
}