src/Pure/Tools/build_job.scala
changeset 79502 c7a98469c0e7
parent 79501 bce98b5dfec6
child 79503 c67b47cd41dc
--- a/src/Pure/Tools/build_job.scala	Sat Jan 20 13:52:36 2024 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,552 +0,0 @@
-/*  Title:      Pure/Tools/build_job.scala
-    Author:     Makarius
-
-Build job running prover process, with rudimentary PIDE session.
-*/
-
-package isabelle
-
-
-import scala.collection.mutable
-
-
-trait Build_Job {
-  def cancel(): Unit = ()
-  def is_finished: Boolean = false
-  def join: Option[Build_Job.Result] = None
-}
-
-object Build_Job {
-  sealed case class Result(process_result: Process_Result, output_shasum: SHA1.Shasum)
-
-
-  /* build session */
-
-  def start_session(
-    build_context: Build.Context,
-    session_context: Session_Context,
-    progress: Progress,
-    log: Logger,
-    server: SSH.Server,
-    session_background: Sessions.Background,
-    sources_shasum: SHA1.Shasum,
-    input_shasum: SHA1.Shasum,
-    node_info: Host.Node_Info,
-    store_heap: Boolean
-  ): Session_Job = {
-    new Session_Job(build_context, session_context, progress, log, server,
-      session_background, sources_shasum, input_shasum, node_info, store_heap)
-  }
-
-  object Session_Context {
-    def load(
-      database_server: Option[SQL.Database],
-      build_uuid: String,
-      name: String,
-      deps: List[String],
-      ancestors: List[String],
-      session_prefs: String,
-      sources_shasum: SHA1.Shasum,
-      timeout: Time,
-      store: Store,
-      progress: Progress = new Progress
-    ): Session_Context = {
-      def default: Session_Context =
-        Session_Context(
-          name, deps, ancestors, session_prefs, sources_shasum, timeout,
-          Time.zero, Bytes.empty, build_uuid)
-
-      def read(db: SQL.Database): Session_Context = {
-        def ignore_error(msg: String) = {
-          progress.echo_warning(
-            "Ignoring bad database " + db + " for session " + quote(name) +
-            if_proper(msg, ":\n" + msg))
-          default
-        }
-        try {
-          val command_timings = store.read_command_timings(db, name)
-          val elapsed =
-            store.read_session_timing(db, name) match {
-              case Markup.Elapsed(s) => Time.seconds(s)
-              case _ => Time.zero
-            }
-          new Session_Context(
-            name, deps, ancestors, session_prefs, sources_shasum, timeout,
-            elapsed, command_timings, build_uuid)
-        }
-        catch {
-          case ERROR(msg) => ignore_error(msg)
-          case exn: java.lang.Error => ignore_error(Exn.message(exn))
-          case _: XML.Error => ignore_error("XML.Error")
-        }
-      }
-
-      database_server match {
-        case Some(db) => if (store.session_info_exists(db)) read(db) else default
-        case None => using_option(store.try_open_database(name))(read) getOrElse default
-      }
-    }
-  }
-
-  sealed case class Session_Context(
-    name: String,
-    deps: List[String],
-    ancestors: List[String],
-    session_prefs: String,
-    sources_shasum: SHA1.Shasum,
-    timeout: Time,
-    old_time: Time,
-    old_command_timings_blob: Bytes,
-    build_uuid: String
-  ) extends Library.Named
-
-  class Session_Job private[Build_Job](
-    build_context: Build.Context,
-    session_context: Session_Context,
-    progress: Progress,
-    log: Logger,
-    server: SSH.Server,
-    session_background: Sessions.Background,
-    sources_shasum: SHA1.Shasum,
-    input_shasum: SHA1.Shasum,
-    node_info: Host.Node_Info,
-    store_heap: Boolean
-  ) extends Build_Job {
-    def session_name: String = session_background.session_name
-
-    private val future_result: Future[Option[Result]] =
-      Future.thread("build", uninterruptible = true) {
-        val info = session_background.sessions_structure(session_name)
-        val options = Host.node_options(info.options, node_info)
-
-        val store = build_context.store
-
-        using_optional(store.maybe_open_database_server(server = server)) { database_server =>
-
-          store.clean_output(database_server, session_name, session_init = true)
-
-          val session_sources =
-            Store.Sources.load(session_background.base, cache = store.cache.compress)
-
-          val env =
-            Isabelle_System.settings(
-              List("ISABELLE_ML_DEBUGGER" -> options.bool("ML_debugger").toString))
-
-          val session_heaps =
-            session_background.info.parent match {
-              case None => Nil
-              case Some(logic) => ML_Process.session_heaps(store, session_background, logic = logic)
-            }
-
-          val use_prelude = if (session_heaps.isEmpty) Thy_Header.ml_roots.map(_._1) else Nil
-
-          val eval_store =
-            if (store_heap) {
-              (if (info.theories.nonEmpty) List("ML_Heap.share_common_data ()") else Nil) :::
-              List("ML_Heap.save_child " +
-                ML_Syntax.print_string_bytes(File.platform_path(store.output_heap(session_name))))
-            }
-            else Nil
-
-          def session_blobs(node_name: Document.Node.Name): List[(Command.Blob, Document.Blobs.Item)] =
-            session_background.base.theory_load_commands.get(node_name.theory) match {
-              case None => Nil
-              case Some(spans) =>
-                val syntax = session_background.base.theory_syntax(node_name)
-                val master_dir = Path.explode(node_name.master_dir)
-                for (span <- spans; file <- span.loaded_files(syntax).files)
-                  yield {
-                    val src_path = Path.explode(file)
-                    val blob_name = Document.Node.Name(File.symbolic_path(master_dir + src_path))
-
-                    val bytes = session_sources(blob_name.node).bytes
-                    val text = bytes.text
-                    val chunk = Symbol.Text_Chunk(text)
-
-                    Command.Blob(blob_name, src_path, Some((SHA1.digest(bytes), chunk))) ->
-                      Document.Blobs.Item(bytes, text, chunk, changed = false)
-                  }
-            }
-
-
-          /* session */
-
-          val resources =
-            new Resources(session_background, log = log,
-              command_timings =
-                Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache))
-
-          val session =
-            new Session(options, resources) {
-              override val cache: Term.Cache = store.cache
-
-              override def build_blobs_info(node_name: Document.Node.Name): Command.Blobs_Info =
-                Command.Blobs_Info.make(session_blobs(node_name))
-
-              override def build_blobs(node_name: Document.Node.Name): Document.Blobs =
-                Document.Blobs.make(session_blobs(node_name))
-            }
-
-          object Build_Session_Errors {
-            private val promise: Promise[List[String]] = Future.promise
-
-            def result: Exn.Result[List[String]] = promise.join_result
-            def cancel(): Unit = promise.cancel()
-            def apply(errs: List[String]): Unit = {
-              try { promise.fulfill(errs) }
-              catch { case _: IllegalStateException => }
-            }
-          }
-
-          val export_consumer =
-            Export.consumer(store.open_database(session_name, output = true, server = server),
-              store.cache, progress = progress)
-
-          val stdout = new StringBuilder(1000)
-          val stderr = new StringBuilder(1000)
-          val command_timings = new mutable.ListBuffer[Properties.T]
-          val theory_timings = new mutable.ListBuffer[Properties.T]
-          val session_timings = new mutable.ListBuffer[Properties.T]
-          val runtime_statistics = new mutable.ListBuffer[Properties.T]
-          val task_statistics = new mutable.ListBuffer[Properties.T]
-
-          def fun(
-            name: String,
-            acc: mutable.ListBuffer[Properties.T],
-            unapply: Properties.T => Option[Properties.T]
-          ): (String, Session.Protocol_Function) = {
-            name -> ((msg: Prover.Protocol_Output) =>
-              unapply(msg.properties) match {
-                case Some(props) => acc += props; true
-                case _ => false
-              })
-          }
-
-          session.init_protocol_handler(new Session.Protocol_Handler {
-              override def exit(): Unit = Build_Session_Errors.cancel()
-
-              private def build_session_finished(msg: Prover.Protocol_Output): Boolean = {
-                val (rc, errors) =
-                  try {
-                    val (rc, errs) = {
-                      import XML.Decode._
-                      pair(int, list(x => x))(Symbol.decode_yxml(msg.text))
-                    }
-                    val errors =
-                      for (err <- errs) yield {
-                        val prt = Protocol_Message.expose_no_reports(err)
-                        Pretty.string_of(prt, metric = Symbol.Metric)
-                      }
-                    (rc, errors)
-                  }
-                  catch { case ERROR(err) => (Process_Result.RC.failure, List(err)) }
-
-                session.protocol_command("Prover.stop", rc.toString)
-                Build_Session_Errors(errors)
-                true
-              }
-
-              private def loading_theory(msg: Prover.Protocol_Output): Boolean =
-                msg.properties match {
-                  case Markup.Loading_Theory(Markup.Name(name)) =>
-                    progress.theory(Progress.Theory(name, session = session_name))
-                    false
-                  case _ => false
-                }
-
-              private def export_(msg: Prover.Protocol_Output): Boolean =
-                msg.properties match {
-                  case Protocol.Export(args) =>
-                    export_consumer.make_entry(session_name, args, msg.chunk)
-                    true
-                  case _ => false
-                }
-
-              override val functions: Session.Protocol_Functions =
-                List(
-                  Markup.Build_Session_Finished.name -> build_session_finished,
-                  Markup.Loading_Theory.name -> loading_theory,
-                  Markup.EXPORT -> export_,
-                  fun(Markup.Theory_Timing.name, theory_timings, Markup.Theory_Timing.unapply),
-                  fun(Markup.Session_Timing.name, session_timings, Markup.Session_Timing.unapply),
-                  fun(Markup.Task_Statistics.name, task_statistics, Markup.Task_Statistics.unapply))
-            })
-
-          session.command_timings += Session.Consumer("command_timings") {
-            case Session.Command_Timing(props) =>
-              for {
-                elapsed <- Markup.Elapsed.unapply(props)
-                elapsed_time = Time.seconds(elapsed)
-                if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
-              } command_timings += props.filter(Markup.command_timing_property)
-          }
-
-          session.runtime_statistics += Session.Consumer("ML_statistics") {
-            case Session.Runtime_Statistics(props) => runtime_statistics += props
-          }
-
-          session.finished_theories += Session.Consumer[Document.Snapshot]("finished_theories") {
-            case snapshot =>
-              if (!progress.stopped) {
-                def export_(name: String, xml: XML.Body, compress: Boolean = true): Unit = {
-                  if (!progress.stopped) {
-                    val theory_name = snapshot.node_name.theory
-                    val args =
-                      Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
-                    val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
-                    export_consumer.make_entry(session_name, args, body)
-                  }
-                }
-                def export_text(name: String, text: String, compress: Boolean = true): Unit =
-                  export_(name, List(XML.Text(text)), compress = compress)
-
-                for (command <- snapshot.snippet_command) {
-                  export_text(Export.DOCUMENT_ID, command.id.toString, compress = false)
-                }
-
-                export_text(Export.FILES,
-                  cat_lines(snapshot.node_files.map(name => File.symbolic_path(name.path))),
-                  compress = false)
-
-                for ((blob_name, i) <- snapshot.node_files.tail.zipWithIndex) {
-                  val xml = snapshot.switch(blob_name).xml_markup()
-                  export_(Export.MARKUP + (i + 1), xml)
-                }
-                export_(Export.MARKUP, snapshot.xml_markup())
-                export_(Export.MESSAGES, snapshot.messages.map(_._1))
-              }
-          }
-
-          session.all_messages += Session.Consumer[Any]("build_session_output") {
-            case msg: Prover.Output =>
-              val message = msg.message
-              if (msg.is_system) resources.log(Protocol.message_text(message))
-
-              if (msg.is_stdout) {
-                stdout ++= Symbol.encode(XML.content(message))
-              }
-              else if (msg.is_stderr) {
-                stderr ++= Symbol.encode(XML.content(message))
-              }
-              else if (msg.is_exit) {
-                val err =
-                  "Prover terminated" +
-                    (msg.properties match {
-                      case Markup.Process_Result(result) => ": " + result.print_rc
-                      case _ => ""
-                    })
-                Build_Session_Errors(List(err))
-              }
-            case _ =>
-          }
-
-          build_context.session_setup(session_name, session)
-
-          val eval_main = Command_Line.ML_tool("Isabelle_Process.init_build ()" :: eval_store)
-
-
-          /* process */
-
-          val process =
-            Isabelle_Process.start(options, session, session_background, session_heaps,
-              use_prelude = use_prelude, eval_main = eval_main, cwd = info.dir.file, env = env)
-
-          val timeout_request: Option[Event_Timer.Request] =
-            if (info.timeout_ignored) None
-            else Some(Event_Timer.request(Time.now() + info.timeout) { process.terminate() })
-
-          val build_errors =
-            Isabelle_Thread.interrupt_handler(_ => process.terminate()) {
-              Exn.capture { process.await_startup() } match {
-                case Exn.Res(_) =>
-                  val resources_yxml = resources.init_session_yxml
-                  val encode_options: XML.Encode.T[Options] =
-                    options => session.prover_options(options).encode
-                  val args_yxml =
-                    YXML.string_of_body(
-                      {
-                        import XML.Encode._
-                        pair(string, list(pair(encode_options, list(pair(string, properties)))))(
-                          (session_name, info.theories))
-                      })
-                  session.protocol_command("build_session", resources_yxml, args_yxml)
-                  Build_Session_Errors.result
-                case Exn.Exn(exn) => Exn.Res(List(Exn.message(exn)))
-              }
-            }
-
-          val result0 =
-            Isabelle_Thread.interrupt_handler(_ => process.terminate()) { process.await_shutdown() }
-
-          val was_timeout =
-            timeout_request match {
-              case None => false
-              case Some(request) => !request.cancel()
-            }
-
-          session.stop()
-
-          val export_errors =
-            export_consumer.shutdown(close = true).map(Output.error_message_text)
-
-          val (document_output, document_errors) =
-            try {
-              if (Exn.is_res(build_errors) && result0.ok && info.documents.nonEmpty) {
-                using(Export.open_database_context(store, server = server)) { database_context =>
-                  val documents =
-                    using(database_context.open_session(session_background)) {
-                      session_context =>
-                        Document_Build.build_documents(
-                          Document_Build.context(session_context, progress = progress),
-                          output_sources = info.document_output,
-                          output_pdf = info.document_output)
-                    }
-                  using(database_context.open_database(session_name, output = true))(session_database =>
-                    documents.foreach(_.write(session_database.db, session_name)))
-                  (documents.flatMap(_.log_lines), Nil)
-                }
-              }
-              else (Nil, Nil)
-            }
-            catch {
-              case exn: Document_Build.Build_Error => (exn.log_lines, exn.log_errors)
-              case Exn.Interrupt.ERROR(msg) => (Nil, List(msg))
-            }
-
-
-          /* process result */
-
-          val result1 = {
-            val theory_timing =
-              theory_timings.iterator.flatMap(
-                {
-                  case props @ Markup.Name(name) => Some(name -> props)
-                  case _ => None
-                }).toMap
-            val used_theory_timings =
-              for { (name, _) <- session_background.base.used_theories }
-                yield theory_timing.getOrElse(name.theory, Markup.Name(name.theory))
-
-            val more_output =
-              Library.trim_line(stdout.toString) ::
-                command_timings.toList.map(Protocol.Command_Timing_Marker.apply) :::
-                used_theory_timings.map(Protocol.Theory_Timing_Marker.apply) :::
-                session_timings.toList.map(Protocol.Session_Timing_Marker.apply) :::
-                runtime_statistics.toList.map(Protocol.ML_Statistics_Marker.apply) :::
-                task_statistics.toList.map(Protocol.Task_Statistics_Marker.apply) :::
-                document_output
-
-            result0.output(more_output)
-              .error(Library.trim_line(stderr.toString))
-              .errors_rc(export_errors ::: document_errors)
-          }
-
-          val result2 =
-            build_errors match {
-              case Exn.Res(build_errs) =>
-                val errs = build_errs ::: document_errors
-                if (errs.nonEmpty) {
-                  result1.error_rc.output(
-                    errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
-                      errs.map(Protocol.Error_Message_Marker.apply))
-                }
-                else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-                else result1
-              case Exn.Exn(Exn.Interrupt()) =>
-                if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
-                else result1
-              case Exn.Exn(exn) => throw exn
-            }
-
-          val process_result =
-            if (result2.ok) result2
-            else if (was_timeout) result2.error(Output.error_message_text("Timeout")).timeout_rc
-            else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
-            else result2
-
-
-          /* output heap */
-
-          val output_shasum = {
-            val heap = store.output_heap(session_name)
-            if (process_result.ok && store_heap && heap.is_file) {
-              val slice = Space.MiB(options.real("build_database_slice")).bytes
-              val digest = ML_Heap.store(database_server, session_name, heap, slice)
-              SHA1.shasum(digest, session_name)
-            }
-            else SHA1.no_shasum
-          }
-
-          val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
-
-          val build_log =
-            Build_Log.Log_File(session_name, process_result.out_lines, cache = store.cache).
-              parse_session_info(
-                command_timings = true,
-                theory_timings = true,
-                ml_statistics = true,
-                task_statistics = true)
-
-          // write log file
-          if (process_result.ok) {
-            File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines))
-          }
-          else File.write(store.output_log(session_name), terminate_lines(log_lines))
-
-          // write database
-          def write_info(db: SQL.Database): Unit =
-            store.write_session_info(db, session_name, session_sources,
-              build_log =
-                if (process_result.timeout) build_log.error("Timeout") else build_log,
-              build =
-                Store.Build_Info(
-                  sources = sources_shasum,
-                  input_heaps = input_shasum,
-                  output_heap = output_shasum,
-                  process_result.rc,
-                  build_context.build_uuid))
-
-          val valid =
-            if (progress.stopped_local) false
-            else {
-              database_server match {
-                case Some(db) => write_info(db)
-                case None => using(store.open_database(session_name, output = true))(write_info)
-              }
-              true
-            }
-
-          // messages
-          process_result.err_lines.foreach(progress.echo(_))
-
-          if (process_result.ok) {
-            val props = build_log.session_timing
-            val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1
-            val timing = Markup.Timing_Properties.get(props)
-            progress.echo(
-              "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")",
-              verbose = true)
-            progress.echo(
-              "Finished " + session_name + " (" + process_result.timing.message_resources + ")")
-          }
-          else {
-            progress.echo(
-              session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
-            if (!process_result.interrupted) {
-              val tail = info.options.int("process_output_tail")
-              val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
-              val prefix = if (log_lines.length == suffix.length) Nil else List("...")
-              progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
-            }
-          }
-
-          if (valid) Some(Result(process_result.copy(out_lines = log_lines), output_shasum))
-          else None
-        }
-      }
-
-    override def cancel(): Unit = future_result.cancel()
-    override def is_finished: Boolean = future_result.is_finished
-    override def join: Option[Result] = future_result.join
-  }
-}
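
For orientation, the removed Build_Job trait couples a cooperative cancel() with a blocking join over a result that Session_Job computes on a background thread (future_result above). Below is a minimal, self-contained Scala sketch of that shape only; it deliberately uses a plain JVM thread instead of Isabelle's Future.thread/Isabelle_Thread machinery, so the names (Simple_Job etc.) are illustrative and not part of the Isabelle API.

/* Illustrative sketch (not Isabelle API): a job whose result is computed on a
   background thread, with interrupt-based cancellation and a blocking join
   that yields None when the body did not complete normally. */
final class Simple_Job[A](body: => A) {
  @volatile private var result: Option[A] = None

  private val thread =
    new Thread(() => {
      try { result = Some(body) }
      catch { case _: InterruptedException => }  // cancelled: leave result empty
    })
  thread.start()

  def cancel(): Unit = thread.interrupt()
  def is_finished: Boolean = !thread.isAlive
  def join: Option[A] = { thread.join(); result }
}

Session_Job in the diff follows the same outline, but wraps its body in Future.thread("build", uninterruptible = true) and routes interrupts through Isabelle_Thread.interrupt_handler(_ => process.terminate()), so cancellation terminates the prover process rather than interrupting the JVM thread directly.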