# HG changeset patch
# User wenzelm
# Date 1677408924 -3600
# Node ID 44fe9fe96130dbb5b6102ad564cd697a185a9fee
# Parent  84ca5e036897bd55ab76ca4304a1bf6190a90d66
support for build database: still inactive;
more detailed Build_Job.Node_Info;

diff -r 84ca5e036897 -r 44fe9fe96130 etc/options
--- a/etc/options	Sat Feb 25 17:45:10 2023 +0100
+++ b/etc/options	Sun Feb 26 11:55:24 2023 +0100
@@ -180,6 +180,9 @@
 option build_engine : string = ""
   -- "alternative session build engine"
 
+option build_database : bool = false
+  -- "expose state of build process via central database"
+
 
 section "Editor Session"
 
diff -r 84ca5e036897 -r 44fe9fe96130 src/Pure/Tools/build.scala
--- a/src/Pure/Tools/build.scala	Sat Feb 25 17:45:10 2023 +0100
+++ b/src/Pure/Tools/build.scala	Sun Feb 26 11:55:24 2023 +0100
@@ -171,9 +171,10 @@
     val results =
       Isabelle_Thread.uninterruptible {
         val engine = get_engine(build_options.string("build_engine"))
-        val build_process = engine.init(build_context)
-        val res = build_process.run()
-        Results(build_context, res)
+        using(engine.init(build_context)) { build_process =>
+          val res = build_process.run()
+          Results(build_context, res)
+        }
       }
 
     if (export_files) {
diff -r 84ca5e036897 -r 44fe9fe96130 src/Pure/Tools/build_job.scala
--- a/src/Pure/Tools/build_job.scala	Sat Feb 25 17:45:10 2023 +0100
+++ b/src/Pure/Tools/build_job.scala	Sun Feb 26 11:55:24 2023 +0100
@@ -13,18 +13,25 @@
 
 trait Build_Job {
   def job_name: String
-  def numa_node: Option[Int] = None
+  def node_info: Build_Job.Node_Info
   def start(): Unit = ()
   def terminate(): Unit = ()
   def is_finished: Boolean = false
   def join: Process_Result = Process_Result.undefined
-  def make_abstract: Build_Job.Abstract = Build_Job.Abstract(job_name, numa_node)
+  def make_abstract: Build_Job.Abstract = Build_Job.Abstract(job_name, node_info)
 }
 
 object Build_Job {
+  object Node_Info { def none: Node_Info = Node_Info("", None) }
+  sealed case class Node_Info(hostname: String, numa_node: Option[Int])
+
+  sealed case class Result(node_info: Node_Info, process_result: Process_Result) {
+    def ok: Boolean = process_result.ok
+  }
+
   sealed case class Abstract(
     override val job_name: String,
-    override val numa_node: Option[Int]
+    override val node_info: Node_Info
   ) extends Build_Job {
     override def make_abstract: Abstract = this
   }
@@ -36,13 +43,13 @@
     resources: Resources,
     session_setup: (String, Session) => Unit,
     val input_heaps: SHA1.Shasum,
-    override val numa_node: Option[Int]
+    override val node_info: Node_Info
   ) extends Build_Job {
     def session_name: String = session_background.session_name
     def job_name: String = session_name
 
     val info: Sessions.Info = session_background.sessions_structure(session_name)
-    val options: Options = NUMA.policy_options(info.options, numa_node)
+    val options: Options = NUMA.policy_options(info.options, node_info.numa_node)
 
     val session_sources: Sessions.Sources =
       Sessions.Sources.load(session_background.base, cache = store.cache.compress)
diff -r 84ca5e036897 -r 44fe9fe96130 src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_process.scala	Sat Feb 25 17:45:10 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Sun Feb 26 11:55:24 2023 +0100
@@ -161,12 +161,14 @@
   case class Result(
     current: Boolean,
     output_heap: SHA1.Shasum,
+    node_info: Build_Job.Node_Info,
     process_result: Process_Result
   ) {
     def ok: Boolean = process_result.ok
   }
 
   sealed case class State(
+    serial: Long = 0,
     numa_index: Int = 0,
     pending: List[Entry] = Nil,
     running: Map[String, Build_Job] = Map.empty,
@@ -176,7 +178,8 @@
       if (numa_nodes.isEmpty) (None, this)
       else {
         val available = numa_nodes.zipWithIndex
-        val used = Set.from(for (job <- running.valuesIterator; i <- job.numa_node) yield i)
+        val used =
+          Set.from(for (job <- running.valuesIterator; i <- job.node_info.numa_node) yield i)
         val candidates = available.drop(numa_index) ::: available.take(numa_index)
         val (n, i) =
           candidates.find({ case (n, i) => i == numa_index && !used(n) }) orElse
@@ -211,9 +214,10 @@
       name: String,
       current: Boolean,
       output_heap: SHA1.Shasum,
-      process_result: Process_Result
+      process_result: Process_Result,
+      node_info: Build_Job.Node_Info = Build_Job.Node_Info.none
     ): State = {
-      val result = Build_Process.Result(current, output_heap, process_result)
+      val result = Build_Process.Result(current, output_heap, node_info, process_result)
       copy(results = results + (name -> result))
     }
 
@@ -222,6 +226,271 @@
   }
 
 
+  /* SQL data model */
+
+  object Data {
+    val database = Path.explode("$ISABELLE_HOME_USER/build.db")
+
+    def make_table(name: String, columns: List[SQL.Column], body: String = ""): SQL.Table =
+      SQL.Table("isabelle_build" + if_proper(name, "_" + name), columns, body = body)
+
+    object Generic {
+      val instance = SQL.Column.string("instance")
+      val name = SQL.Column.string("name")
+
+      def sql_equal(instance: String = "", name: String = ""): SQL.Source =
+        SQL.and(
+          if_proper(instance, Generic.instance.equal(instance)),
+          if_proper(name, Generic.name.equal(name)))
+
+      def sql_member(instance: String = "", names: Iterable[String] = Nil): SQL.Source =
+        SQL.and(
+          if_proper(instance, Generic.instance.equal(instance)),
+          if_proper(names, SQL.member(Generic.name.toString, names)))
+    }
+
+    object Config {
+      val instance = Generic.instance.make_primary_key
+      val ml_identifier = SQL.Column.string("ml_identifier")
+      val options = SQL.Column.string("options")
+
+      val table = make_table("", List(instance, ml_identifier, options))
+    }
+
+    object State {
+      val instance = Generic.instance.make_primary_key
+      val serial = SQL.Column.long("serial")
+      val numa_index = SQL.Column.int("numa_index")
+
+      val table = make_table("state", List(instance, serial, numa_index))
+    }
+
+    object Pending {
+      val name = Generic.name.make_primary_key
+      val deps = SQL.Column.string("deps")
+      val info = SQL.Column.string("info")
+
+      val table = make_table("pending", List(name, deps, info))
+    }
+
+    object Running {
+      val name = Generic.name.make_primary_key
+      val hostname = SQL.Column.string("hostname")
+      val numa_node = SQL.Column.int("numa_node")
+
+      val table = make_table("running", List(name, hostname, numa_node))
+    }
+
+    object Results {
+      val name = Generic.name.make_primary_key
+      val hostname = SQL.Column.string("hostname")
+      val numa_node = SQL.Column.int("numa_node")  // int, like Running.numa_node
+      val rc = SQL.Column.int("rc")
+      val out = SQL.Column.string("out")
+      val err = SQL.Column.string("err")
+      val timing_elapsed = SQL.Column.long("timing_elapsed")
+      val timing_cpu = SQL.Column.long("timing_cpu")
+      val timing_gc = SQL.Column.long("timing_gc")
+
+      val table =
+        make_table("results",
+          List(name, hostname, numa_node, rc, out, err, timing_elapsed, timing_cpu, timing_gc))
+    }
+
+    def read_pending(db: SQL.Database): List[Entry] =
+      db.using_statement(Pending.table.select() + SQL.order_by(List(Pending.name))) { stmt =>
+        List.from(
+          stmt.execute_query().iterator { res =>
+            val name = res.string(Pending.name)
+            val deps = res.string(Pending.deps)
+            val info = res.string(Pending.info)
+            Entry(name, split_lines(deps), info = JSON.Object.parse(info))
+          })
+      }
+
+    // make the table agree with the given pending entries; result tells if it was changed
+    def update_pending(db: SQL.Database, pending: List[Entry]): Boolean = {
+      val old_pending = read_pending(db)
+      val (delete, insert) = Library.symmetric_difference(old_pending, pending)
+
+      if (delete.nonEmpty) {
+        db.using_statement(
+          Pending.table.delete() + SQL.where(Generic.sql_member(names = delete.map(_.name)))
+        )(_.execute())
+      }
+
+      for (entry <- insert) {
+        db.using_statement(Pending.table.insert()) { stmt =>
+          stmt.string(1) = entry.name
+          stmt.string(2) = cat_lines(entry.deps)
+          stmt.string(3) = JSON.Format(entry.info)
+          stmt.execute()
+        }
+      }
+
+      delete.nonEmpty || insert.nonEmpty
+    }
+
+    def read_running(db: SQL.Database): List[Build_Job.Abstract] =
+      db.using_statement(Running.table.select() + SQL.order_by(List(Running.name))) { stmt =>
+        List.from(
+          stmt.execute_query().iterator { res =>
+            val name = res.string(Running.name)
+            val hostname = res.string(Running.hostname)
+            val numa_node = res.get_int(Running.numa_node)
+            Build_Job.Abstract(name, Build_Job.Node_Info(hostname, numa_node))
+          })
+      }
+
+    // make the table agree with the given running jobs; result tells if it was changed
+    def update_running(db: SQL.Database, running: Map[String, Build_Job]): Boolean = {
+      val old_running = read_running(db)
+      val abs_running = running.valuesIterator.map(_.make_abstract).toList
+
+      val (delete, insert) = Library.symmetric_difference(old_running, abs_running)
+
+      if (delete.nonEmpty) {
+        db.using_statement(
+          Running.table.delete() + SQL.where(Generic.sql_member(names = delete.map(_.job_name)))
+        )(_.execute())
+      }
+
+      for (job <- insert) {
+        db.using_statement(Running.table.insert()) { stmt =>
+          stmt.string(1) = job.job_name
+          stmt.string(2) = job.node_info.hostname
+          stmt.int(3) = job.node_info.numa_node
+          stmt.execute()
+        }
+      }
+
+      delete.nonEmpty || insert.nonEmpty
+    }
+
+    def read_results(db: SQL.Database, names: List[String] = Nil): Map[String, Build_Job.Result] =
+      db.using_statement(
+        Results.table.select() + if_proper(names, SQL.where_member(Results.name.toString, names))
+      ) { stmt =>
+        Map.from(
+          stmt.execute_query().iterator { res =>
+            val name = res.string(Results.name)
+            val hostname = res.string(Results.hostname)
+            val numa_node = res.get_int(Results.numa_node)
+            val rc = res.int(Results.rc)
+            val out = res.string(Results.out)
+            val err = res.string(Results.err)
+            val timing_elapsed = res.long(Results.timing_elapsed)
+            val timing_cpu = res.long(Results.timing_cpu)
+            val timing_gc = res.long(Results.timing_gc)
+            val node_info = Build_Job.Node_Info(hostname, numa_node)
+            val process_result =
+              Process_Result(rc,
+                out_lines = split_lines(out),
+                err_lines = split_lines(err),
+                timing = Timing(Time.ms(timing_elapsed), Time.ms(timing_cpu), Time.ms(timing_gc)))
+            name -> Build_Job.Result(node_info, process_result)
+          })
+      }
+
+    def read_results_name(db: SQL.Database): Set[String] =
+      db.using_statement(Results.table.select(List(Results.name))) { stmt =>
+        Set.from(stmt.execute_query().iterator(_.string(Results.name)))
+      }
+
+    // insert results that are not yet stored in the database; result tells if any were inserted
+    def update_results(db: SQL.Database, results: Map[String, Build_Process.Result]): Boolean = {
+      val old_results = read_results_name(db)
+      val insert = results.iterator.filterNot(p => old_results.contains(p._1)).toList
+
+      for ((name, result) <- insert) {
+        val node_info = result.node_info
+        val process_result = result.process_result
+        db.using_statement(Results.table.insert()) { stmt =>
+          stmt.string(1) = name
+          stmt.string(2) = node_info.hostname
+          stmt.int(3) = node_info.numa_node
+          stmt.int(4) = process_result.rc
+          stmt.string(5) = cat_lines(process_result.out_lines)
+          stmt.string(6) = cat_lines(process_result.err_lines)
+          stmt.long(7) = process_result.timing.elapsed.ms
+          stmt.long(8) = process_result.timing.cpu.ms
+          stmt.long(9) = process_result.timing.gc.ms
+          stmt.execute()
+        }
+      }
+
+      insert.nonEmpty
+    }
+
+    def write_config(db: SQL.Database, instance: String, hostname: String, options: Options): Unit =
+      db.using_statement(Config.table.insert()) { stmt =>
+        stmt.string(1) = instance
+        stmt.string(2) = Isabelle_System.getenv("ML_IDENTIFIER")
+        stmt.string(3) = options.changed(Options.init(prefs = "")).mkString("\u0001")
+        stmt.execute()
+      }
+
+    def read_state(db: SQL.Database, instance: String): (Long, Int) =
+      db.using_statement(
+        State.table.select() + SQL.where(Generic.sql_equal(instance = instance))
+      ) { stmt =>
+        (stmt.execute_query().iterator { res =>
+          val serial = res.long(State.serial)
+          val numa_index = res.int(State.numa_index)
+          (serial, numa_index)
+        }).nextOption.getOrElse(error("No build state instance " + instance + " in database " + db))
+      }
+
+    def write_state(db: SQL.Database, instance: String, serial: Long, numa_index: Int): Unit = {
+      db.using_statement(
+        State.table.delete() + SQL.where(Generic.sql_equal(instance = instance)))(_.execute())
+      db.using_statement(State.table.insert()) { stmt =>
+        stmt.string(1) = instance
+        stmt.long(2) = serial
+        stmt.int(3) = numa_index
+        stmt.execute()
+      }
+    }
+
+    // create tables, fail on unfinished pending entries, then clear all tables and start afresh
+    def init_database(
+      db: SQL.Database,
+      instance: String,
+      hostname: String,
+      options: Options
+    ): Unit = {
+      val tables =
+        List(Config.table, State.table, Pending.table, Running.table, Results.table)
+
+      for (table <- tables) db.create_table(table)
+
+      val old_pending = Data.read_pending(db)
+      if (old_pending.nonEmpty) {
+        error("Cannot init build process, because of unfinished " +
+          commas_quote(old_pending.map(_.name)))
+      }
+
+      for (table <- tables) db.using_statement(table.delete())(_.execute())
+
+      write_config(db, instance, hostname, options)
+      write_state(db, instance, 0, 0)
+    }
+
+    // store state in database; the serial number is incremented when some table has changed
+    def update_database(db: SQL.Database, instance: String, state: State): State = {
+      val ch1 = update_pending(db, state.pending)
+      val ch2 = update_running(db, state.running)
+      val ch3 = update_results(db, state.results)
+
+      val (serial0, numa_index0) = read_state(db, instance)
+      val serial = if (ch1 || ch2 || ch3) serial0 + 1 else serial0
+      if (serial != serial0) write_state(db, instance, serial, state.numa_index)
+
+      state.copy(serial = serial)
+    }
+  }
+
+
   /* main process */
 
   def session_finished(session_name: String, process_result: Process_Result): String =
@@ -235,7 +504,8 @@
   }
 }
 
