# HG changeset patch # User wenzelm # Date 1710253832 -3600 # Node ID 630a82f873109aebdf8f95b31074de0111ce2730 # Parent 510fe8c3d9b865f78acb3d1eb962cbd6804a680a database performance tuning: pull changed entries only, based on recorded updates (see 98d65411bfdb); diff -r 510fe8c3d9b8 -r 630a82f87310 src/Pure/Build/build_process.scala --- a/src/Pure/Build/build_process.scala Tue Mar 12 15:11:29 2024 +0100 +++ b/src/Pure/Build/build_process.scala Tue Mar 12 15:30:32 2024 +0100 @@ -44,10 +44,8 @@ object Task { type Entry = (String, Task) - def entry(name: String, deps: List[String], build_uuid: String): Entry = - name -> Task(name, deps, build_uuid) def entry(session: Build_Job.Session_Context, build_context: isabelle.Build.Context): Entry = - entry(session.name, session.deps, build_context.build_uuid) + session.name -> Task(session.name, session.deps, build_context.build_uuid) } sealed case class Task( @@ -111,12 +109,15 @@ }) } - def pull( - data_domain: Set[String], - data: Set[String] => List[Build_Job.Session_Context] - ): Sessions = { - val dom = data_domain -- iterator.map(_.name) - make(data(dom).foldLeft(graph.restrict(dom)) { case (g, e) => g.new_node(e.name, e) }) + def update(updates: List[Library.Update.Op[Build_Job.Session_Context]]): Sessions = { + val graph1 = + updates.foldLeft(graph) { + case (g, Library.Update.Delete(name)) => g.del_node(name) + case (g, Library.Update.Insert(session)) => + (if (g.defined(session.name)) g.del_node(session.name) else g) + .new_node(session.name, session) + } + make(graph1) } def init( @@ -300,32 +301,6 @@ private lazy val build_id_tables = tables.filter(t => Generic.build_id_table(t) && !Generic.build_uuid_table(t)) - def pull[A <: Library.Named]( - data_domain: Set[String], - data_iterator: Set[String] => Iterator[A], - old_data: Map[String, A] - ): Map[String, A] = { - val dom = data_domain -- old_data.keysIterator - val data = old_data -- old_data.keysIterator.filterNot(data_domain) - if (dom.isEmpty) data - else data_iterator(dom).foldLeft(data) { case (map, a) => map + (a.name -> a) } - } - - def pull0[A <: Library.Named]( - new_data: Map[String, A], - old_data: Map[String, A] - ): Map[String, A] = { - pull(new_data.keySet, dom => new_data.valuesIterator.filter(a => dom(a.name)), old_data) - } - - def pull1[A <: Library.Named]( - data_domain: Set[String], - data_base: Set[String] => Map[String, A], - old_data: Map[String, A] - ): Map[String, A] = { - pull(data_domain, dom => data_base(dom).valuesIterator, old_data) - } - object Generic { val build_id = SQL.Column.long("build_id") val build_uuid = SQL.Column.string("build_uuid") @@ -472,8 +447,7 @@ }) for (build <- builds.sortBy(_.start)(Date.Ordering)) yield { - val sessions = private_data.read_sessions_domain(db, build_uuid = build.build_uuid) - build.copy(sessions = sessions.toList.sorted) + build.copy(sessions = private_data.read_sessions(db, build_uuid = build.build_uuid).sorted) } } @@ -545,39 +519,29 @@ lazy val table_index: Int = tables.index(table) } - def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] = + def read_sessions(db: SQL.Database, build_uuid: String = ""): List[String] = db.execute_query_statement( Sessions.table.select(List(Sessions.name), sql = if_proper(build_uuid, Sessions.build_uuid.where_equal(build_uuid))), - Set.from[String], res => res.string(Sessions.name)) + List.from[String], res => res.string(Sessions.name)) - def read_sessions(db: SQL.Database, - names: Iterable[String] = Nil, - build_uuid: String = "" - ): List[Build_Job.Session_Context] = { - db.execute_query_statement( - Sessions.table.select( - sql = - SQL.where_and( - if_proper(names, Sessions.name.member(names)), - if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid))) - ), - List.from[Build_Job.Session_Context], - { res => - val name = res.string(Sessions.name) - val deps = split_lines(res.string(Sessions.deps)) - val ancestors = split_lines(res.string(Sessions.ancestors)) - val options = res.string(Sessions.options) - val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) - val timeout = Time.ms(res.long(Sessions.timeout)) - val old_time = Time.ms(res.long(Sessions.old_time)) - val old_command_timings_blob = res.bytes(Sessions.old_command_timings) - val build_uuid = res.string(Sessions.build_uuid) - Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, - timeout, old_time, old_command_timings_blob, build_uuid) - } + def pull_sessions(db: SQL.Database, build_id: Long, state: State): Sessions = + state.sessions.update( + read_updates(db, Sessions.table, build_id, state.serial, + { res => + val name = res.string(Sessions.name) + val deps = split_lines(res.string(Sessions.deps)) + val ancestors = split_lines(res.string(Sessions.ancestors)) + val options = res.string(Sessions.options) + val sources_shasum = SHA1.fake_shasum(res.string(Sessions.sources)) + val timeout = Time.ms(res.long(Sessions.timeout)) + val old_time = Time.ms(res.long(Sessions.old_time)) + val old_command_timings_blob = res.bytes(Sessions.old_command_timings) + val build_uuid = res.string(Sessions.build_uuid) + Build_Job.Session_Context(name, deps, ancestors, options, sources_shasum, + timeout, old_time, old_command_timings_blob, build_uuid) + }) ) - } def update_sessions( db: SQL.Database, @@ -720,16 +684,16 @@ lazy val table_index: Int = tables.index(table) } - def read_pending(db: SQL.Database): State.Pending = - db.execute_query_statement( - Pending.table.select(), - Map.from[String, Task], - { res => - val name = res.string(Pending.name) - val deps = res.string(Pending.deps) - val build_uuid = res.string(Pending.build_uuid) - Task.entry(name, split_lines(deps), build_uuid) + def pull_pending(db: SQL.Database, build_id: Long, state: State): State.Pending = + Library.Update.data(state.pending, + read_updates(db, Pending.table, build_id, state.serial, + { res => + val name = res.string(Pending.name) + val deps = res.string(Pending.deps) + val build_uuid = res.string(Pending.build_uuid) + Task(name, split_lines(deps), build_uuid) }) + ) def update_pending( db: SQL.Database, @@ -776,22 +740,21 @@ lazy val table_index: Int = tables.index(table) } - def read_running(db: SQL.Database): State.Running = - db.execute_query_statement( - Running.table.select(), - Map.from[String, Job], - { res => - val name = res.string(Running.name) - val worker_uuid = res.string(Running.worker_uuid) - val build_uuid = res.string(Running.build_uuid) - val hostname = res.string(Running.hostname) - val numa_node = res.get_int(Running.numa_node) - val rel_cpus = res.string(Running.rel_cpus) - val start_date = res.date(Running.start_date) + def pull_running(db: SQL.Database, build_id: Long, state: State): State.Running = + Library.Update.data(state.running, + read_updates(db, Running.table, build_id, state.serial, + { res => + val name = res.string(Running.name) + val worker_uuid = res.string(Running.worker_uuid) + val build_uuid = res.string(Running.build_uuid) + val hostname = res.string(Running.hostname) + val numa_node = res.get_int(Running.numa_node) + val rel_cpus = res.string(Running.rel_cpus) + val start_date = res.date(Running.start_date) + val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) - val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) - name -> Job(name, worker_uuid, build_uuid, node_info, start_date, None) - } + Job(name, worker_uuid, build_uuid, node_info, start_date, None) + }) ) def update_running( @@ -851,44 +814,37 @@ lazy val table_index: Int = tables.index(table) } - def read_results_domain(db: SQL.Database): Set[String] = - db.execute_query_statement( - Results.table.select(List(Results.name)), - Set.from[String], res => res.string(Results.name)) - - def read_results(db: SQL.Database, names: Iterable[String] = Nil): State.Results = - db.execute_query_statement( - Results.table.select(sql = if_proper(names, Results.name.where_member(names))), - Map.from[String, Result], - { res => - val name = res.string(Results.name) - val worker_uuid = res.string(Results.worker_uuid) - val build_uuid = res.string(Results.build_uuid) - val hostname = res.string(Results.hostname) - val numa_node = res.get_int(Results.numa_node) - val rel_cpus = res.string(Results.rel_cpus) - val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) + def pull_results(db: SQL.Database, build_id: Long, state: State): State.Results = + Library.Update.data(state.results, + read_updates(db, Results.table, build_id, state.serial, + { res => + val name = res.string(Results.name) + val worker_uuid = res.string(Results.worker_uuid) + val build_uuid = res.string(Results.build_uuid) + val hostname = res.string(Results.hostname) + val numa_node = res.get_int(Results.numa_node) + val rel_cpus = res.string(Results.rel_cpus) + val node_info = Host.Node_Info(hostname, numa_node, Host.Range.from(rel_cpus)) - val rc = res.int(Results.rc) - val out = res.string(Results.out) - val err = res.string(Results.err) - val timing = - res.timing( - Results.timing_elapsed, - Results.timing_cpu, - Results.timing_gc) - val process_result = - Process_Result(rc, - out_lines = split_lines(out), - err_lines = split_lines(err), - timing = timing) + val rc = res.int(Results.rc) + val out = res.string(Results.out) + val err = res.string(Results.err) + val timing = + res.timing( + Results.timing_elapsed, + Results.timing_cpu, + Results.timing_gc) + val process_result = + Process_Result(rc, + out_lines = split_lines(out), + err_lines = split_lines(err), + timing = timing) - val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum)) - val current = res.bool(Results.current) + val output_shasum = SHA1.fake_shasum(res.string(Results.output_shasum)) + val current = res.bool(Results.current) - name -> Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) - } + }) ) def update_results( @@ -931,17 +887,17 @@ /* collective operations */ - def pull_database(db: SQL.Database, worker_uuid: String, state: State): State = { + def pull_database(db: SQL.Database, build_id: Long, worker_uuid: String, state: State): State = { val serial_db = read_serial(db) if (serial_db == state.serial) state else { val serial = serial_db max state.serial stamp_worker(db, worker_uuid, serial) - val sessions = state.sessions.pull(read_sessions_domain(db), read_sessions(db, _)) - val pending = read_pending(db) - val running = pull0(read_running(db), state.running) - val results = pull1(read_results_domain(db), read_results(db, _), state.results) + val sessions = pull_sessions(db, build_id, state) + val pending = pull_pending(db, build_id, state) + val running = pull_running(db, build_id, state) + val results = pull_results(db, build_id, state) state.copy(serial = serial, sessions = sessions, pending = pending, running = running, results = results) @@ -1129,7 +1085,8 @@ case None => body case Some(db) => Build_Process.private_data.transaction_lock(db, label = label) { - val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state) + val old_state = + Build_Process.private_data.pull_database(db, build_id, worker_uuid, _state) _state = old_state val res = body _state = diff -r 510fe8c3d9b8 -r 630a82f87310 src/Pure/Build/build_schedule.scala --- a/src/Pure/Build/build_schedule.scala Tue Mar 12 15:11:29 2024 +0100 +++ b/src/Pure/Build/build_schedule.scala Tue Mar 12 15:30:32 2024 +0100 @@ -849,7 +849,8 @@ case None => body case Some(db) => db.transaction_lock(Build_Schedule.private_data.all_tables, label = label) { - val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state) + val old_state = + Build_Process.private_data.pull_database(db, build_id, worker_uuid, _state) val old_schedule = Build_Schedule.private_data.pull_schedule(db, _schedule) _state = old_state _schedule = old_schedule