diff -r f3a6140fa3b1 -r c2c59de57df9 src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Sat Jul 01 16:32:46 2023 +0200 +++ b/src/Pure/Tools/build_process.scala Sat Jul 01 16:42:57 2023 +0200 @@ -16,10 +16,9 @@ object Build_Process { /** static context **/ - def init_context( + sealed case class Context( store: Store, - build_deps: Sessions.Deps, - progress: Progress = new Progress, + build_deps: isabelle.Sessions.Deps, ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"), hostname: String = Isabelle_System.hostname(), numa_shuffling: Boolean = false, @@ -29,79 +28,7 @@ no_build: Boolean = false, session_setup: (String, Session) => Unit = (_, _) => (), build_uuid: String = UUID.random().toString, - master: Boolean = false, - ): Context = { - val sessions_structure = build_deps.sessions_structure - val build_graph = sessions_structure.build_graph - - val sessions = - Map.from( - for ((name, (info, _)) <- build_graph.iterator) - yield { - val deps = info.parent.toList - val ancestors = sessions_structure.build_requirements(deps) - val sources_shasum = build_deps.sources_shasum(name) - val session_context = - Build_Job.Session_Context.load( - build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum, - info.timeout, store, progress = progress) - name -> session_context - }) - - val sessions_time = { - val maximals = build_graph.maximals.toSet - def descendants_time(name: String): Double = { - if (maximals.contains(name)) sessions(name).old_time.seconds - else { - val descendants = build_graph.all_succs(List(name)).toSet - val g = build_graph.restrict(descendants) - (0.0 :: g.maximals.flatMap { desc => - val ps = g.all_preds(List(desc)) - if (ps.exists(p => !sessions.isDefinedAt(p))) None - else Some(ps.map(p => sessions(p).old_time.seconds).sum) - }).max - } - } - Map.from( - for (name <- sessions.keysIterator) - yield name -> descendants_time(name)).withDefaultValue(0.0) - } - - val ordering = - new Ordering[String] { - def compare(name1: String, name2: String): Int = - sessions_time(name2) compare sessions_time(name1) match { - case 0 => - sessions(name2).timeout compare sessions(name1).timeout match { - case 0 => name1 compare name2 - case ord => ord - } - case ord => ord - } - } - - val numa_nodes = Host.numa_nodes(enabled = numa_shuffling) - - new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes, - build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build, - no_build = no_build, session_setup, build_uuid = build_uuid, master = master) - } - - final class Context private[Build_Process]( - val store: Store, - val build_deps: Sessions.Deps, - val sessions: State.Sessions, - val ordering: Ordering[String], - val ml_platform: String, - val hostname: String, - val numa_nodes: List[Int], - val build_heap: Boolean, - val max_jobs: Int, - val fresh_build: Boolean, - val no_build: Boolean, - val session_setup: (String, Session) => Unit, - val build_uuid: String, - val master: Boolean + master: Boolean = false ) { override def toString: String = "Build_Process.Context(build_uuid = " + quote(build_uuid) + @@ -109,20 +36,7 @@ def build_options: Options = store.options - def sessions_structure: Sessions.Structure = build_deps.sessions_structure - - def sources_shasum(name: String): SHA1.Shasum = sessions(name).sources_shasum - - def old_command_timings(name: String): List[Properties.T] = - sessions.get(name) match { - case Some(session_context) => - Properties.uncompress(session_context.old_command_timings_blob, cache = store.cache) - case None => Nil - } - - def store_heap(name: String): Boolean = - build_heap || Sessions.is_pure(name) || - sessions.valuesIterator.exists(_.ancestors.contains(name)) + def sessions_structure: isabelle.Sessions.Structure = build_deps.sessions_structure def worker_active: Boolean = max_jobs > 0 } @@ -184,16 +98,122 @@ def ok: Boolean = process_result.ok } + object Sessions { + type Graph = isabelle.Graph[String, Build_Job.Session_Context] + val empty: Sessions = new Sessions(Graph.string) + } + + final class Sessions private(val graph: Sessions.Graph) { + override def toString: String = graph.toString + + def apply(name: String): Build_Job.Session_Context = graph.get_node(name) + + def iterator: Iterator[Build_Job.Session_Context] = + for (name <- graph.topological_order.iterator) yield apply(name) + + def make(new_graph: Sessions.Graph): Sessions = + if (graph == new_graph) this + else { + new Sessions( + new_graph.iterator.foldLeft(new_graph) { + case (g, (name, (session, _))) => g.add_deps_acyclic(name, session.deps) + }) + } + + def pull( + data_domain: Set[String], + data: Set[String] => List[Build_Job.Session_Context] + ): Sessions = { + val dom = data_domain -- iterator.map(_.name) + make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) }) + } + + def init(build_context: Context, progress: Progress = new Progress): Sessions = { + val sessions_structure = build_context.sessions_structure + make( + sessions_structure.build_graph.iterator.foldLeft(graph) { + case (graph0, (name, (info, _))) => + val deps = info.parent.toList + val prefs = info.session_prefs + val ancestors = sessions_structure.build_requirements(deps) + val sources_shasum = build_context.build_deps.sources_shasum(name) + + if (graph0.defined(name)) { + val session0 = graph0.get_node(name) + val prefs0 = session0.session_prefs + val ancestors0 = session0.ancestors + val sources_shasum0 = session0.sources_shasum + + def err(msg: String, a: String, b: String): Nothing = + error("Conflicting dependencies for session " + quote(name) + ": " + + msg + "\n" + a + "\nvs.\n" + b) + + if (prefs0 != prefs) { + err("preferences disagree", + Symbol.cartouche_decoded(prefs0), Symbol.cartouche_decoded(prefs)) + } + if (ancestors0 != ancestors) { + err("ancestors disagree", commas_quote(ancestors0), commas_quote(ancestors)) + } + if (sources_shasum0 != sources_shasum) { + val a = sources_shasum0 - sources_shasum + val b = sources_shasum - sources_shasum0 + err("sources disagree", a.toString, b.toString) + } + + graph0 + } + else { + val session = + Build_Job.Session_Context.load( + build_context.build_uuid, name, deps, ancestors, prefs, sources_shasum, + info.timeout, build_context.store, progress = progress) + graph0.new_node(name, session) + } + } + ) + } + + lazy val max_time: Map[String, Double] = { + val maximals = graph.maximals.toSet + def descendants_time(name: String): Double = { + if (maximals.contains(name)) apply(name).old_time.seconds + else { + val descendants = graph.all_succs(List(name)).toSet + val g = graph.restrict(descendants) + (0.0 :: g.maximals.flatMap { desc => + val ps = g.all_preds(List(desc)) + if (ps.exists(p => !graph.defined(p))) None + else Some(ps.map(p => apply(p).old_time.seconds).sum) + }).max + } + } + Map.from( + for (name <- graph.keys_iterator) + yield name -> descendants_time(name)).withDefaultValue(0.0) + } + + lazy val ordering: Ordering[String] = + (a: String, b: String) => + max_time(b) compare max_time(a) match { + case 0 => + apply(b).timeout compare apply(a).timeout match { + case 0 => a compare b + case ord => ord + } + case ord => ord + } + } + sealed case class Snapshot( builds: List[Build], // available build configurations workers: List[Worker], // available worker processes - sessions: State.Sessions, // static build targets + sessions: Sessions, // static build targets pending: State.Pending, // dynamic build "queue" running: State.Running, // presently running jobs results: State.Results) // finished results object State { - type Sessions = Map[String, Build_Job.Session_Context] type Pending = List[Task] type Running = Map[String, Job] type Results = Map[String, Result] @@ -206,7 +226,8 @@ sealed case class State( serial: Long = 0, - sessions: State.Sessions = Map.empty, + numa_nodes: List[Int] = Nil, + sessions: Sessions = Sessions.empty, pending: State.Pending = Nil, running: State.Running = Map.empty, results: State.Results = Map.empty @@ -406,7 +427,7 @@ def read_sessions(db: SQL.Database, names: Iterable[String] = Nil, build_uuid: String = "" - ): State.Sessions = + ): List[Build_Job.Session_Context] = { db.execute_query_statement( Sessions.table.select( sql = @@ -414,7 +435,7 @@ if_proper(names, Sessions.name.member(names)), if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid))) ), - Map.from[String, Build_Job.Session_Context], + List.from[Build_Job.Session_Context], { res => val name = res.string(Sessions.name) val deps = split_lines(res.string(Sessions.deps)) @@ -425,19 +446,20 @@ val old_time = Time.ms(res.long(Sessions.old_time)) val old_command_timings_blob = res.bytes(Sessions.old_command_timings) val build_uuid = res.string(Sessions.build_uuid) - name -> Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, + Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, timeout, old_time, old_command_timings_blob, build_uuid) } ) + } - def update_sessions(db: SQL.Database, sessions: State.Sessions): Boolean = { + def update_sessions(db: SQL.Database, sessions: Build_Process.Sessions): Boolean = { val old_sessions = read_sessions_domain(db) - val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList + val insert = sessions.iterator.filterNot(s => old_sessions.contains(s.name)).toList - for ((name, session) <- insert) { + for (session <- insert) { db.execute_statement(Sessions.table.insert(), body = { stmt => - stmt.string(1) = name + stmt.string(1) = session.name stmt.string(2) = cat_lines(session.deps) stmt.string(3) = cat_lines(session.ancestors) stmt.string(4) = session.session_prefs @@ -756,7 +778,7 @@ val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) - val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions) + val sessions = state.sessions.pull(read_sessions_domain(db), read_sessions(db, _)) val pending = read_pending(db) val running = pull0(read_running(db), state.running) val results = pull1(read_results_domain(db), read_results(db, _), state.results) @@ -805,7 +827,7 @@ protected final val store: Store = build_context.store protected final val build_options: Options = store.options - protected final val build_deps: Sessions.Deps = build_context.build_deps + protected final val build_deps: isabelle.Sessions.Deps = build_context.build_deps protected final val hostname: String = build_context.hostname protected final val build_uuid: String = build_context.build_uuid @@ -888,46 +910,47 @@ /* policy operations */ protected def init_state(state: Build_Process.State): Build_Process.State = { - val sessions1 = - build_context.sessions.foldLeft(state.sessions) { case (map, (name, session)) => - if (state.sessions.isDefinedAt(name)) map - else map + (name -> session) - } + val sessions1 = state.sessions.init(build_context, progress = build_progress) val old_pending = state.pending.iterator.map(_.name).toSet val new_pending = List.from( - for { - (name, session_context) <- build_context.sessions.iterator - if !old_pending(name) - } yield Build_Process.Task(name, session_context.deps, JSON.Object.empty, build_uuid)) + for (session <- sessions1.iterator if !old_pending(session.name)) + yield Build_Process.Task(session.name, session.deps, JSON.Object.empty, build_uuid)) val pending1 = new_pending ::: state.pending - state.copy(sessions = sessions1, pending = pending1) + state.copy( + numa_nodes = Host.numa_nodes(enabled = build_context.numa_shuffling), + sessions = sessions1, + pending = pending1) } protected def next_job(state: Build_Process.State): Option[String] = if (progress.stopped || state.running.size < build_context.max_jobs) { state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name)) - .sortBy(_.name)(build_context.ordering) + .sortBy(_.name)(state.sessions.ordering) .headOption.map(_.name) } else None protected def start_session(state: Build_Process.State, session_name: String): Build_Process.State = { val ancestor_results = - for (a <- build_context.sessions(session_name).ancestors) yield state.results(a) + for (a <- state.sessions(session_name).ancestors) yield state.results(a) + + val sources_shasum = state.sessions(session_name).sources_shasum val input_shasum = if (ancestor_results.isEmpty) ML_Process.bootstrap_shasum() else SHA1.flat_shasum(ancestor_results.map(_.output_shasum)) - val store_heap = build_context.store_heap(session_name) + val store_heap = + build_context.build_heap || Sessions.is_pure(session_name) || + state.sessions.iterator.exists(_.ancestors.contains(session_name)) val (current, output_shasum) = store.check_output(session_name, session_options = build_context.sessions_structure(session_name).options, - sources_shasum = build_context.sources_shasum(session_name), + sources_shasum = sources_shasum, input_shasum = input_shasum, fresh_build = build_context.fresh_build, store_heap = store_heap) @@ -966,7 +989,7 @@ val numa_node = for { db <- _host_database - n <- Host.next_numa_node(db, hostname, build_context.numa_nodes, used_nodes) + n <- Host.next_numa_node(db, hostname, state.numa_nodes, used_nodes) } yield n val node_info = Host.Node_Info(hostname, numa_node) @@ -976,9 +999,11 @@ store.clean_output(_database_server, session_name, session_init = true) + val session = state.sessions(session_name) + val build = - Build_Job.start_session(build_context, progress, log, _database_server, - build_deps.background(session_name), input_shasum, node_info) + Build_Job.start_session(build_context, session, progress, log, _database_server, + build_deps.background(session_name), sources_shasum, input_shasum, node_info, store_heap) val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))