# HG changeset patch
# User wenzelm
# Date 1687960894 -7200
# Node ID 524ba83940c2dc9d163a8bccfbf5afd338c3c2bb
# Parent  759c71cdaf2ab60a2c280a712bb331aaca995649
# Parent  67e836ce3f04ece932f3aee7df7c09327bc51ab5
merged

diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/General/sql.scala
--- a/src/Pure/General/sql.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/General/sql.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -253,8 +253,12 @@
     def transaction_lock[A](
       db: Database,
       more_tables: Tables = Tables.empty,
-      create: Boolean = false
-    )(body: => A): A = db.transaction { (tables ::: more_tables).lock(db, create = create); body }
+      create: Boolean = false,
+      synchronized: Boolean = false,
+    )(body: => A): A = {
+      def run: A = db.transaction { (tables ::: more_tables).lock(db, create = create); body }
+      if (synchronized) db.synchronized { run } else run
+    }
 
     def vacuum(db: Database, more_tables: Tables = Tables.empty): Unit =
       db.vacuum(tables = tables ::: more_tables)
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/ML/ml_heap.scala
--- a/src/Pure/ML/ml_heap.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/ML/ml_heap.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -110,29 +110,36 @@
       })
   }
 
-  def clean_entry(db: SQL.Database, name: String): Unit =
-    Data.transaction_lock(db, create = true) { Data.clean_entry(db, name) }
+  def clean_entry(db: SQL.Database, session_name: String): Unit =
+    Data.transaction_lock(db, create = true, synchronized = true) {
+      Data.clean_entry(db, session_name)
+    }
 
-  def get_entry(db: SQL.Database, name: String): Option[SHA1.Digest] =
-    Data.transaction_lock(db, create = true) { Data.get_entry(db, name) }
+  def get_entry(db: SQL.Database, session_name: String): Option[SHA1.Digest] =
+    Data.transaction_lock(db, create = true, synchronized = true) {
+      Data.get_entry(db, session_name)
+    }
 
   def store(
     database: Option[SQL.Database],
+    session_name: String,
     heap: Path,
     slice: Long,
     cache: Compress.Cache = Compress.Cache.none
   ): SHA1.Digest = {
     val digest = write_file_digest(heap)
     database match {
+      case None =>
       case Some(db) =>
-        val name = heap.file_name
         val size = File.space(heap).bytes - sha1_prefix.length - SHA1.digest_length
 
         val slices = (size.toDouble / slice.toDouble).ceil.toInt
         val step = (size.toDouble / slices.toDouble).ceil.toLong
 
         try {
-          Data.transaction_lock(db, create = true) { Data.prepare_entry(db, name) }
+          Data.transaction_lock(db, create = true, synchronized = true) {
+            Data.prepare_entry(db, session_name)
+          }
 
           for (i <- 0 until slices) {
             val j = i + 1
@@ -141,39 +148,48 @@
             val content =
               Bytes.read_file(heap.file, offset = offset, limit = limit)
                 .compress(cache = cache)
-            Data.transaction_lock(db) { Data.write_entry(db, name, i, content) }
+            Data.transaction_lock(db, synchronized = true) {
+              Data.write_entry(db, session_name, i, content)
+            }
           }
 
-          Data.transaction_lock(db) { Data.finish_entry(db, name, size, digest) }
+          Data.transaction_lock(db, synchronized = true) {
+            Data.finish_entry(db, session_name, size, digest)
+          }
         }
         catch { case exn: Throwable =>
-          Data.transaction_lock(db, create = true) { Data.clean_entry(db, name) }
+          Data.transaction_lock(db, create = true, synchronized = true) {
+            Data.clean_entry(db, session_name)
+          }
           throw exn
         }
-      case None =>
     }
     digest
   }
 
   def restore(
-    db: SQL.Database,
+    database: Option[SQL.Database],
+    session_name: String,
     heap: Path,
     cache: Compress.Cache = Compress.Cache.none
   ): Unit = {
-    val name = heap.file_name
-    Data.transaction_lock(db, create = true) {
-      val db_digest = Data.get_entry(db, name)
-      val file_digest = read_file_digest(heap)
+    database match {
+      case None =>
+      case Some(db) =>
+        Data.transaction_lock(db, create = true, synchronized = true) {
+          val db_digest = Data.get_entry(db, session_name)
+          val file_digest = read_file_digest(heap)
 
-      if (db_digest.isDefined && db_digest != file_digest) {
-        Isabelle_System.make_directory(heap.expand.dir)
-        Bytes.write(heap, Bytes.empty)
-        for (slice <- Data.read_entry(db, name)) {
-          Bytes.append(heap, slice.uncompress(cache = cache))
+          if (db_digest.isDefined && db_digest != file_digest) {
+            Isabelle_System.make_directory(heap.expand.dir)
+            Bytes.write(heap, Bytes.empty)
+            for (slice <- Data.read_entry(db, session_name)) {
+              Bytes.append(heap, slice.uncompress(cache = cache))
+            }
+            val digest = write_file_digest(heap)
+            if (db_digest.get != digest) error("Incoherent content for file " + heap)
+          }
-        }
-        val digest = write_file_digest(heap)
-        if (db_digest.get != digest) error("Incoherent content for file " + heap)
-      }
+        }
     }
   }
 }
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/Thy/export.scala
--- a/src/Pure/Thy/export.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Thy/export.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -29,25 +29,76 @@
 
   /* SQL data model */
 
-  object Data {
-    val session_name = SQL.Column.string("session_name").make_primary_key
-    val theory_name = SQL.Column.string("theory_name").make_primary_key
-    val name = SQL.Column.string("name").make_primary_key
-    val executable = SQL.Column.bool("executable")
-    val compressed = SQL.Column.bool("compressed")
-    val body = SQL.Column.bytes("body")
+  object Data extends SQL.Data() {
+    override lazy val tables = SQL.Tables(Base.table)
 
-    val table =
-      SQL.Table("isabelle_exports",
-        List(session_name, theory_name, name, executable, compressed, body))
+    object Base {
+      val session_name = SQL.Column.string("session_name").make_primary_key
+      val theory_name = SQL.Column.string("theory_name").make_primary_key
+      val name = SQL.Column.string("name").make_primary_key
+      val executable = SQL.Column.bool("executable")
+      val compressed = SQL.Column.bool("compressed")
+      val body = SQL.Column.bytes("body")
 
-    val tables = SQL.Tables(table)
+      val table =
+        SQL.Table("isabelle_exports",
+          List(session_name, theory_name, name, executable, compressed, body))
+    }
 
     def where_equal(session_name: String, theory_name: String = "", name: String = ""): SQL.Source =
       SQL.where_and(
-        Data.session_name.equal(session_name),
-        if_proper(theory_name, Data.theory_name.equal(theory_name)),
-        if_proper(name, Data.name.equal(name)))
+        Base.session_name.equal(session_name),
+        if_proper(theory_name, Base.theory_name.equal(theory_name)),
+        if_proper(name, Base.name.equal(name)))
+
+    def readable_entry(db: SQL.Database, entry_name: Entry_Name): Boolean = {
+      db.execute_query_statementB(
+        Base.table.select(List(Base.name),
+          sql = where_equal(entry_name.session, entry_name.theory, entry_name.name)))
+    }
+
+    def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] =
+      db.execute_query_statementO[Entry](
+        Base.table.select(List(Base.executable, Base.compressed, Base.body),
+          sql = Data.where_equal(entry_name.session, entry_name.theory, entry_name.name)),
+        { res =>
+          val executable = res.bool(Base.executable)
+          val compressed = res.bool(Base.compressed)
+          val bytes = res.bytes(Base.body)
+          val body = Future.value(compressed, bytes)
+          Entry(entry_name, executable, body, cache)
+        }
+      )
+
+    def write_entry(db: SQL.Database, entry: Entry): Unit = {
+      val (compressed, bs) = entry.body.join
+      db.execute_statement(Base.table.insert(), body = { stmt =>
+        stmt.string(1) = entry.session_name
+        stmt.string(2) = entry.theory_name
+        stmt.string(3) = entry.name
+        stmt.bool(4) = entry.executable
+        stmt.bool(5) = compressed
+        stmt.bytes(6) = bs
+      })
+    }
+
+    def read_theory_names(db: SQL.Database, session_name: String): List[String] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.theory_name), distinct = true,
+          sql = Data.where_equal(session_name) + SQL.order_by(List(Base.theory_name))),
+        List.from[String], res => res.string(Base.theory_name))
+
+    def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.theory_name, Base.name),
+          sql = Data.where_equal(session_name)) + SQL.order_by(List(Base.theory_name, Base.name)),
+        List.from[Entry_Name],
+        { res =>
+          Entry_Name(
+            session = session_name,
+            theory = res.string(Base.theory_name),
+            name = res.string(Base.name))
+        })
   }
 
   def compound_name(a: String, b: String): String =
@@ -63,45 +114,8 @@
       }
       else Path.make(elems.drop(prune))
     }
-
-    def readable(db: SQL.Database): Boolean = {
-      db.execute_query_statementB(
-        Data.table.select(List(Data.name),
-          sql = Data.where_equal(session, theory, name)))
-    }
-
-    def read(db: SQL.Database, cache: XML.Cache): Option[Entry] =
-      db.execute_query_statementO[Entry](
-        Data.table.select(List(Data.executable, Data.compressed, Data.body),
-          sql = Data.where_equal(session, theory, name)),
-        { res =>
-          val executable = res.bool(Data.executable)
-          val compressed = res.bool(Data.compressed)
-          val bytes = res.bytes(Data.body)
-          val body = Future.value(compressed, bytes)
-          Entry(this, executable, body, cache)
-        }
-      )
   }
 
-  def read_theory_names(db: SQL.Database, session_name: String): List[String] =
-    db.execute_query_statement(
-      Data.table.select(List(Data.theory_name), distinct = true,
-        sql = Data.where_equal(session_name) + SQL.order_by(List(Data.theory_name))),
-      List.from[String], res => res.string(Data.theory_name))
-
-  def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
-    db.execute_query_statement(
-      Data.table.select(List(Data.theory_name, Data.name),
-        sql = Data.where_equal(session_name)) + SQL.order_by(List(Data.theory_name, Data.name)),
-      List.from[Entry_Name],
-      { res =>
-        Entry_Name(
-          session = session_name,
-          theory = res.string(Data.theory_name),
-          name = res.string(Data.name))
-      })
-
   def message(msg: String, theory_name: String, name: String): String =
     msg + " " + quote(name) + " for theory " + quote(theory_name)
 
@@ -135,8 +149,8 @@
   final class Entry private(
     val entry_name: Entry_Name,
     val executable: Boolean,
-    body: Future[(Boolean, Bytes)],
-    cache: XML.Cache
+    val body: Future[(Boolean, Bytes)],
+    val cache: XML.Cache
   ) {
     def session_name: String = entry_name.session
     def theory_name: String = entry_name.theory
@@ -162,19 +176,6 @@
 
     def text: String = bytes.text
     def yxml: XML.Body = YXML.parse_body(UTF8.decode_permissive(bytes), cache = cache)
-
-    def write(db: SQL.Database): Unit = {
-      val (compressed, bs) = body.join
-      db.execute_statement(Data.table.insert(), body =
-        { stmt =>
-          stmt.string(1) = session_name
-          stmt.string(2) = theory_name
-          stmt.string(3) = name
-          stmt.bool(4) = executable
-          stmt.bool(5) = compressed
-          stmt.bytes(6) = bs
-        })
-    }
   }
 
   def make_regex(pattern: String): Regex = {
@@ -199,6 +200,15 @@
       (entry_name: Entry_Name) => regs.exists(_.matches(entry_name.compound_name))
   }
 
+  def read_theory_names(db: SQL.Database, session_name: String): List[String] =
+    Data.transaction_lock(db) { Data.read_theory_names(db, session_name) }
+
+  def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
+    Data.transaction_lock(db) { Data.read_entry_names(db, session_name) }
+
+  def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] =
+    Data.transaction_lock(db) { Data.read_entry(db, entry_name, cache) }
+
 
   /* database consumer thread */
 
@@ -214,21 +224,21 @@
       consume =
         { (args: List[(Entry, Boolean)]) =>
           val results =
-            db.transaction {
+            Data.transaction_lock(db) {
              for ((entry, strict) <- args)
              yield {
                if (progress.stopped) {
                  entry.cancel()
                  Exn.Res(())
                }
-                else if (entry.entry_name.readable(db)) {
+                else if (Data.readable_entry(db, entry.entry_name)) {
                  if (strict) {
                    val msg = message("Duplicate export", entry.theory_name, entry.name)
                    errors.change(msg :: _)
                  }
                  Exn.Res(())
                }
-                else Exn.capture { entry.write(db) }
+                else Exn.capture { Data.write_entry(db, entry) }
              }
            }
          (results, true)
@@ -250,10 +260,8 @@
 
   /* context for database access */
 
-  def open_database_context(store: Store): Database_Context = {
-    val database_server = if (store.build_database_server) Some(store.open_database_server()) else None
-    new Database_Context(store, database_server)
-  }
+  def open_database_context(store: Store): Database_Context =
+    new Database_Context(store, store.maybe_open_database_server())
 
   def open_session_context0(store: Store, session: String): Session_Context =
     open_database_context(store).open_session0(session, close_database_context = true)
@@ -402,10 +410,10 @@
         entry <- snapshot.all_exports.get(entry_name)
       } yield entry
     def db_entry: Option[Entry] =
-      db_hierarchy.view.map(database =>
-        Export.Entry_Name(session = database.session, theory = theory, name = name)
-          .read(database.db, cache))
-        .collectFirst({ case Some(entry) => entry })
+      db_hierarchy.view.map { database =>
+        val entry_name = Export.Entry_Name(session = database.session, theory = theory, name = name)
+        read_entry(database.db, entry_name, cache)
+      }.collectFirst({ case Some(entry) => entry })
 
     snapshot_entry orElse db_entry
   }
@@ -527,7 +535,7 @@
     val matcher = make_matcher(export_patterns)
     for {
       entry_name <- entry_names if matcher(entry_name)
-      entry <- entry_name.read(db, store.cache)
+      entry <- read_entry(db, entry_name, store.cache)
     } {
       val path = export_dir + entry_name.make_path(prune = export_prune)
       progress.echo("export " + path + (if (entry.executable) " (executable)" else ""))
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/Thy/store.scala
--- a/src/Pure/Thy/store.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Thy/store.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -79,7 +79,8 @@
 
   object Data extends SQL.Data() {
     override lazy val tables =
-      SQL.Tables(Session_Info.table, Sources.table, Export.Data.table, Document_Build.Data.table)
+      SQL.Tables(Session_Info.table, Sources.table,
+        Export.Data.Base.table, Document_Build.Data.table)
 
     object Session_Info {
       val session_name = SQL.Column.string("session_name").make_primary_key
@@ -232,8 +233,8 @@
       error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
         cat_lines(input_dirs.map(dir => "  " + File.standard_path(dir))))
 
-  def heap_shasum(database: Option[SQL.Database], name: String): SHA1.Shasum = {
-    def get_database = database.flatMap(ML_Heap.get_entry(_, name))
+  def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = {
+    def get_database = database_server.flatMap(ML_Heap.get_entry(_, name))
     def get_file = find_heap(name).flatMap(ML_Heap.read_file_digest)
 
     get_database orElse get_file match {
@@ -266,15 +267,16 @@
             port = options.int("build_database_ssh_port"))),
       ssh_close = true)
 
+  def maybe_open_database_server(): Option[SQL.Database] =
+    if (build_database_server) Some(open_database_server()) else None
+
   def open_build_database(path: Path): SQL.Database =
     if (build_database_server) open_database_server()
     else SQLite.open_database(path, restrict = true)
 
-  def maybe_open_build_database(path: Path): Option[SQL.Database] =
-    if (build_database_test) Some(open_build_database(path)) else None
-
-  def maybe_open_heaps_database(): Option[SQL.Database] =
-    if (build_database_test && build_database_server) Some(open_database_server()) else None
+  def maybe_open_build_database(
+    path: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
+  ): Option[SQL.Database] = if (build_database_test) Some(open_build_database(path)) else None
 
   def try_open_database(
     name: String,
@@ -301,12 +303,18 @@
   def open_database(name: String, output: Boolean = false): SQL.Database =
     try_open_database(name, output = output) getOrElse error_database(name)
 
-  def prepare_output(): Unit = Isabelle_System.make_directory(output_dir + Path.basic("log"))
-
-  def clean_output(name: String, init: Boolean = false): Option[Boolean] = {
+  def clean_output(
+    database_server: Option[SQL.Database],
+    name: String,
+    session_init: Boolean = false
+  ): Option[Boolean] = {
     val relevant_db =
-      build_database_server &&
-        using_option(try_open_database(name))(init_session_info(_, name)).getOrElse(false)
+      database_server match {
+        case Some(db) =>
+          ML_Heap.clean_entry(db, name)
+          clean_session_info(db, name)
+        case None => false
+      }
 
     val del =
       for {
@@ -316,12 +324,8 @@
         path = dir + file if path.is_file
       } yield path.file.delete
 
-    using_optional(maybe_open_heaps_database()) { database =>
-      database.foreach(ML_Heap.clean_entry(_, name))
-    }
-
-    if (init) {
-      using(open_database(name, output = true))(init_session_info(_, name))
+    if (database_server.isEmpty && session_init) {
+      using(open_database(name, output = true))(clean_session_info(_, name))
     }
 
     if (relevant_db || del.nonEmpty) Some(del.forall(identity)) else None
@@ -382,8 +386,8 @@
       Store.Data.Session_Info.table.select(List(Store.Data.Session_Info.session_name),
        sql = Store.Data.Session_Info.session_name.where_equal(name)))
 
-  def init_session_info(db: SQL.Database, name: String): Boolean =
-    Store.Data.transaction_lock(db, create = true) {
+  def clean_session_info(db: SQL.Database, name: String): Boolean =
+    Store.Data.transaction_lock(db, create = true, synchronized = true) {
       val already_defined = session_info_defined(db, name)
 
       db.execute_statement(
@@ -395,7 +399,7 @@
          sql = Store.Data.Sources.where_equal(name)))
 
       db.execute_statement(
-        Export.Data.table.delete(sql = Export.Data.session_name.where_equal(name)))
+        Export.Data.Base.table.delete(sql = Export.Data.Base.session_name.where_equal(name)))
 
      db.execute_statement(
        Document_Build.Data.table.delete(sql = Document_Build.Data.session_name.where_equal(name)))
@@ -410,7 +414,7 @@
     build_log: Build_Log.Session_Info,
     build: Store.Build_Info
   ): Unit = {
-    Store.Data.transaction_lock(db) {
+    Store.Data.transaction_lock(db, synchronized = true) {
       Store.Data.write_sources(db, session_name, sources)
       Store.Data.write_session_info(db, cache.compress, session_name, build_log, build)
     }
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/Tools/build.scala
--- a/src/Pure/Tools/build.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -81,6 +81,14 @@
 
   /* build */
 
+  def build_results(options: Options, context: Build_Process.Context, progress: Progress): Results =
+    Isabelle_Thread.uninterruptible {
+      val engine = get_engine(options.string("build_engine"))
+      using(engine.init(context, progress)) { build_process =>
+        Results(context, build_process.run())
+      }
+    }
+
   def build(
     options: Options,
     selection: Sessions.Selection = Sessions.Selection.empty,
@@ -163,32 +171,24 @@
     /* build process and results */
 
     val build_context =
-      Build_Process.Context(store, build_deps, progress = progress,
+      Build_Process.init_context(store, build_deps, progress = progress,
        hostname = hostname(build_options), build_heap = build_heap,
        numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
        no_build = no_build, session_setup = session_setup, master = true)
 
-    store.prepare_output()
-    build_context.prepare_database()
-
     if (clean_build) {
-      for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
-        store.clean_output(name) match {
-          case None =>
-          case Some(true) => progress.echo("Cleaned " + name)
-          case Some(false) => progress.echo(name + " FAILED to clean")
+      using_optional(store.maybe_open_database_server()) { database_server =>
+        for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
+          store.clean_output(database_server, name) match {
+            case None =>
+            case Some(true) => progress.echo("Cleaned " + name)
+            case Some(false) => progress.echo(name + " FAILED to clean")
+          }
        }
      }
    }
 
-    val results =
-      Isabelle_Thread.uninterruptible {
-        val engine = get_engine(build_options.string("build_engine"))
-        using(engine.init(build_context, progress)) { build_process =>
-          val res = build_process.run()
-          Results(build_context, res)
-        }
-      }
+    val results = build_results(build_options, build_context, progress)
 
     if (export_files) {
       for (name <- full_sessions_selection.iterator if results(name).ok) {
@@ -379,25 +379,72 @@
 
   /** "isabelle build_worker" **/
 
+  /* identified builds */
+
+  def read_builds(options: Options): List[Build_Process.Build] =
+    using_option(Store(options).maybe_open_build_database())(
+      Build_Process.read_builds).getOrElse(Nil).filter(_.active)
+
+  def print_builds(options: Options, builds: List[Build_Process.Build]): String =
+    using_optional(Store(options).maybe_open_build_database()) { build_database =>
+      val print_database =
+        build_database match {
+          case None => ""
+          case Some(db) => " (database: " + db + ")"
+        }
+      if (builds.isEmpty) "No build processes available" + print_database
+      else {
+        "Available build processes" + print_database +
+          (for ((build, i) <- builds.iterator.zipWithIndex)
+            yield {
+              "\n  " + (i + 1) + ": " + build.build_uuid +
+                " (platform: " + build.ml_platform +
+                ", start: " + Build_Log.print_date(build.start) + ")"
+            }).mkString
+      }
+    }
+
+  def id_builds(
+    options: Options,
+    id: String,
+    builds: List[Build_Process.Build]
+  ): Build_Process.Build =
+    (id, builds.length) match {
+      case (Value.Int(i), n) if 1 <= i && i <= n => builds(i - 1)
+      case (UUID(_), _) if builds.exists(_.build_uuid == id) => builds.find(_.build_uuid == id).get
+      case ("", 0) => error(print_builds(options, builds))
+      case ("", 1) => builds.head
+      case _ => cat_error("Cannot identify build process " + quote(id), print_builds(options, builds))
+    }
+
+
   /* build_worker */
 
   def build_worker(
     options: Options,
-    build_uuid: String,
+    build_master: Build_Process.Build,
     progress: Progress = new Progress,
     dirs: List[Path] = Nil,
-    infos: List[Sessions.Info] = Nil,
     numa_shuffling: Boolean = false,
     max_jobs: Int = 1,
-    session_setup: (String, Session) => Unit = (_, _) => (),
     cache: Term.Cache = Term.Cache.make()
   ): Results = {
     val store = build_init(options, cache = cache)
     val build_options = store.options
 
-    progress.echo("build worker for " + build_uuid)
-    progress.echo_warning("FIXME")
-    ???
+    val sessions_structure =
+      Sessions.load_structure(build_options, dirs = dirs).
+        selection(Sessions.Selection(sessions = build_master.sessions))
+
+    val build_deps =
+      Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
+
+    val build_context =
+      Build_Process.init_context(store, build_deps, progress = progress,
+        hostname = hostname(build_options), numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+        build_uuid = build_master.build_uuid)
+
+    build_results(build_options, build_context, progress)
   }
 
 
@@ -406,51 +453,63 @@
   val isabelle_tool2 =
     Isabelle_Tool("build_worker", "external worker for session build process",
       Scala_Project.here,
       { args =>
+        var build_id = ""
+        var list_builds = false
         var numa_shuffling = false
         var dirs: List[Path] = Nil
         var max_jobs = 1
-        var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS)
+        var options =
+          Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
+            List(Options.Spec.make("build_database_test")))
        var verbose = false
-        var build_uuid = ""
 
        val getopts = Getopts("""
-Usage: isabelle build_worker [OPTIONS] ...]
+Usage: isabelle build_worker [OPTIONS]
 
  Options are:
    -N           cyclic shuffling of NUMA CPU nodes (performance tuning)
-    -U UUID      Universally Unique Identifier of the build process
    -d DIR       include session directory
+    -i ID        identify build process, either via index (starting from 1) or
+                 Universally Unique Identifier (UUID)
    -j INT       maximum number of parallel jobs (default 1)
+    -l           list build processes
    -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
    -v           verbose
 
  Run as external worker for session build process, as identified via
-  option -U UUID.
+  option -i. The latter can be omitted, if there is exactly one build.
""",
          "N" -> (_ => numa_shuffling = true),
-          "U:" -> (arg => build_uuid = arg),
          "d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))),
+          "i:" -> (arg => build_id = arg),
          "j:" -> (arg => max_jobs = Value.Int.parse(arg)),
+          "l" -> (_ => list_builds = true),
          "o:" -> (arg => options = options + arg),
          "v" -> (_ => verbose = true))
 
        val more_args = getopts(args)
        if (more_args.nonEmpty) getopts.usage()
 
-        if (build_uuid.isEmpty) error("Missing UUID for build process (option -U)")
-
        val progress = new Console_Progress(verbose = verbose)
 
-        val results =
-          progress.interrupt_handler {
-            build_worker(options, build_uuid,
-              progress = progress,
-              dirs = dirs,
-              numa_shuffling = Host.numa_check(progress, numa_shuffling),
-              max_jobs = max_jobs)
-          }
+        val builds = read_builds(options)
+
+        if (list_builds) progress.echo(print_builds(options, builds))
+
+        if (!list_builds || build_id.nonEmpty) {
+          val build = id_builds(options, build_id, builds)
 
-        sys.exit(results.rc)
+          val results =
+            progress.interrupt_handler {
+              build_worker(options, build,
+                progress = progress,
+                dirs = dirs,
+                numa_shuffling = Host.numa_check(progress, numa_shuffling),
+                max_jobs = max_jobs)
+            }
+
+          sys.exit(results.rc)
+        }
      })
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/Tools/build_job.scala
--- a/src/Pure/Tools/build_job.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build_job.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -23,11 +23,13 @@
     build_context: Build_Process.Context,
     progress: Progress,
     log: Logger,
+    database_server: Option[SQL.Database],
     session_background: Sessions.Background,
     input_shasum: SHA1.Shasum,
     node_info: Host.Node_Info
   ): Session_Job = {
-    new Session_Job(build_context, progress, log, session_background, input_shasum, node_info)
+    new Session_Job(build_context, progress, log, database_server,
+      session_background, input_shasum, node_info)
   }
 
   object Session_Context {
@@ -93,6 +95,7 @@
     build_context: Build_Process.Context,
     progress: Progress,
     log: Logger,
+    database_server: Option[SQL.Database],
     session_background: Sessions.Background,
     input_shasum: SHA1.Shasum,
     node_info: Host.Node_Info
@@ -453,8 +456,7 @@
         val heap = store.output_heap(session_name)
         if (process_result.ok && store_heap && heap.is_file) {
           val slice = Space.MiB(options.real("build_database_slice")).bytes
-          val digest =
-            using_optional(store.maybe_open_heaps_database())(ML_Heap.store(_, heap, slice))
+          val digest = ML_Heap.store(database_server, session_name, heap, slice)
           SHA1.shasum(digest, session_name)
         }
         else SHA1.no_shasum
diff -r 759c71cdaf2a -r 524ba83940c2 src/Pure/Tools/build_process.scala
--- a/src/Pure/Tools/build_process.scala	Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build_process.scala	Wed Jun 28 16:01:34 2023 +0200
@@ -16,79 +16,89 @@
 object Build_Process {
   /** static context **/
 
-  object Context {
-    def apply(
-      store: Store,
-      build_deps: Sessions.Deps,
-      progress: Progress = new Progress,
-      ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
-      hostname: String = Isabelle_System.hostname(),
-      numa_shuffling: Boolean = false,
-      build_heap: Boolean = false,
-      max_jobs: Int = 1,
-      fresh_build: Boolean = false,
-      no_build: Boolean = false,
-      session_setup: (String, Session) => Unit = (_, _) => (),
-      build_uuid: String = UUID.random().toString,
-      master: Boolean = false,
-    ): Context = {
-      val sessions_structure = build_deps.sessions_structure
-      val build_graph = sessions_structure.build_graph
+  def init_context(
+    store: Store,
+    build_deps: Sessions.Deps,
+    progress: Progress = new Progress,
+    ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
+    hostname: String = Isabelle_System.hostname(),
+    numa_shuffling: Boolean = false,
+    build_heap: Boolean = false,
+    max_jobs: Int = 1,
+    fresh_build: Boolean = false,
+    no_build: Boolean = false,
+    session_setup: (String, Session) => Unit = (_, _) => (),
+    build_uuid: String = UUID.random().toString,
+    master: Boolean = false,
+  ): Context = {
+    val sessions_structure = build_deps.sessions_structure
+    val build_graph = sessions_structure.build_graph
 
-      val sessions =
-        Map.from(
-          for ((name, (info, _)) <- build_graph.iterator)
-          yield {
-            val deps = info.parent.toList
-            val ancestors = sessions_structure.build_requirements(deps)
-            val sources_shasum = build_deps.sources_shasum(name)
-            val session_context =
-              Build_Job.Session_Context.load(
-                build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum,
-                info.timeout, store, progress = progress)
-            name -> session_context
-          })
+    val sessions =
+      Map.from(
+        for ((name, (info, _)) <- build_graph.iterator)
+        yield {
+          val deps = info.parent.toList
+          val ancestors = sessions_structure.build_requirements(deps)
+          val sources_shasum = build_deps.sources_shasum(name)
+          val session_context =
+            Build_Job.Session_Context.load(
+              build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum,
+              info.timeout, store, progress = progress)
+          name -> session_context
+        })
 
-      val sessions_time = {
-        val maximals = build_graph.maximals.toSet
-        def descendants_time(name: String): Double = {
-          if (maximals.contains(name)) sessions(name).old_time.seconds
-          else {
-            val descendants = build_graph.all_succs(List(name)).toSet
-            val g = build_graph.restrict(descendants)
-            (0.0 :: g.maximals.flatMap { desc =>
-              val ps = g.all_preds(List(desc))
-              if (ps.exists(p => !sessions.isDefinedAt(p))) None
-              else Some(ps.map(p => sessions(p).old_time.seconds).sum)
-            }).max
+    val sessions_time = {
+      val maximals = build_graph.maximals.toSet
+      def descendants_time(name: String): Double = {
+        if (maximals.contains(name)) sessions(name).old_time.seconds
+        else {
+          val descendants = build_graph.all_succs(List(name)).toSet
+          val g = build_graph.restrict(descendants)
+          (0.0 :: g.maximals.flatMap { desc =>
+            val ps = g.all_preds(List(desc))
+            if (ps.exists(p => !sessions.isDefinedAt(p))) None
+            else Some(ps.map(p => sessions(p).old_time.seconds).sum)
+          }).max
+        }
+      }
+      Map.from(
+        for (name <- sessions.keysIterator)
+          yield name -> descendants_time(name)).withDefaultValue(0.0)
+    }
+
+    val ordering =
+      new Ordering[String] {
+        def compare(name1: String, name2: String): Int =
+          sessions_time(name2) compare sessions_time(name1) match {
+            case 0 =>
+              sessions(name2).timeout compare sessions(name1).timeout match {
+                case 0 => name1 compare name2
+                case ord => ord
+              }
+            case ord => ord
          }
-        }
-        Map.from(
-          for (name <- sessions.keysIterator)
-            yield name -> descendants_time(name)).withDefaultValue(0.0)
       }
 
-      val ordering =
-        new Ordering[String] {
-          def compare(name1: String, name2: String): Int =
-            sessions_time(name2) compare sessions_time(name1) match {
-              case 0 =>
-                sessions(name2).timeout compare sessions(name1).timeout match {
-                  case 0 => name1 compare name2
-                  case ord => ord
-                }
-              case ord => ord
-            }
-        }
+    Isabelle_System.make_directory(store.output_dir + Path.basic("log"))
 
-      val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
-      new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes,
-        build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
-        no_build = no_build, session_setup, build_uuid = build_uuid, master = master)
+    using_option(store.maybe_open_build_database()) { db =>
+      val shared_db = db.is_postgresql
+      Data.transaction_lock(db, create = true) {
+        Data.clean_build(db)
+        if (shared_db) Store.Data.tables.lock(db, create = true)
+      }
+      Data.vacuum(db, more_tables = if (shared_db) Store.Data.tables else SQL.Tables.empty)
     }
+
+    val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
+
+    new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes,
+      build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
+      no_build = no_build, session_setup, build_uuid = build_uuid, master = master)
   }
 
-  final class Context private(
+  final class Context private[Build_Process](
     val store: Store,
     val build_deps: Sessions.Deps,
     val sessions: State.Sessions,
@@ -121,17 +131,6 @@
       case None => Nil
     }
 
-    def prepare_database(): Unit = {
-      using_option(store.maybe_open_build_database(Data.database)) { db =>
-        val shared_db = db.is_postgresql
-        Data.transaction_lock(db, create = true) {
-          Data.clean_build(db)
-          if (shared_db) Store.Data.tables.lock(db, create = true)
-        }
-        Data.vacuum(db, more_tables = if (shared_db) Store.Data.tables else SQL.Tables.empty)
-      }
-    }
-
     def store_heap(name: String): Boolean =
       build_heap || Sessions.is_pure(name) ||
       sessions.valuesIterator.exists(_.ancestors.contains(name))
@@ -148,8 +147,11 @@
     ml_platform: String,
     options: String,
     start: Date,
-    stop: Option[Date]
-  )
+    stop: Option[Date],
+    sessions: List[String]
+  ) {
+    def active: Boolean = stop.isEmpty
+  }
 
   case class Worker(
     worker_uuid: String,  // Database_Progress.agent_uuid
@@ -333,18 +335,25 @@
       val table = make_table("", List(build_uuid, ml_platform, options, start, stop))
     }
 
-    def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] =
-      db.execute_query_statement(
-        Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)),
-        List.from[Build],
-        { res =>
-          val build_uuid = res.string(Base.build_uuid)
-          val ml_platform = res.string(Base.ml_platform)
-          val options = res.string(Base.options)
-          val start = res.date(Base.start)
-          val stop = res.get_date(Base.stop)
-          Build(build_uuid, ml_platform, options, start, stop)
-        })
+    def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] = {
+      val builds =
+        db.execute_query_statement(
+          Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)),
+          List.from[Build],
+          { res =>
+            val build_uuid = res.string(Base.build_uuid)
+            val ml_platform = res.string(Base.ml_platform)
+            val options = res.string(Base.options)
+            val start = res.date(Base.start)
+            val stop = res.get_date(Base.stop)
+            Build(build_uuid, ml_platform, options, start, stop, Nil)
+          })
+
+      for (build <- builds.sortBy(_.start)(Date.Ordering)) yield {
+        val sessions = Data.read_sessions_domain(db, build_uuid = build.build_uuid)
+        build.copy(sessions = sessions.toList.sorted)
+      }
+    }
 
     def start_build(
       db: SQL.Database,
@@ -399,14 +408,23 @@
             old_time, old_command_timings, build_uuid))
     }
 
-    def read_sessions_domain(db: SQL.Database): Set[String] =
+    def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] =
       db.execute_query_statement(
-        Sessions.table.select(List(Sessions.name)),
+        Sessions.table.select(List(Sessions.name),
+          sql = if_proper(build_uuid, Sessions.name.where_equal(build_uuid))),
         Set.from[String], res => res.string(Sessions.name))
 
-    def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions =
+    def read_sessions(db: SQL.Database,
+      names: Iterable[String] = Nil,
+      build_uuid: String = ""
+    ): State.Sessions =
       db.execute_query_statement(
-        Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))),
+        Sessions.table.select(
+          sql =
+            SQL.where_and(
+              if_proper(names, Sessions.name.member(names)),
+              if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid)))
+        ),
         Map.from[String, Build_Job.Session_Context],
         { res =>
           val name = res.string(Sessions.name)
@@ -423,7 +441,7 @@
         }
       )
 
-    def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = {
+    def update_sessions(db: SQL.Database, sessions: State.Sessions): Boolean = {
       val old_sessions = read_sessions_domain(db)
       val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList
 
@@ -780,6 +798,9 @@
           state.set_serial(serial)
         }
       }
+
+    def read_builds(db: SQL.Database): List[Build] =
+      Data.transaction_lock(db, create = true) { Data.read_builds(db) }
  }
 
 
@@ -802,30 +823,40 @@
 
   /* progress backed by database */
 
-  private val _database: Option[SQL.Database] =
-    store.maybe_open_build_database(Build_Process.Data.database)
+  private val _database_server: Option[SQL.Database] =
+    try { store.maybe_open_database_server() }
+    catch { case exn: Throwable => close(); throw exn }
+
+  private val _build_database: Option[SQL.Database] =
+    try { store.maybe_open_build_database() }
+    catch { case exn: Throwable => close(); throw exn }
 
   private val _host_database: Option[SQL.Database] =
-    store.maybe_open_build_database(Host.Data.database)
+    try { store.maybe_open_build_database(path = Host.Data.database) }
+    catch { case exn: Throwable => close(); throw exn }
 
   protected val (progress, worker_uuid) = synchronized {
-    _database match {
+    _build_database match {
       case None => (build_progress, UUID.random().toString)
       case Some(db) =>
-        val progress_db = store.open_build_database(Progress.Data.database)
-        val progress =
-          new Database_Progress(progress_db, build_progress,
-            hostname = hostname,
-            context_uuid = build_uuid)
-        (progress, progress.agent_uuid)
+        try {
+          val progress_db = store.open_build_database(Progress.Data.database)
+          val progress =
+            new Database_Progress(progress_db, build_progress,
+              hostname = hostname,
+              context_uuid = build_uuid)
+          (progress, progress.agent_uuid)
+        }
+        catch { case exn: Throwable => close(); throw exn }
    }
  }
 
   protected val log: Logger = Logger.make_system_log(progress, build_options)
 
   def close(): Unit = synchronized {
-    _database.foreach(_.close())
-    _host_database.foreach(_.close())
+    Option(_database_server).flatten.foreach(_.close())
+    Option(_build_database).flatten.foreach(_.close())
+    Option(_host_database).flatten.foreach(_.close())
    progress match {
      case db_progress: Database_Progress => db_progress.exit()
@@ -839,7 +870,7 @@
   private var _state: Build_Process.State = Build_Process.State()
 
   protected def synchronized_database[A](body: => A): A = synchronized {
-    _database match {
+    _build_database match {
       case None => body
       case Some(db) =>
         Build_Process.Data.transaction_lock(db) {
@@ -906,10 +937,8 @@
       val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
 
       if (!skipped && !cancelled) {
-        using_optional(store.maybe_open_heaps_database()) { database =>
-          database.foreach(
-            ML_Heap.restore(_, store.output_heap(session_name), cache = store.cache.compress))
-        }
+        ML_Heap.restore(_database_server, session_name, store.output_heap(session_name),
+          cache = store.cache.compress)
      }
 
      val result_name = (session_name, worker_uuid, build_uuid)
@@ -945,10 +974,10 @@
          (if (store_heap) "Building " else "Running ") + session_name +
            if_proper(node_info.numa_node, " on " + node_info) + " ...")
 
-      store.clean_output(session_name, init = true)
+      store.clean_output(_database_server, session_name, session_init = true)
 
      val build =
-        Build_Job.start_session(build_context, progress, log,
+        Build_Job.start_session(build_context, progress, log, _database_server,
          build_deps.background(session_name), input_shasum, node_info)
 
      val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
@@ -964,27 +993,27 @@
    !Long_Name.is_qualified(job_name)
 
   protected final def start_build(): Unit = synchronized_database {
-    for (db <- _database) {
+    for (db <- _build_database) {
      Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
        build_context.sessions_structure.session_prefs)
    }
  }
 
   protected final def stop_build(): Unit = synchronized_database {
-    for (db <- _database) {
+    for (db <- _build_database) {
      Build_Process.Data.stop_build(db, build_uuid)
    }
  }
 
   protected final def start_worker(): Unit = synchronized_database {
-    for (db <- _database) {
+    for (db <- _build_database) {
      _state = _state.inc_serial
      Build_Process.Data.start_worker(db, worker_uuid, build_uuid, _state.serial)
    }
  }
 
   protected final def stop_worker(): Unit = synchronized_database {
-    for (db <- _database) {
+    for (db <- _build_database) {
      Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
    }
  }
@@ -1060,7 +1089,7 @@
   def snapshot(): Build_Process.Snapshot = synchronized_database {
     val (builds, workers) =
-      _database match {
+      _build_database match {
         case None => (Nil, Nil)
         case Some(db) =>
           (Build_Process.Data.read_builds(db),
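
The key mechanism threaded through this changeset is the new "synchronized" flag of SQL.Data.transaction_lock (first hunk above): besides taking SQL-level table locks inside a transaction, a caller may additionally serialize on the JVM monitor of the database object, so that concurrent writers within one build process take turns. The standalone Scala sketch below models only that pattern; Database, transaction_lock and Transaction_Lock_Demo are simplified stand-ins for illustration, not the actual Isabelle/Scala API.

object Transaction_Lock_Demo {
  // Stand-in for isabelle.SQL.Database: just enough structure to show the
  // locking pattern, without a real JDBC connection.
  final class Database {
    def transaction[A](body: => A): A = body  // stand-in for BEGIN ... COMMIT
  }

  // Same shape as the patched SQL.Data.transaction_lock: an optional
  // process-level monitor (db.synchronized) around the SQL transaction.
  def transaction_lock[A](db: Database, synchronized: Boolean = false)(body: => A): A = {
    def run: A = db.transaction(body)
    if (synchronized) db.synchronized { run } else run
  }

  def main(args: Array[String]): Unit = {
    val db = new Database
    val threads =
      for (i <- List.range(0, 4)) yield {
        val t = new Thread(() =>
          transaction_lock(db, synchronized = true) {
            // writers within one JVM are serialized on the db monitor
            println("writer " + i + " holds the database monitor")
          })
        t.start()
        t
      }
    threads.foreach(_.join())
  }
}

In the changeset itself the flag is switched on at the ML_Heap and Store write paths (clean_entry, prepare_entry, write_entry, finish_entry, clean_session_info, write_session_info), i.e. the call sites that concurrent build jobs within one process may reach, while read-oriented accesses such as Export.read_entry keep the plain transaction.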