diff -r 25e923c57af7 -r 68a7ad1385bc src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sat Feb 11 23:24:57 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Sun Feb 12 13:45:06 2023 +0100 @@ -9,6 +9,8 @@ import scala.math.Ordering +import scala.collection.immutable.SortedSet +import scala.annotation.tailrec object Build_Process { @@ -64,10 +66,11 @@ object Context { def apply( - sessions_structure: Sessions.Structure, store: Sessions.Store, + deps: Sessions.Deps, progress: Progress = new Progress ): Context = { + val sessions_structure = deps.sessions_structure val build_graph = sessions_structure.build_graph val sessions = @@ -110,19 +113,250 @@ } } - new Context(sessions_structure, sessions, ordering) + new Context(store, deps, sessions, ordering, progress) } } final class Context private( - val sessions_structure: Sessions.Structure, + val store: Sessions.Store, + val deps: Sessions.Deps, sessions: Map[String, Session_Context], - val ordering: Ordering[String] + val ordering: Ordering[String], + val progress: Progress ) { + def sessions_structure: Sessions.Structure = deps.sessions_structure + def apply(session: String): Session_Context = sessions.getOrElse(session, Session_Context.empty(session, Time.zero)) def build_heap(session: String): Boolean = Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) } + + + /* queue with scheduling information */ + + private object Queue { + def apply(build_context: Build_Process.Context): Queue = { + val build_graph = build_context.sessions_structure.build_graph + val build_order = SortedSet.from(build_graph.keys)(build_context.ordering) + new Queue(build_graph, build_order) + } + } + + private class Queue( + build_graph: Graph[String, Sessions.Info], + build_order: SortedSet[String] + ) { + def is_empty: Boolean = build_graph.is_empty + + def - (name: String): Queue = + new Queue(build_graph.del_node(name), build_order - name) + + def dequeue(skip: String => Boolean): Option[String] = + build_order.iterator.dropWhile(name => skip(name) || !build_graph.is_minimal(name)) + .nextOption() + } + + + /* main */ + + private def session_finished(session_name: String, process_result: Process_Result): String = + "Finished " + session_name + " (" + process_result.timing.message_resources + ")" + + private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = { + val props = build_log.session_timing + val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 + val timing = Markup.Timing_Properties.get(props) + "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" + } + + case class Result( + current: Boolean, + output_heap: SHA1.Shasum, + process_result: Process_Result + ) { + def ok: Boolean = process_result.ok + } + + def main( + build_context: Context, + build_heap: Boolean = false, + numa_shuffling: Boolean = false, + max_jobs: Int = 1, + fresh_build: Boolean = false, + no_build: Boolean = false, + verbose: Boolean = false, + session_setup: (String, Session) => Unit = (_, _) => () + ): Map[String, Result] = { + val store = build_context.store + val build_options = store.options + val build_deps = build_context.deps + val progress = build_context.progress + + def sleep(): Unit = + Isabelle_Thread.interrupt_handler(_ => progress.stop()) { + build_options.seconds("editor_input_delay").sleep() + } + + val log = + build_options.string("system_log") match { + case "" => No_Logger + case "-" => Logger.make(progress) + case log_file => Logger.make(Some(Path.explode(log_file))) + } + + val numa_nodes = new NUMA.Nodes(numa_shuffling) + + @tailrec def loop( + pending: Queue, + running: Map[String, (SHA1.Shasum, Build_Job)], + results: Map[String, Result] + ): Map[String, Result] = { + def used_node(i: Int): Boolean = + running.iterator.exists( + { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) + + if (pending.is_empty) results + else { + if (progress.stopped) { + for ((_, (_, job)) <- running) job.terminate() + } + + running.find({ case (_, (_, job)) => job.is_finished }) match { + case Some((session_name, (input_heaps, job))) => + //{{{ finish job + + val process_result = job.join + + val output_heap = + if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) { + SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name) + } + else SHA1.no_shasum + + val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) + val process_result_tail = { + val tail = job.info.options.int("process_output_tail") + process_result.copy( + out_lines = + "(more details via \"isabelle log -H Error " + session_name + "\")" :: + (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0))) + } + + val build_log = + Build_Log.Log_File(session_name, process_result.out_lines). + parse_session_info( + command_timings = true, + theory_timings = true, + ml_statistics = true, + task_statistics = true) + + // write log file + if (process_result.ok) { + File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) + } + else File.write(store.output_log(session_name), terminate_lines(log_lines)) + + // write database + using(store.open_database(session_name, output = true))(db => + store.write_session_info(db, session_name, job.session_sources, + build_log = + if (process_result.timeout) build_log.error("Timeout") else build_log, + build = + Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps, + output_heap, process_result.rc, UUID.random().toString))) + + // messages + process_result.err_lines.foreach(progress.echo) + + if (process_result.ok) { + if (verbose) progress.echo(session_timing(session_name, build_log)) + progress.echo(session_finished(session_name, process_result)) + } + else { + progress.echo(session_name + " FAILED") + if (!process_result.interrupted) progress.echo(process_result_tail.out) + } + + loop(pending - session_name, running - session_name, + results + (session_name -> Result(false, output_heap, process_result_tail))) + //}}} + case None if running.size < (max_jobs max 1) => + //{{{ check/start next job + pending.dequeue(running.isDefinedAt) match { + case Some(session_name) => + val ancestor_results = + build_deps.sessions_structure.build_requirements(List(session_name)). + filterNot(_ == session_name).map(results(_)) + val input_heaps = + if (ancestor_results.isEmpty) { + SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) + } + else SHA1.flat_shasum(ancestor_results.map(_.output_heap)) + + val do_store = build_heap || build_context.build_heap(session_name) + val (current, output_heap) = { + store.try_open_database(session_name) match { + case Some(db) => + using(db)(store.read_build(_, session_name)) match { + case Some(build) => + val output_heap = store.find_heap_shasum(session_name) + val current = + !fresh_build && + build.ok && + build.sources == build_deps.sources_shasum(session_name) && + build.input_heaps == input_heaps && + build.output_heap == output_heap && + !(do_store && output_heap.is_empty) + (current, output_heap) + case None => (false, SHA1.no_shasum) + } + case None => (false, SHA1.no_shasum) + } + } + val all_current = current && ancestor_results.forall(_.current) + + if (all_current) { + loop(pending - session_name, running, + results + (session_name -> Result(true, output_heap, Process_Result.ok))) + } + else if (no_build) { + progress.echo_if(verbose, "Skipping " + session_name + " ...") + loop(pending - session_name, running, + results + (session_name -> Result(false, output_heap, Process_Result.error))) + } + else if (ancestor_results.forall(_.ok) && !progress.stopped) { + progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") + + store.clean_output(session_name) + using(store.open_database(session_name, output = true))( + store.init_session_info(_, session_name)) + + val session_background = build_deps.background(session_name) + val resources = + new Resources(session_background, log = log, + command_timings = build_context(session_name).old_command_timings) + + val numa_node = numa_nodes.next(used_node) + val job = + new Build_Job(progress, session_background, store, do_store, + resources, session_setup, numa_node) + loop(pending, running + (session_name -> (input_heaps, job)), results) + } + else { + progress.echo(session_name + " CANCELLED") + loop(pending - session_name, running, + results + (session_name -> Result(false, output_heap, Process_Result.undefined))) + } + case None => sleep(); loop(pending, running, results) + } + ///}}} + case None => sleep(); loop(pending, running, results) + } + } + } + + loop(Queue(build_context), Map.empty, Map.empty) + } }