-class Build_Process(protected val build_context: Build_Process.Context) {
+class Build_Process(protected val build_context: Build_Process.Context) extends AutoCloseable {
+  protected val instance: String = UUID.random().toString
   protected val store: Sessions.Store = build_context.store
   protected val build_options: Options = store.options
   protected val build_deps: Sessions.Deps = build_context.deps
@@ -249,6 +519,15 @@
       case log_file => Logger.make(Some(Path.explode(log_file)))
     }
 
+  protected val hostname: String = Isabelle_System.hostname()
+
+  protected val database: Option[SQL.Database] =
+    if (!build_options.bool("build_database") || true /*FIXME*/) None
+    else if (store.database_server) Some(store.open_database_server())
+    else Some(SQLite.open_database(Build_Process.Data.database))
+
+  def close(): Unit = database.foreach(_.close())
+
   // global state
   protected var _state: Build_Process.State = init_state()
 
@@ -327,7 +606,7 @@
         _state = _state.
           remove_pending(session_name).
           remove_running(session_name).
-          make_result(session_name, false, output_heap, process_result_tail)
+          make_result(session_name, false, output_heap, process_result_tail, node_info = job.node_info)
       }
     }
 
@@ -395,9 +674,10 @@
       val job =
         synchronized {
           val (numa_node, state1) = _state.numa_next(build_context.numa_nodes)
+          val node_info = Build_Job.Node_Info(hostname, numa_node)
           val job =
             new Build_Job.Build_Session(progress, session_background, store, do_store,
-              resources, build_context.session_setup, input_heaps, numa_node)
+              resources, build_context.session_setup, input_heaps, node_info)
           _state = state1.add_running(session_name, job)
           job
         }
