# HG changeset patch # User wenzelm # Date 1677870647 -3600 # Node ID f0d9f9196b9b047b45d408a146b574159bd804c6 # Parent c546e3e1f7f6f1fcceadf41aa0fbfdbec8156f9f more database content; clarified signature; tuned comments; diff -r c546e3e1f7f6 -r f0d9f9196b9b src/Pure/Tools/build_job.scala --- a/src/Pure/Tools/build_job.scala Fri Mar 03 13:50:54 2023 +0100 +++ b/src/Pure/Tools/build_job.scala Fri Mar 03 20:10:47 2023 +0100 @@ -46,6 +46,7 @@ object Session_Context { def load( + uuid: String, name: String, deps: List[String], ancestors: List[String], @@ -55,7 +56,8 @@ progress: Progress = new Progress ): Session_Context = { def default: Session_Context = - new Session_Context(name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty) + Session_Context( + name, deps, ancestors, sources_shasum, timeout, Time.zero, Bytes.empty, uuid) store.try_open_database(name) match { case None => default @@ -74,7 +76,7 @@ case _ => Time.zero } new Session_Context( - name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings) + name, deps, ancestors, sources_shasum, timeout, elapsed, command_timings, uuid) } catch { case ERROR(msg) => ignore_error(msg) @@ -86,14 +88,15 @@ } } - final class Session_Context( - val name: String, - val deps: List[String], - val ancestors: List[String], - val sources_shasum: SHA1.Shasum, - val timeout: Time, - val old_time: Time, - val old_command_timings_blob: Bytes + sealed case class Session_Context( + name: String, + deps: List[String], + ancestors: List[String], + sources_shasum: SHA1.Shasum, + timeout: Time, + old_time: Time, + old_command_timings_blob: Bytes, + uuid: String ) { override def toString: String = name } diff -r c546e3e1f7f6 -r f0d9f9196b9b src/Pure/Tools/build_process.scala --- a/src/Pure/Tools/build_process.scala Fri Mar 03 13:50:54 2023 +0100 +++ b/src/Pure/Tools/build_process.scala Fri Mar 03 20:10:47 2023 +0100 @@ -42,7 +42,8 @@ val sources_shasum = build_deps.sources_shasum(name) val session_context = Build_Job.Session_Context.load( - name, deps, ancestors, sources_shasum, info.timeout, store, progress = progress) + uuid, name, deps, ancestors, sources_shasum, info.timeout, store, + progress = progress) name -> session_context }) @@ -85,10 +86,11 @@ } } + // static context of one particular instance, identified by uuid final class Context private( val store: Sessions.Store, val build_deps: Sessions.Deps, - val sessions: Map[String, Build_Job.Session_Context], + val sessions: State.Sessions, val ordering: Ordering[String], val progress: Progress, val hostname: String, @@ -145,12 +147,21 @@ def ok: Boolean = process_result.ok } + object State { + type Sessions = Map[String, Build_Job.Session_Context] + type Pending = List[Entry] + type Running = Map[String, Build_Job] + type Results = Map[String, Result] + } + + // dynamic state of various instances, distinguished by uuid sealed case class State( serial: Long = 0, numa_index: Int = 0, - pending: List[Entry] = Nil, - running: Map[String, Build_Job] = Map.empty, - results: Map[String, Build_Process.Result] = Map.empty + sessions: State.Sessions = Map.empty, // static build targets + pending: State.Pending = Nil, // dynamic build "queue" + running: State.Running = Map.empty, // presently running jobs + results: State.Results = Map.empty // finished results ) { def numa_next(numa_nodes: List[Int]): (Option[Int], State) = if (numa_nodes.isEmpty) (None, this) @@ -235,6 +246,20 @@ val table = make_table("serial", List(serial)) } + object Sessions { + val name = Generic.name.make_primary_key + val deps = SQL.Column.string("deps") + val ancestors = SQL.Column.string("ancestors") + val sources = SQL.Column.string("sources") + val timeout = SQL.Column.long("timeout") + val old_time = SQL.Column.long("old_time") + val old_command_timings = SQL.Column.bytes("old_command_timings") + val uuid = Generic.uuid + + val table = make_table("sessions", + List(name, deps, ancestors, sources, timeout, old_time, old_command_timings, uuid)) + } + object Pending { val name = Generic.name.make_primary_key val deps = SQL.Column.string("deps") @@ -280,6 +305,49 @@ } } + def read_sessions_domain(db: SQL.Database): Set[String] = + db.using_statement(Sessions.table.select(List(Sessions.name)))(stmt => + Set.from(stmt.execute_query().iterator(_.string(Sessions.name)))) + + def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions = + db.using_statement( + Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))) + ) { stmt => + Map.from(stmt.execute_query().iterator { res => + val name = res.string(Sessions.name) + val deps = split_lines(res.string(Sessions.deps)) + val ancestors = split_lines(res.string(Sessions.ancestors)) + val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) + val timeout = Time.ms(res.long(Sessions.timeout)) + val old_time = Time.ms(res.long(Sessions.old_time)) + val old_command_timings_blob = res.bytes(Sessions.old_command_timings) + val uuid = res.string(Sessions.uuid) + name -> Build_Job.Session_Context(name, deps, ancestors, sources_shasum, + timeout, old_time, old_command_timings_blob, uuid) + }) + } + + def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = { + val old_sessions = read_sessions_domain(db) + val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList + + for ((name, session) <- insert) { + db.using_statement(Sessions.table.insert()) { stmt => + stmt.string(1) = name + stmt.string(2) = cat_lines(session.deps) + stmt.string(3) = cat_lines(session.ancestors) + stmt.string(4) = session.sources_shasum.toString + stmt.long(5) = session.timeout.ms + stmt.long(6) = session.old_time.ms + stmt.bytes(7) = session.old_command_timings_blob + stmt.string(8) = session.uuid + stmt.execute() + } + } + + insert.nonEmpty + } + def read_pending(db: SQL.Database): List[Entry] = db.using_statement(Pending.table.select(sql = SQL.order_by(List(Pending.name)))) { stmt => List.from( @@ -291,7 +359,7 @@ }) } - def update_pending(db: SQL.Database, pending: List[Entry]): Boolean = { + def update_pending(db: SQL.Database, pending: State.Pending): Boolean = { val old_pending = read_pending(db) val (delete, insert) = Library.symmetric_difference(old_pending, pending) @@ -324,7 +392,7 @@ }) } - def update_running(db: SQL.Database, running: Map[String, Build_Job]): Boolean = { + def update_running(db: SQL.Database, running: State.Running): Boolean = { val old_running = read_running(db) val abs_running = running.valuesIterator.map(_.make_abstract).toList @@ -348,11 +416,15 @@ delete.nonEmpty || insert.nonEmpty } + def read_results_domain(db: SQL.Database): Set[String] = + db.using_statement(Results.table.select(List(Results.name)))(stmt => + Set.from(stmt.execute_query().iterator(_.string(Results.name)))) + def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] = db.using_statement( - Results.table.select(sql = if_proper(names, Results.name.where_member(names)))) { stmt => - Map.from( - stmt.execute_query().iterator { res => + Results.table.select(sql = if_proper(names, Results.name.where_member(names))) + ) { stmt => + Map.from(stmt.execute_query().iterator { res => val name = res.string(Results.name) val hostname = res.string(Results.hostname) val numa_node = res.get_int(Results.numa_node) @@ -372,14 +444,10 @@ timing = timing) name -> Build_Job.Result(node_info, process_result) }) - } + } - def read_results_name(db: SQL.Database): Set[String] = - db.using_statement(Results.table.select(List(Results.name)))(stmt => - Set.from(stmt.execute_query().iterator(_.string(Results.name)))) - - def update_results(db: SQL.Database, results: Map[String, Build_Process.Result]): Boolean = { - val old_results = read_results_name(db) + def update_results(db: SQL.Database, results: State.Results): Boolean = { + val old_results = read_results_domain(db) val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList for ((name, result) <- insert) { @@ -407,6 +475,7 @@ List( Base.table, Serial.table, + Sessions.table, Pending.table, Running.table, Results.table, @@ -438,6 +507,7 @@ ): State = { val changed = List( + update_sessions(db, state.sessions), update_pending(db, state.pending), update_running(db, state.running), update_results(db, state.results), @@ -512,6 +582,12 @@ /* policy operations */ protected def init_state(state: Build_Process.State): Build_Process.State = { + val sessions1 = + build_context.sessions.foldLeft(state.sessions) { case (map, (name, session)) => + if (state.sessions.isDefinedAt(name)) map + else map + (name -> session) + } + val old_pending = state.pending.iterator.map(_.name).toSet val new_pending = List.from( @@ -519,7 +595,9 @@ (name, session_context) <- build_context.sessions.iterator if !old_pending(name) } yield Build_Process.Entry(name, session_context.deps)) - state.copy(pending = new_pending ::: state.pending) + val pending1 = new_pending ::: state.pending + + state.copy(sessions = sessions1, pending = pending1) } protected def next_job(state: Build_Process.State): Option[String] =