# HG changeset patch # User wenzelm # Date 1676205906 -3600 # Node ID 68a7ad1385bc46b0e9a0362049c359103ee42c55 # Parent 25e923c57af7ce9448229c9272c5223a37cc3626 clarified modules; diff -r 25e923c57af7 -r 68a7ad1385bc src/Pure/Tools/build.scala --- a/src/Pure/Tools/build.scala Sat Feb 11 23:24:57 2023 +0100 +++ b/src/Pure/Tools/build.scala Sun Feb 12 13:45:06 2023 +0100 @@ -8,10 +8,6 @@ package isabelle -import scala.collection.immutable.SortedSet -import scala.annotation.tailrec - - object Build { /** auxiliary **/ @@ -28,31 +24,6 @@ } - /* queue with scheduling information */ - - private object Queue { - def apply(build_context: Build_Process.Context): Queue = { - val build_graph = build_context.sessions_structure.build_graph - val build_order = SortedSet.from(build_graph.keys)(build_context.ordering) - new Queue(build_graph, build_order) - } - } - - private class Queue( - build_graph: Graph[String, Sessions.Info], - build_order: SortedSet[String] - ) { - def is_empty: Boolean = build_graph.is_empty - - def - (name: String): Queue = - new Queue(build_graph.del_node(name), build_order - name) - - def dequeue(skip: String => Boolean): Option[String] = - build_order.iterator.dropWhile(name => skip(name) || !build_graph.is_minimal(name)) - .nextOption() - } - - /** build with results **/ @@ -76,16 +47,6 @@ override def toString: String = rc.toString } - def session_finished(session_name: String, process_result: Process_Result): String = - "Finished " + session_name + " (" + process_result.timing.message_resources + ")" - - def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = { - val props = build_log.session_timing - val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 - val timing = Markup.Timing_Properties.get(props) - "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" - } - def build( options: Options, selection: Sessions.Selection = Sessions.Selection.empty, @@ -170,10 +131,9 @@ } - /* main build process */ + /* build process and results */ - val build_context = - Build_Process.Context(build_deps.sessions_structure, store, progress = progress) + val build_context = Build_Process.Context(store, build_deps, progress = progress) store.prepare_output_dir() @@ -187,188 +147,19 @@ } } - // scheduler loop - case class Result( - current: Boolean, - output_heap: SHA1.Shasum, - process_result: Process_Result - ) { - def ok: Boolean = process_result.ok - } - - def sleep(): Unit = - Isabelle_Thread.interrupt_handler(_ => progress.stop()) { - build_options.seconds("editor_input_delay").sleep() - } - - val log = - build_options.string("system_log") match { - case "" => No_Logger - case "-" => Logger.make(progress) - case log_file => Logger.make(Some(Path.explode(log_file))) - } - - val numa_nodes = new NUMA.Nodes(numa_shuffling) - - @tailrec def loop( - pending: Queue, - running: Map[String, (SHA1.Shasum, Build_Job)], - results: Map[String, Result] - ): Map[String, Result] = { - def used_node(i: Int): Boolean = - running.iterator.exists( - { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) - - if (pending.is_empty) results - else { - if (progress.stopped) { - for ((_, (_, job)) <- running) job.terminate() - } - - running.find({ case (_, (_, job)) => job.is_finished }) match { - case Some((session_name, (input_heaps, job))) => - //{{{ finish job - - val process_result = job.join - - val output_heap = - if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) { - SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name) - } - else SHA1.no_shasum - - val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) - val process_result_tail = { - val tail = job.info.options.int("process_output_tail") - process_result.copy( - out_lines = - "(more details via \"isabelle log -H Error " + session_name + "\")" :: - (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0))) - } - - val build_log = - Build_Log.Log_File(session_name, process_result.out_lines). - parse_session_info( - command_timings = true, - theory_timings = true, - ml_statistics = true, - task_statistics = true) - - // write log file - if (process_result.ok) { - File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) - } - else File.write(store.output_log(session_name), terminate_lines(log_lines)) - - // write database - using(store.open_database(session_name, output = true))(db => - store.write_session_info(db, session_name, job.session_sources, - build_log = - if (process_result.timeout) build_log.error("Timeout") else build_log, - build = - Session_Info(build_deps.sources_shasum(session_name), input_heaps, output_heap, - process_result.rc, UUID.random().toString))) - - // messages - process_result.err_lines.foreach(progress.echo) - - if (process_result.ok) { - if (verbose) progress.echo(session_timing(session_name, build_log)) - progress.echo(session_finished(session_name, process_result)) - } - else { - progress.echo(session_name + " FAILED") - if (!process_result.interrupted) progress.echo(process_result_tail.out) - } - - loop(pending - session_name, running - session_name, - results + (session_name -> Result(false, output_heap, process_result_tail))) - //}}} - case None if running.size < (max_jobs max 1) => - //{{{ check/start next job - pending.dequeue(running.isDefinedAt) match { - case Some(session_name) => - val ancestor_results = - build_deps.sessions_structure.build_requirements(List(session_name)). - filterNot(_ == session_name).map(results(_)) - val input_heaps = - if (ancestor_results.isEmpty) { - SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) - } - else SHA1.flat_shasum(ancestor_results.map(_.output_heap)) - - val do_store = build_heap || build_context.build_heap(session_name) - val (current, output_heap) = { - store.try_open_database(session_name) match { - case Some(db) => - using(db)(store.read_build(_, session_name)) match { - case Some(build) => - val output_heap = store.find_heap_shasum(session_name) - val current = - !fresh_build && - build.ok && - build.sources == build_deps.sources_shasum(session_name) && - build.input_heaps == input_heaps && - build.output_heap == output_heap && - !(do_store && output_heap.is_empty) - (current, output_heap) - case None => (false, SHA1.no_shasum) - } - case None => (false, SHA1.no_shasum) - } - } - val all_current = current && ancestor_results.forall(_.current) - - if (all_current) { - loop(pending - session_name, running, - results + (session_name -> Result(true, output_heap, Process_Result.ok))) - } - else if (no_build) { - progress.echo_if(verbose, "Skipping " + session_name + " ...") - loop(pending - session_name, running, - results + (session_name -> Result(false, output_heap, Process_Result.error))) - } - else if (ancestor_results.forall(_.ok) && !progress.stopped) { - progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") - - store.clean_output(session_name) - using(store.open_database(session_name, output = true))( - store.init_session_info(_, session_name)) - - val session_background = build_deps.background(session_name) - val resources = - new Resources(session_background, log = log, - command_timings = build_context(session_name).old_command_timings) - - val numa_node = numa_nodes.next(used_node) - val job = - new Build_Job(progress, session_background, store, do_store, - resources, session_setup, numa_node) - loop(pending, running + (session_name -> (input_heaps, job)), results) - } - else { - progress.echo(session_name + " CANCELLED") - loop(pending - session_name, running, - results + (session_name -> Result(false, output_heap, Process_Result.undefined))) - } - case None => sleep(); loop(pending, running, results) - } - ///}}} - case None => sleep(); loop(pending, running, results) - } - } - } - - - /* build results */ - val results = { val build_results = if (build_deps.is_empty) { progress.echo_warning("Nothing to build") - Map.empty[String, Result] + Map.empty[String, Build_Process.Result] } - else Isabelle_Thread.uninterruptible { loop(Queue(build_context), Map.empty, Map.empty) } + else { + Isabelle_Thread.uninterruptible { + Build_Process.main(build_context, build_heap = build_heap, + numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build, + no_build = no_build, verbose = verbose, session_setup = session_setup) + } + } val sessions_ok: List[String] = (for { diff -r 25e923c57af7 -r 68a7ad1385bc src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sat Feb 11 23:24:57 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Sun Feb 12 13:45:06 2023 +0100 @@ -9,6 +9,8 @@ import scala.math.Ordering +import scala.collection.immutable.SortedSet +import scala.annotation.tailrec object Build_Process { @@ -64,10 +66,11 @@ object Context { def apply( - sessions_structure: Sessions.Structure, store: Sessions.Store, + deps: Sessions.Deps, progress: Progress = new Progress ): Context = { + val sessions_structure = deps.sessions_structure val build_graph = sessions_structure.build_graph val sessions = @@ -110,19 +113,250 @@ } } - new Context(sessions_structure, sessions, ordering) + new Context(store, deps, sessions, ordering, progress) } } final class Context private( - val sessions_structure: Sessions.Structure, + val store: Sessions.Store, + val deps: Sessions.Deps, sessions: Map[String, Session_Context], - val ordering: Ordering[String] + val ordering: Ordering[String], + val progress: Progress ) { + def sessions_structure: Sessions.Structure = deps.sessions_structure + def apply(session: String): Session_Context = sessions.getOrElse(session, Session_Context.empty(session, Time.zero)) def build_heap(session: String): Boolean = Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) } + + + /* queue with scheduling information */ + + private object Queue { + def apply(build_context: Build_Process.Context): Queue = { + val build_graph = build_context.sessions_structure.build_graph + val build_order = SortedSet.from(build_graph.keys)(build_context.ordering) + new Queue(build_graph, build_order) + } + } + + private class Queue( + build_graph: Graph[String, Sessions.Info], + build_order: SortedSet[String] + ) { + def is_empty: Boolean = build_graph.is_empty + + def - (name: String): Queue = + new Queue(build_graph.del_node(name), build_order - name) + + def dequeue(skip: String => Boolean): Option[String] = + build_order.iterator.dropWhile(name => skip(name) || !build_graph.is_minimal(name)) + .nextOption() + } + + + /* main */ + + private def session_finished(session_name: String, process_result: Process_Result): String = + "Finished " + session_name + " (" + process_result.timing.message_resources + ")" + + private def session_timing(session_name: String, build_log: Build_Log.Session_Info): String = { + val props = build_log.session_timing + val threads = Markup.Session_Timing.Threads.unapply(props) getOrElse 1 + val timing = Markup.Timing_Properties.get(props) + "Timing " + session_name + " (" + threads + " threads, " + timing.message_factor + ")" + } + + case class Result( + current: Boolean, + output_heap: SHA1.Shasum, + process_result: Process_Result + ) { + def ok: Boolean = process_result.ok + } + + def main( + build_context: Context, + build_heap: Boolean = false, + numa_shuffling: Boolean = false, + max_jobs: Int = 1, + fresh_build: Boolean = false, + no_build: Boolean = false, + verbose: Boolean = false, + session_setup: (String, Session) => Unit = (_, _) => () + ): Map[String, Result] = { + val store = build_context.store + val build_options = store.options + val build_deps = build_context.deps + val progress = build_context.progress + + def sleep(): Unit = + Isabelle_Thread.interrupt_handler(_ => progress.stop()) { + build_options.seconds("editor_input_delay").sleep() + } + + val log = + build_options.string("system_log") match { + case "" => No_Logger + case "-" => Logger.make(progress) + case log_file => Logger.make(Some(Path.explode(log_file))) + } + + val numa_nodes = new NUMA.Nodes(numa_shuffling) + + @tailrec def loop( + pending: Queue, + running: Map[String, (SHA1.Shasum, Build_Job)], + results: Map[String, Result] + ): Map[String, Result] = { + def used_node(i: Int): Boolean = + running.iterator.exists( + { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) + + if (pending.is_empty) results + else { + if (progress.stopped) { + for ((_, (_, job)) <- running) job.terminate() + } + + running.find({ case (_, (_, job)) => job.is_finished }) match { + case Some((session_name, (input_heaps, job))) => + //{{{ finish job + + val process_result = job.join + + val output_heap = + if (process_result.ok && job.do_store && store.output_heap(session_name).is_file) { + SHA1.shasum(ML_Heap.write_digest(store.output_heap(session_name)), session_name) + } + else SHA1.no_shasum + + val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test) + val process_result_tail = { + val tail = job.info.options.int("process_output_tail") + process_result.copy( + out_lines = + "(more details via \"isabelle log -H Error " + session_name + "\")" :: + (if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0))) + } + + val build_log = + Build_Log.Log_File(session_name, process_result.out_lines). + parse_session_info( + command_timings = true, + theory_timings = true, + ml_statistics = true, + task_statistics = true) + + // write log file + if (process_result.ok) { + File.write_gzip(store.output_log_gz(session_name), terminate_lines(log_lines)) + } + else File.write(store.output_log(session_name), terminate_lines(log_lines)) + + // write database + using(store.open_database(session_name, output = true))(db => + store.write_session_info(db, session_name, job.session_sources, + build_log = + if (process_result.timeout) build_log.error("Timeout") else build_log, + build = + Build.Session_Info(build_deps.sources_shasum(session_name), input_heaps, + output_heap, process_result.rc, UUID.random().toString))) + + // messages + process_result.err_lines.foreach(progress.echo) + + if (process_result.ok) { + if (verbose) progress.echo(session_timing(session_name, build_log)) + progress.echo(session_finished(session_name, process_result)) + } + else { + progress.echo(session_name + " FAILED") + if (!process_result.interrupted) progress.echo(process_result_tail.out) + } + + loop(pending - session_name, running - session_name, + results + (session_name -> Result(false, output_heap, process_result_tail))) + //}}} + case None if running.size < (max_jobs max 1) => + //{{{ check/start next job + pending.dequeue(running.isDefinedAt) match { + case Some(session_name) => + val ancestor_results = + build_deps.sessions_structure.build_requirements(List(session_name)). + filterNot(_ == session_name).map(results(_)) + val input_heaps = + if (ancestor_results.isEmpty) { + SHA1.shasum_meta_info(SHA1.digest(Path.explode("$POLYML_EXE"))) + } + else SHA1.flat_shasum(ancestor_results.map(_.output_heap)) + + val do_store = build_heap || build_context.build_heap(session_name) + val (current, output_heap) = { + store.try_open_database(session_name) match { + case Some(db) => + using(db)(store.read_build(_, session_name)) match { + case Some(build) => + val output_heap = store.find_heap_shasum(session_name) + val current = + !fresh_build && + build.ok && + build.sources == build_deps.sources_shasum(session_name) && + build.input_heaps == input_heaps && + build.output_heap == output_heap && + !(do_store && output_heap.is_empty) + (current, output_heap) + case None => (false, SHA1.no_shasum) + } + case None => (false, SHA1.no_shasum) + } + } + val all_current = current && ancestor_results.forall(_.current) + + if (all_current) { + loop(pending - session_name, running, + results + (session_name -> Result(true, output_heap, Process_Result.ok))) + } + else if (no_build) { + progress.echo_if(verbose, "Skipping " + session_name + " ...") + loop(pending - session_name, running, + results + (session_name -> Result(false, output_heap, Process_Result.error))) + } + else if (ancestor_results.forall(_.ok) && !progress.stopped) { + progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") + + store.clean_output(session_name) + using(store.open_database(session_name, output = true))( + store.init_session_info(_, session_name)) + + val session_background = build_deps.background(session_name) + val resources = + new Resources(session_background, log = log, + command_timings = build_context(session_name).old_command_timings) + + val numa_node = numa_nodes.next(used_node) + val job = + new Build_Job(progress, session_background, store, do_store, + resources, session_setup, numa_node) + loop(pending, running + (session_name -> (input_heaps, job)), results) + } + else { + progress.echo(session_name + " CANCELLED") + loop(pending - session_name, running, + results + (session_name -> Result(false, output_heap, Process_Result.undefined))) + } + case None => sleep(); loop(pending, running, results) + } + ///}}} + case None => sleep(); loop(pending, running, results) + } + } + } + + loop(Queue(build_context), Map.empty, Map.empty) + } }