@@ -413,6 +693,24 @@
     }
   }
 
+  protected def setup_database(): Unit =
+    for (db <- database) {
+      synchronized {
+        db.transaction {
+          Build_Process.Data.init_database(db, instance, hostname, build_options)
+        }
+      }
+      db.rebuild()
+    }
+  protected def sync_database(): Unit =
+    for (db <- database) {
+      synchronized {
+        db.transaction {
+          _state = Build_Process.Data.update_database(db, instance, _state)
+        }
+      }
+    }
+
   protected def sleep(): Unit =
     Isabelle_Thread.interrupt_handler(_ => progress.stop()) {
       build_options.seconds("editor_input_delay").sleep()
@@ -424,14 +722,18 @@
       Map.empty[String, Process_Result]
     }
     else {
+      setup_database()
       while (!finished()) {
         if (progress.stopped) stop_running()
 
         for (job <- finished_running()) finish_job(job)
 
         next_pending() match {
-          case Some(name) => start_job(name)
-          case None => sleep()
+          case Some(name) =>
+            start_job(name)
+          case None =>
+            sync_database()
+            sleep()
         }
       }
       synchronized {
diff -r 84ca5e036897 -r 44fe9fe96130 src/Pure/library.scala
--- a/src/Pure/library.scala	Sat Feb 25 17:45:10 2023 +0100
+++ b/src/Pure/library.scala	Sun Feb 26 11:55:24 2023 +0100
@@ -271,6 +271,9 @@
       case _ => error("Single argument expected")
     }
 
+  def symmetric_difference[A](xs: List[A], ys: List[A]): (List[A], List[A]) =
+    (xs.filterNot(ys.toSet), ys.filterNot(xs.toSet))
+
 
   /* proper values */
 