--- a/src/Pure/General/sql.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/General/sql.scala Wed Jun 28 16:01:34 2023 +0200
@@ -253,8 +253,12 @@
def transaction_lock[A](
db: Database,
more_tables: Tables = Tables.empty,
- create: Boolean = false
- )(body: => A): A = db.transaction { (tables ::: more_tables).lock(db, create = create); body }
+ create: Boolean = false,
+ synchronized: Boolean = false,
+ )(body: => A): A = {
+ def run: A = db.transaction { (tables ::: more_tables).lock(db, create = create); body }
+ if (synchronized) db.synchronized { run } else run
+ }
def vacuum(db: Database, more_tables: Tables = Tables.empty): Unit =
db.vacuum(tables = tables ::: more_tables)
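
Note: the new `synchronized` flag serializes whole transactions on the JVM side by locking on the database object itself, on top of the SQL-level table locks taken inside the transaction. This matters mostly for SQLite, where concurrent writers from several threads fail readily. A minimal self-contained sketch of the same idea over plain JDBC; `transaction_lock` and `synchronized_access` here are hypothetical stand-ins, not the Isabelle API:

    import java.sql.Connection

    object Transaction {
      // Run body as one JDBC transaction; optionally serialize all such
      // transactions on this connection within the current JVM, mirroring
      // "if (synchronized) db.synchronized { run } else run" above.
      def transaction_lock[A](db: Connection, synchronized_access: Boolean = false)(body: => A): A = {
        def run: A = {
          val auto = db.getAutoCommit
          db.setAutoCommit(false)
          try { val res = body; db.commit(); res }
          catch { case exn: Throwable => db.rollback(); throw exn }
          finally db.setAutoCommit(auto)
        }
        if (synchronized_access) db.synchronized(run) else run
      }
    }
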
--- a/src/Pure/ML/ml_heap.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/ML/ml_heap.scala Wed Jun 28 16:01:34 2023 +0200
@@ -110,29 +110,36 @@
})
}
- def clean_entry(db: SQL.Database, name: String): Unit =
- Data.transaction_lock(db, create = true) { Data.clean_entry(db, name) }
+ def clean_entry(db: SQL.Database, session_name: String): Unit =
+ Data.transaction_lock(db, create = true, synchronized = true) {
+ Data.clean_entry(db, session_name)
+ }
- def get_entry(db: SQL.Database, name: String): Option[SHA1.Digest] =
- Data.transaction_lock(db, create = true) { Data.get_entry(db, name) }
+ def get_entry(db: SQL.Database, session_name: String): Option[SHA1.Digest] =
+ Data.transaction_lock(db, create = true, synchronized = true) {
+ Data.get_entry(db, session_name)
+ }
def store(
database: Option[SQL.Database],
+ session_name: String,
heap: Path,
slice: Long,
cache: Compress.Cache = Compress.Cache.none
): SHA1.Digest = {
val digest = write_file_digest(heap)
database match {
+ case None =>
case Some(db) =>
- val name = heap.file_name
val size = File.space(heap).bytes - sha1_prefix.length - SHA1.digest_length
val slices = (size.toDouble / slice.toDouble).ceil.toInt
val step = (size.toDouble / slices.toDouble).ceil.toLong
try {
- Data.transaction_lock(db, create = true) { Data.prepare_entry(db, name) }
+ Data.transaction_lock(db, create = true, synchronized = true) {
+ Data.prepare_entry(db, session_name)
+ }
for (i <- 0 until slices) {
val j = i + 1
@@ -141,39 +148,48 @@
val content =
Bytes.read_file(heap.file, offset = offset, limit = limit)
.compress(cache = cache)
- Data.transaction_lock(db) { Data.write_entry(db, name, i, content) }
+ Data.transaction_lock(db, synchronized = true) {
+ Data.write_entry(db, session_name, i, content)
+ }
}
- Data.transaction_lock(db) { Data.finish_entry(db, name, size, digest) }
+ Data.transaction_lock(db, synchronized = true) {
+ Data.finish_entry(db, session_name, size, digest)
+ }
}
catch { case exn: Throwable =>
- Data.transaction_lock(db, create = true) { Data.clean_entry(db, name) }
+ Data.transaction_lock(db, create = true, synchronized = true) {
+ Data.clean_entry(db, session_name)
+ }
throw exn
}
- case None =>
}
digest
}
def restore(
- db: SQL.Database,
+ database: Option[SQL.Database],
+ session_name: String,
heap: Path,
cache: Compress.Cache = Compress.Cache.none
): Unit = {
- val name = heap.file_name
- Data.transaction_lock(db, create = true) {
- val db_digest = Data.get_entry(db, name)
- val file_digest = read_file_digest(heap)
+ database match {
+ case None =>
+ case Some(db) =>
+ Data.transaction_lock(db, create = true, synchronized = true) {
+ val db_digest = Data.get_entry(db, session_name)
+ val file_digest = read_file_digest(heap)
- if (db_digest.isDefined && db_digest != file_digest) {
- Isabelle_System.make_directory(heap.expand.dir)
- Bytes.write(heap, Bytes.empty)
- for (slice <- Data.read_entry(db, name)) {
- Bytes.append(heap, slice.uncompress(cache = cache))
+ if (db_digest.isDefined && db_digest != file_digest) {
+ Isabelle_System.make_directory(heap.expand.dir)
+ Bytes.write(heap, Bytes.empty)
+ for (slice <- Data.read_entry(db, session_name)) {
+ Bytes.append(heap, slice.uncompress(cache = cache))
+ }
+ val digest = write_file_digest(heap)
+ if (db_digest.get != digest) error("Incoherent content for file " + heap)
}
- val digest = write_file_digest(heap)
- if (db_digest.get != digest) error("Incoherent content for file " + heap)
- }
+ }
}
}
}
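
Note: ML_Heap.store now cuts the heap image into database slices of roughly `slice` bytes, each compressed and written in its own synchronized transaction, while restore concatenates the slices and cross-checks the SHA1 digest. The boundary arithmetic uses ceiling division twice; a self-contained sketch with the same formulas (`slice_ranges` is a hypothetical helper, not part of the patch):

    object Slices {
      // Same arithmetic as ML_Heap.store: cut `size` bytes into `slices`
      // chunks of at most `step` bytes; the last chunk takes the remainder.
      def slice_ranges(size: Long, slice: Long): List[(Long, Long)] = {
        require(size >= 0 && slice > 0)
        val slices = (size.toDouble / slice.toDouble).ceil.toInt
        val step = if (slices == 0) 0L else (size.toDouble / slices.toDouble).ceil.toLong
        (for (i <- 0 until slices) yield {
          val offset = step * i
          val limit = if (i + 1 == slices) size else step * (i + 1)
          (offset, limit - offset)  // (offset, length) of slice i
        }).toList
      }
    }

For example, slice_ranges(10, 4) yields (0,4), (4,4), (8,2): three slices that cover all ten bytes exactly once.
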
--- a/src/Pure/Thy/export.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Thy/export.scala Wed Jun 28 16:01:34 2023 +0200
@@ -29,25 +29,76 @@
/* SQL data model */
- object Data {
- val session_name = SQL.Column.string("session_name").make_primary_key
- val theory_name = SQL.Column.string("theory_name").make_primary_key
- val name = SQL.Column.string("name").make_primary_key
- val executable = SQL.Column.bool("executable")
- val compressed = SQL.Column.bool("compressed")
- val body = SQL.Column.bytes("body")
+ object Data extends SQL.Data() {
+ override lazy val tables = SQL.Tables(Base.table)
- val table =
- SQL.Table("isabelle_exports",
- List(session_name, theory_name, name, executable, compressed, body))
+ object Base {
+ val session_name = SQL.Column.string("session_name").make_primary_key
+ val theory_name = SQL.Column.string("theory_name").make_primary_key
+ val name = SQL.Column.string("name").make_primary_key
+ val executable = SQL.Column.bool("executable")
+ val compressed = SQL.Column.bool("compressed")
+ val body = SQL.Column.bytes("body")
- val tables = SQL.Tables(table)
+ val table =
+ SQL.Table("isabelle_exports",
+ List(session_name, theory_name, name, executable, compressed, body))
+ }
def where_equal(session_name: String, theory_name: String = "", name: String = ""): SQL.Source =
SQL.where_and(
- Data.session_name.equal(session_name),
- if_proper(theory_name, Data.theory_name.equal(theory_name)),
- if_proper(name, Data.name.equal(name)))
+ Base.session_name.equal(session_name),
+ if_proper(theory_name, Base.theory_name.equal(theory_name)),
+ if_proper(name, Base.name.equal(name)))
+
+ def readable_entry(db: SQL.Database, entry_name: Entry_Name): Boolean = {
+ db.execute_query_statementB(
+ Base.table.select(List(Base.name),
+ sql = where_equal(entry_name.session, entry_name.theory, entry_name.name)))
+ }
+
+ def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] =
+ db.execute_query_statementO[Entry](
+ Base.table.select(List(Base.executable, Base.compressed, Base.body),
+ sql = Data.where_equal(entry_name.session, entry_name.theory, entry_name.name)),
+ { res =>
+ val executable = res.bool(Base.executable)
+ val compressed = res.bool(Base.compressed)
+ val bytes = res.bytes(Base.body)
+ val body = Future.value(compressed, bytes)
+ Entry(entry_name, executable, body, cache)
+ }
+ )
+
+ def write_entry(db: SQL.Database, entry: Entry): Unit = {
+ val (compressed, bs) = entry.body.join
+ db.execute_statement(Base.table.insert(), body = { stmt =>
+ stmt.string(1) = entry.session_name
+ stmt.string(2) = entry.theory_name
+ stmt.string(3) = entry.name
+ stmt.bool(4) = entry.executable
+ stmt.bool(5) = compressed
+ stmt.bytes(6) = bs
+ })
+ }
+
+ def read_theory_names(db: SQL.Database, session_name: String): List[String] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.theory_name), distinct = true,
+ sql = Data.where_equal(session_name) + SQL.order_by(List(Base.theory_name))),
+ List.from[String], res => res.string(Base.theory_name))
+
+ def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.theory_name, Base.name),
+ sql = Data.where_equal(session_name)) + SQL.order_by(List(Base.theory_name, Base.name)),
+ List.from[Entry_Name],
+ { res =>
+ Entry_Name(
+ session = session_name,
+ theory = res.string(Base.theory_name),
+ name = res.string(Base.name))
+ })
}
def compound_name(a: String, b: String): String =
@@ -63,45 +114,8 @@
}
else Path.make(elems.drop(prune))
}
-
- def readable(db: SQL.Database): Boolean = {
- db.execute_query_statementB(
- Data.table.select(List(Data.name),
- sql = Data.where_equal(session, theory, name)))
- }
-
- def read(db: SQL.Database, cache: XML.Cache): Option[Entry] =
- db.execute_query_statementO[Entry](
- Data.table.select(List(Data.executable, Data.compressed, Data.body),
- sql = Data.where_equal(session, theory, name)),
- { res =>
- val executable = res.bool(Data.executable)
- val compressed = res.bool(Data.compressed)
- val bytes = res.bytes(Data.body)
- val body = Future.value(compressed, bytes)
- Entry(this, executable, body, cache)
- }
- )
}
- def read_theory_names(db: SQL.Database, session_name: String): List[String] =
- db.execute_query_statement(
- Data.table.select(List(Data.theory_name), distinct = true,
- sql = Data.where_equal(session_name) + SQL.order_by(List(Data.theory_name))),
- List.from[String], res => res.string(Data.theory_name))
-
- def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
- db.execute_query_statement(
- Data.table.select(List(Data.theory_name, Data.name),
- sql = Data.where_equal(session_name)) + SQL.order_by(List(Data.theory_name, Data.name)),
- List.from[Entry_Name],
- { res =>
- Entry_Name(
- session = session_name,
- theory = res.string(Data.theory_name),
- name = res.string(Data.name))
- })
-
def message(msg: String, theory_name: String, name: String): String =
msg + " " + quote(name) + " for theory " + quote(theory_name)
@@ -135,8 +149,8 @@
final class Entry private(
val entry_name: Entry_Name,
val executable: Boolean,
- body: Future[(Boolean, Bytes)],
- cache: XML.Cache
+ val body: Future[(Boolean, Bytes)],
+ val cache: XML.Cache
) {
def session_name: String = entry_name.session
def theory_name: String = entry_name.theory
@@ -162,19 +176,6 @@
def text: String = bytes.text
def yxml: XML.Body = YXML.parse_body(UTF8.decode_permissive(bytes), cache = cache)
-
- def write(db: SQL.Database): Unit = {
- val (compressed, bs) = body.join
- db.execute_statement(Data.table.insert(), body =
- { stmt =>
- stmt.string(1) = session_name
- stmt.string(2) = theory_name
- stmt.string(3) = name
- stmt.bool(4) = executable
- stmt.bool(5) = compressed
- stmt.bytes(6) = bs
- })
- }
}
def make_regex(pattern: String): Regex = {
@@ -199,6 +200,15 @@
(entry_name: Entry_Name) => regs.exists(_.matches(entry_name.compound_name))
}
+ def read_theory_names(db: SQL.Database, session_name: String): List[String] =
+ Data.transaction_lock(db) { Data.read_theory_names(db, session_name) }
+
+ def read_entry_names(db: SQL.Database, session_name: String): List[Entry_Name] =
+ Data.transaction_lock(db) { Data.read_entry_names(db, session_name) }
+
+ def read_entry(db: SQL.Database, entry_name: Entry_Name, cache: XML.Cache): Option[Entry] =
+ Data.transaction_lock(db) { Data.read_entry(db, entry_name, cache) }
+
/* database consumer thread */
@@ -214,21 +224,21 @@
consume =
{ (args: List[(Entry, Boolean)]) =>
val results =
- db.transaction {
+ Data.transaction_lock(db) {
for ((entry, strict) <- args)
yield {
if (progress.stopped) {
entry.cancel()
Exn.Res(())
}
- else if (entry.entry_name.readable(db)) {
+ else if (Data.readable_entry(db, entry.entry_name)) {
if (strict) {
val msg = message("Duplicate export", entry.theory_name, entry.name)
errors.change(msg :: _)
}
Exn.Res(())
}
- else Exn.capture { entry.write(db) }
+ else Exn.capture { Data.write_entry(db, entry) }
}
}
(results, true)
@@ -250,10 +260,8 @@
/* context for database access */
- def open_database_context(store: Store): Database_Context = {
- val database_server = if (store.build_database_server) Some(store.open_database_server()) else None
- new Database_Context(store, database_server)
- }
+ def open_database_context(store: Store): Database_Context =
+ new Database_Context(store, store.maybe_open_database_server())
def open_session_context0(store: Store, session: String): Session_Context =
open_database_context(store).open_session0(session, close_database_context = true)
@@ -402,10 +410,10 @@
entry <- snapshot.all_exports.get(entry_name)
} yield entry
def db_entry: Option[Entry] =
- db_hierarchy.view.map(database =>
- Export.Entry_Name(session = database.session, theory = theory, name = name)
- .read(database.db, cache))
- .collectFirst({ case Some(entry) => entry })
+ db_hierarchy.view.map { database =>
+ val entry_name = Export.Entry_Name(session = database.session, theory = theory, name = name)
+ read_entry(database.db, entry_name, cache)
+ }.collectFirst({ case Some(entry) => entry })
snapshot_entry orElse db_entry
}
@@ -527,7 +535,7 @@
val matcher = make_matcher(export_patterns)
for {
entry_name <- entry_names if matcher(entry_name)
- entry <- entry_name.read(db, store.cache)
+ entry <- read_entry(db, entry_name, store.cache)
} {
val path = export_dir + entry_name.make_path(prune = export_prune)
progress.echo("export " + path + (if (entry.executable) " (executable)" else ""))
--- a/src/Pure/Thy/store.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Thy/store.scala Wed Jun 28 16:01:34 2023 +0200
@@ -79,7 +79,8 @@
object Data extends SQL.Data() {
override lazy val tables =
- SQL.Tables(Session_Info.table, Sources.table, Export.Data.table, Document_Build.Data.table)
+ SQL.Tables(Session_Info.table, Sources.table,
+ Export.Data.Base.table, Document_Build.Data.table)
object Session_Info {
val session_name = SQL.Column.string("session_name").make_primary_key
@@ -232,8 +233,8 @@
error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
cat_lines(input_dirs.map(dir => " " + File.standard_path(dir))))
- def heap_shasum(database: Option[SQL.Database], name: String): SHA1.Shasum = {
- def get_database = database.flatMap(ML_Heap.get_entry(_, name))
+ def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = {
+ def get_database = database_server.flatMap(ML_Heap.get_entry(_, name))
def get_file = find_heap(name).flatMap(ML_Heap.read_file_digest)
get_database orElse get_file match {
@@ -266,15 +267,16 @@
port = options.int("build_database_ssh_port"))),
ssh_close = true)
+ def maybe_open_database_server(): Option[SQL.Database] =
+ if (build_database_server) Some(open_database_server()) else None
+
def open_build_database(path: Path): SQL.Database =
if (build_database_server) open_database_server()
else SQLite.open_database(path, restrict = true)
- def maybe_open_build_database(path: Path): Option[SQL.Database] =
- if (build_database_test) Some(open_build_database(path)) else None
-
- def maybe_open_heaps_database(): Option[SQL.Database] =
- if (build_database_test && build_database_server) Some(open_database_server()) else None
+ def maybe_open_build_database(
+ path: Path = Path.explode("$ISABELLE_HOME_USER/build.db")
+ ): Option[SQL.Database] = if (build_database_test) Some(open_build_database(path)) else None
def try_open_database(
name: String,
@@ -301,12 +303,18 @@
def open_database(name: String, output: Boolean = false): SQL.Database =
try_open_database(name, output = output) getOrElse error_database(name)
- def prepare_output(): Unit = Isabelle_System.make_directory(output_dir + Path.basic("log"))
-
- def clean_output(name: String, init: Boolean = false): Option[Boolean] = {
+ def clean_output(
+ database_server: Option[SQL.Database],
+ name: String,
+ session_init: Boolean = false
+ ): Option[Boolean] = {
val relevant_db =
- build_database_server &&
- using_option(try_open_database(name))(init_session_info(_, name)).getOrElse(false)
+ database_server match {
+ case Some(db) =>
+ ML_Heap.clean_entry(db, name)
+ clean_session_info(db, name)
+ case None => false
+ }
val del =
for {
@@ -316,12 +324,8 @@
path = dir + file if path.is_file
} yield path.file.delete
- using_optional(maybe_open_heaps_database()) { database =>
- database.foreach(ML_Heap.clean_entry(_, name))
- }
-
- if (init) {
- using(open_database(name, output = true))(init_session_info(_, name))
+ if (database_server.isEmpty && session_init) {
+ using(open_database(name, output = true))(clean_session_info(_, name))
}
if (relevant_db || del.nonEmpty) Some(del.forall(identity)) else None
@@ -382,8 +386,8 @@
Store.Data.Session_Info.table.select(List(Store.Data.Session_Info.session_name),
sql = Store.Data.Session_Info.session_name.where_equal(name)))
- def init_session_info(db: SQL.Database, name: String): Boolean =
- Store.Data.transaction_lock(db, create = true) {
+ def clean_session_info(db: SQL.Database, name: String): Boolean =
+ Store.Data.transaction_lock(db, create = true, synchronized = true) {
val already_defined = session_info_defined(db, name)
db.execute_statement(
@@ -395,7 +399,7 @@
sql = Store.Data.Sources.where_equal(name)))
db.execute_statement(
- Export.Data.table.delete(sql = Export.Data.session_name.where_equal(name)))
+ Export.Data.Base.table.delete(sql = Export.Data.Base.session_name.where_equal(name)))
db.execute_statement(
Document_Build.Data.table.delete(sql = Document_Build.Data.session_name.where_equal(name)))
@@ -410,7 +414,7 @@
build_log: Build_Log.Session_Info,
build: Store.Build_Info
): Unit = {
- Store.Data.transaction_lock(db) {
+ Store.Data.transaction_lock(db, synchronized = true) {
Store.Data.write_sources(db, session_name, sources)
Store.Data.write_session_info(db, cache.compress, session_name, build_log, build)
}
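
Note: Store.clean_output now receives the optional database server explicitly instead of opening ad-hoc connections per call. With a server, both the heap entry and the session info are cleaned there; without one, a session database is opened only when a fresh session is being initialized. A sketch of that control flow, with hypothetical helpers standing in for ML_Heap.clean_entry and friends:

    import java.sql.Connection

    object Clean_Output {
      // Hypothetical stand-ins for the Isabelle operations.
      def clean_heap_entry(db: Connection, name: String): Unit = ()
      def clean_session_info(db: Connection, name: String): Boolean = true
      def open_session_database(name: String): Connection = ???

      def clean_output(
        database_server: Option[Connection],
        name: String,
        session_init: Boolean = false
      ): Boolean =
        database_server match {
          case Some(db) =>
            clean_heap_entry(db, name)
            clean_session_info(db, name)
          case None =>
            if (session_init) {
              val db = open_session_database(name)
              try clean_session_info(db, name) finally db.close()
            }
            else false
        }
    }
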
--- a/src/Pure/Tools/build.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build.scala Wed Jun 28 16:01:34 2023 +0200
@@ -81,6 +81,14 @@
/* build */
+ def build_results(options: Options, context: Build_Process.Context, progress: Progress): Results =
+ Isabelle_Thread.uninterruptible {
+ val engine = get_engine(options.string("build_engine"))
+ using(engine.init(context, progress)) { build_process =>
+ Results(context, build_process.run())
+ }
+ }
+
def build(
options: Options,
selection: Sessions.Selection = Sessions.Selection.empty,
@@ -163,32 +171,24 @@
/* build process and results */
val build_context =
- Build_Process.Context(store, build_deps, progress = progress,
+ Build_Process.init_context(store, build_deps, progress = progress,
hostname = hostname(build_options), build_heap = build_heap,
numa_shuffling = numa_shuffling, max_jobs = max_jobs, fresh_build = fresh_build,
no_build = no_build, session_setup = session_setup, master = true)
- store.prepare_output()
- build_context.prepare_database()
-
if (clean_build) {
- for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
- store.clean_output(name) match {
- case None =>
- case Some(true) => progress.echo("Cleaned " + name)
- case Some(false) => progress.echo(name + " FAILED to clean")
+ using_optional(store.maybe_open_database_server()) { database_server =>
+ for (name <- full_sessions.imports_descendants(full_sessions_selection)) {
+ store.clean_output(database_server, name) match {
+ case None =>
+ case Some(true) => progress.echo("Cleaned " + name)
+ case Some(false) => progress.echo(name + " FAILED to clean")
+ }
}
}
}
- val results =
- Isabelle_Thread.uninterruptible {
- val engine = get_engine(build_options.string("build_engine"))
- using(engine.init(build_context, progress)) { build_process =>
- val res = build_process.run()
- Results(build_context, res)
- }
- }
+ val results = build_results(build_options, build_context, progress)
if (export_files) {
for (name <- full_sessions_selection.iterator if results(name).ok) {
@@ -379,25 +379,72 @@
/** "isabelle build_worker" **/
+ /* identified builds */
+
+ def read_builds(options: Options): List[Build_Process.Build] =
+ using_option(Store(options).maybe_open_build_database())(
+ Build_Process.read_builds).getOrElse(Nil).filter(_.active)
+
+ def print_builds(options: Options, builds: List[Build_Process.Build]): String =
+ using_optional(Store(options).maybe_open_build_database()) { build_database =>
+ val print_database =
+ build_database match {
+ case None => ""
+ case Some(db) => " (database: " + db + ")"
+ }
+ if (builds.isEmpty) "No build processes available" + print_database
+ else {
+ "Available build processes" + print_database +
+ (for ((build, i) <- builds.iterator.zipWithIndex)
+ yield {
+ "\n " + (i + 1) + ": " + build.build_uuid +
+ " (platform: " + build.ml_platform +
+ ", start: " + Build_Log.print_date(build.start) + ")"
+ }).mkString
+ }
+ }
+
+ def id_builds(
+ options: Options,
+ id: String,
+ builds: List[Build_Process.Build]
+ ): Build_Process.Build =
+ (id, builds.length) match {
+ case (Value.Int(i), n) if 1 <= i && i <= n => builds(i - 1)
+ case (UUID(_), _) if builds.exists(_.build_uuid == id) => builds.find(_.build_uuid == id).get
+ case ("", 0) => error(print_builds(options, builds))
+ case ("", 1) => builds.head
+ case _ => cat_error("Cannot identify build process " + quote(id), print_builds(options, builds))
+ }
+
+
/* build_worker */
def build_worker(
options: Options,
- build_uuid: String,
+ build_master: Build_Process.Build,
progress: Progress = new Progress,
dirs: List[Path] = Nil,
- infos: List[Sessions.Info] = Nil,
numa_shuffling: Boolean = false,
max_jobs: Int = 1,
- session_setup: (String, Session) => Unit = (_, _) => (),
cache: Term.Cache = Term.Cache.make()
): Results = {
val store = build_init(options, cache = cache)
val build_options = store.options
- progress.echo("build worker for " + build_uuid)
- progress.echo_warning("FIXME")
- ???
+ val sessions_structure =
+ Sessions.load_structure(build_options, dirs = dirs).
+ selection(Sessions.Selection(sessions = build_master.sessions))
+
+ val build_deps =
+ Sessions.deps(sessions_structure, progress = progress, inlined_files = true).check_errors
+
+ val build_context =
+ Build_Process.init_context(store, build_deps, progress = progress,
+ hostname = hostname(build_options), numa_shuffling = numa_shuffling, max_jobs = max_jobs,
+ build_uuid = build_master.build_uuid)
+
+ build_results(build_options, build_context, progress)
}
@@ -406,51 +453,63 @@
val isabelle_tool2 = Isabelle_Tool("build_worker", "external worker for session build process",
Scala_Project.here,
{ args =>
+ var build_id = ""
+ var list_builds = false
var numa_shuffling = false
var dirs: List[Path] = Nil
var max_jobs = 1
- var options = Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS)
+ var options =
+ Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
+ List(Options.Spec.make("build_database_test")))
var verbose = false
- var build_uuid = ""
val getopts = Getopts("""
-Usage: isabelle build_worker [OPTIONS] ...]
+Usage: isabelle build_worker [OPTIONS]
Options are:
-N cyclic shuffling of NUMA CPU nodes (performance tuning)
- -U UUID Universally Unique Identifier of the build process
-d DIR include session directory
+ -i ID identify build process, either via index (starting from 1) or
+ Universally Unique Identifier (UUID)
-j INT maximum number of parallel jobs (default 1)
+ -l list build processes
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
-v verbose
Run as external worker for session build process, as identified via
- option -U UUID.
+ option -i. The latter can be omitted if there is exactly one build.
""",
"N" -> (_ => numa_shuffling = true),
- "U:" -> (arg => build_uuid = arg),
"d:" -> (arg => dirs = dirs ::: List(Path.explode(arg))),
+ "i:" -> (arg => build_id = arg),
"j:" -> (arg => max_jobs = Value.Int.parse(arg)),
+ "l" -> (_ => list_builds = true),
"o:" -> (arg => options = options + arg),
"v" -> (_ => verbose = true))
val more_args = getopts(args)
if (more_args.nonEmpty) getopts.usage()
- if (build_uuid.isEmpty) error("Missing UUID for build process (option -U)")
-
val progress = new Console_Progress(verbose = verbose)
- val results =
- progress.interrupt_handler {
- build_worker(options, build_uuid,
- progress = progress,
- dirs = dirs,
- numa_shuffling = Host.numa_check(progress, numa_shuffling),
- max_jobs = max_jobs)
- }
+ val builds = read_builds(options)
+
+ if (list_builds) progress.echo(print_builds(options, builds))
+
+ if (!list_builds || build_id.nonEmpty) {
+ val build = id_builds(options, build_id, builds)
- sys.exit(results.rc)
+ val results =
+ progress.interrupt_handler {
+ build_worker(options, build,
+ progress = progress,
+ dirs = dirs,
+ numa_shuffling = Host.numa_check(progress, numa_shuffling),
+ max_jobs = max_jobs)
+ }
+
+ sys.exit(results.rc)
+ }
})
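
Note: the reworked build_worker tool identifies its target build via option -i, which accepts either a 1-based index into the list of active builds or a UUID, and may be omitted when exactly one build exists. The selection rule of id_builds, as a self-contained sketch (`Build` and `identify` are hypothetical):

    import java.util.UUID
    import scala.util.Try

    object Identify {
      final case class Build(build_uuid: String)

      // 1-based index, UUID, or implicit singleton; anything else fails.
      def identify(id: String, builds: List[Build]): Either[String, Build] =
        (id, builds.length) match {
          case (s, n) if Try(s.toInt).toOption.exists(i => 1 <= i && i <= n) =>
            Right(builds(s.toInt - 1))
          case (s, _) if Try(UUID.fromString(s)).isSuccess && builds.exists(_.build_uuid == s) =>
            Right(builds.find(_.build_uuid == s).get)
          case ("", 0) => Left("No build processes available")
          case ("", 1) => Right(builds.head)
          case _ => Left("Cannot identify build process \"" + id + "\"")
        }
    }
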
--- a/src/Pure/Tools/build_job.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build_job.scala Wed Jun 28 16:01:34 2023 +0200
@@ -23,11 +23,13 @@
build_context: Build_Process.Context,
progress: Progress,
log: Logger,
+ database_server: Option[SQL.Database],
session_background: Sessions.Background,
input_shasum: SHA1.Shasum,
node_info: Host.Node_Info
): Session_Job = {
- new Session_Job(build_context, progress, log, session_background, input_shasum, node_info)
+ new Session_Job(build_context, progress, log, database_server,
+ session_background, input_shasum, node_info)
}
object Session_Context {
@@ -93,6 +95,7 @@
build_context: Build_Process.Context,
progress: Progress,
log: Logger,
+ database_server: Option[SQL.Database],
session_background: Sessions.Background,
input_shasum: SHA1.Shasum,
node_info: Host.Node_Info
@@ -453,8 +456,7 @@
val heap = store.output_heap(session_name)
if (process_result.ok && store_heap && heap.is_file) {
val slice = Space.MiB(options.real("build_database_slice")).bytes
- val digest =
- using_optional(store.maybe_open_heaps_database())(ML_Heap.store(_, heap, slice))
+ val digest = ML_Heap.store(database_server, session_name, heap, slice)
SHA1.shasum(digest, session_name)
}
else SHA1.no_shasum
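
Note: with the database server threaded through to the session job, the stored heap's SHA1 digest is recorded directly and then rendered as a one-line shasum tied to the session name (SHA1.shasum(digest, session_name)); Store.heap_shasum later compares this against the database entry or the heap file. A sketch of that digest step with java.security, under hypothetical names:

    import java.nio.file.{Files, Paths}
    import java.security.MessageDigest

    object Heap_Digest {
      // Hash a heap file and render a "shasum"-style line.
      def digest_hex(path: String): String = {
        val md = MessageDigest.getInstance("SHA-1")
        md.digest(Files.readAllBytes(Paths.get(path))).map(b => f"$b%02x").mkString
      }

      def shasum(path: String, session_name: String): String =
        digest_hex(path) + " " + session_name
    }
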
--- a/src/Pure/Tools/build_process.scala Tue Jun 27 11:56:31 2023 +0100
+++ b/src/Pure/Tools/build_process.scala Wed Jun 28 16:01:34 2023 +0200
@@ -16,79 +16,89 @@
object Build_Process {
/** static context **/
- object Context {
- def apply(
- store: Store,
- build_deps: Sessions.Deps,
- progress: Progress = new Progress,
- ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
- hostname: String = Isabelle_System.hostname(),
- numa_shuffling: Boolean = false,
- build_heap: Boolean = false,
- max_jobs: Int = 1,
- fresh_build: Boolean = false,
- no_build: Boolean = false,
- session_setup: (String, Session) => Unit = (_, _) => (),
- build_uuid: String = UUID.random().toString,
- master: Boolean = false,
- ): Context = {
- val sessions_structure = build_deps.sessions_structure
- val build_graph = sessions_structure.build_graph
+ def init_context(
+ store: Store,
+ build_deps: Sessions.Deps,
+ progress: Progress = new Progress,
+ ml_platform: String = Isabelle_System.getenv("ML_PLATFORM"),
+ hostname: String = Isabelle_System.hostname(),
+ numa_shuffling: Boolean = false,
+ build_heap: Boolean = false,
+ max_jobs: Int = 1,
+ fresh_build: Boolean = false,
+ no_build: Boolean = false,
+ session_setup: (String, Session) => Unit = (_, _) => (),
+ build_uuid: String = UUID.random().toString,
+ master: Boolean = false,
+ ): Context = {
+ val sessions_structure = build_deps.sessions_structure
+ val build_graph = sessions_structure.build_graph
- val sessions =
- Map.from(
- for ((name, (info, _)) <- build_graph.iterator)
- yield {
- val deps = info.parent.toList
- val ancestors = sessions_structure.build_requirements(deps)
- val sources_shasum = build_deps.sources_shasum(name)
- val session_context =
- Build_Job.Session_Context.load(
- build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum,
- info.timeout, store, progress = progress)
- name -> session_context
- })
+ val sessions =
+ Map.from(
+ for ((name, (info, _)) <- build_graph.iterator)
+ yield {
+ val deps = info.parent.toList
+ val ancestors = sessions_structure.build_requirements(deps)
+ val sources_shasum = build_deps.sources_shasum(name)
+ val session_context =
+ Build_Job.Session_Context.load(
+ build_uuid, name, deps, ancestors, info.session_prefs, sources_shasum,
+ info.timeout, store, progress = progress)
+ name -> session_context
+ })
- val sessions_time = {
- val maximals = build_graph.maximals.toSet
- def descendants_time(name: String): Double = {
- if (maximals.contains(name)) sessions(name).old_time.seconds
- else {
- val descendants = build_graph.all_succs(List(name)).toSet
- val g = build_graph.restrict(descendants)
- (0.0 :: g.maximals.flatMap { desc =>
- val ps = g.all_preds(List(desc))
- if (ps.exists(p => !sessions.isDefinedAt(p))) None
- else Some(ps.map(p => sessions(p).old_time.seconds).sum)
- }).max
+ val sessions_time = {
+ val maximals = build_graph.maximals.toSet
+ def descendants_time(name: String): Double = {
+ if (maximals.contains(name)) sessions(name).old_time.seconds
+ else {
+ val descendants = build_graph.all_succs(List(name)).toSet
+ val g = build_graph.restrict(descendants)
+ (0.0 :: g.maximals.flatMap { desc =>
+ val ps = g.all_preds(List(desc))
+ if (ps.exists(p => !sessions.isDefinedAt(p))) None
+ else Some(ps.map(p => sessions(p).old_time.seconds).sum)
+ }).max
+ }
+ }
+ Map.from(
+ for (name <- sessions.keysIterator)
+ yield name -> descendants_time(name)).withDefaultValue(0.0)
+ }
+
+ val ordering =
+ new Ordering[String] {
+ def compare(name1: String, name2: String): Int =
+ sessions_time(name2) compare sessions_time(name1) match {
+ case 0 =>
+ sessions(name2).timeout compare sessions(name1).timeout match {
+ case 0 => name1 compare name2
+ case ord => ord
+ }
+ case ord => ord
}
- }
- Map.from(
- for (name <- sessions.keysIterator)
- yield name -> descendants_time(name)).withDefaultValue(0.0)
}
- val ordering =
- new Ordering[String] {
- def compare(name1: String, name2: String): Int =
- sessions_time(name2) compare sessions_time(name1) match {
- case 0 =>
- sessions(name2).timeout compare sessions(name1).timeout match {
- case 0 => name1 compare name2
- case ord => ord
- }
- case ord => ord
- }
- }
+ Isabelle_System.make_directory(store.output_dir + Path.basic("log"))
- val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
- new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes,
- build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
- no_build = no_build, session_setup, build_uuid = build_uuid, master = master)
+ using_option(store.maybe_open_build_database()) { db =>
+ val shared_db = db.is_postgresql
+ Data.transaction_lock(db, create = true) {
+ Data.clean_build(db)
+ if (shared_db) Store.Data.tables.lock(db, create = true)
+ }
+ Data.vacuum(db, more_tables = if (shared_db) Store.Data.tables else SQL.Tables.empty)
}
+
+ val numa_nodes = Host.numa_nodes(enabled = numa_shuffling)
+
+ new Context(store, build_deps, sessions, ordering, ml_platform, hostname, numa_nodes,
+ build_heap = build_heap, max_jobs = max_jobs, fresh_build = fresh_build,
+ no_build = no_build, session_setup, build_uuid = build_uuid, master = master)
}
- final class Context private(
+ final class Context private[Build_Process](
val store: Store,
val build_deps: Sessions.Deps,
val sessions: State.Sessions,
@@ -121,17 +131,6 @@
case None => Nil
}
- def prepare_database(): Unit = {
- using_option(store.maybe_open_build_database(Data.database)) { db =>
- val shared_db = db.is_postgresql
- Data.transaction_lock(db, create = true) {
- Data.clean_build(db)
- if (shared_db) Store.Data.tables.lock(db, create = true)
- }
- Data.vacuum(db, more_tables = if (shared_db) Store.Data.tables else SQL.Tables.empty)
- }
- }
-
def store_heap(name: String): Boolean =
build_heap || Sessions.is_pure(name) ||
sessions.valuesIterator.exists(_.ancestors.contains(name))
@@ -148,8 +147,11 @@
ml_platform: String,
options: String,
start: Date,
- stop: Option[Date]
- )
+ stop: Option[Date],
+ sessions: List[String]
+ ) {
+ def active: Boolean = stop.isEmpty
+ }
case class Worker(
worker_uuid: String, // Database_Progress.agent_uuid
@@ -333,18 +335,25 @@
val table = make_table("", List(build_uuid, ml_platform, options, start, stop))
}
- def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] =
- db.execute_query_statement(
- Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)),
- List.from[Build],
- { res =>
- val build_uuid = res.string(Base.build_uuid)
- val ml_platform = res.string(Base.ml_platform)
- val options = res.string(Base.options)
- val start = res.date(Base.start)
- val stop = res.get_date(Base.stop)
- Build(build_uuid, ml_platform, options, start, stop)
- })
+ def read_builds(db: SQL.Database, build_uuid: String = ""): List[Build] = {
+ val builds =
+ db.execute_query_statement(
+ Base.table.select(sql = Generic.sql_where(build_uuid = build_uuid)),
+ List.from[Build],
+ { res =>
+ val build_uuid = res.string(Base.build_uuid)
+ val ml_platform = res.string(Base.ml_platform)
+ val options = res.string(Base.options)
+ val start = res.date(Base.start)
+ val stop = res.get_date(Base.stop)
+ Build(build_uuid, ml_platform, options, start, stop, Nil)
+ })
+
+ for (build <- builds.sortBy(_.start)(Date.Ordering)) yield {
+ val sessions = Data.read_sessions_domain(db, build_uuid = build.build_uuid)
+ build.copy(sessions = sessions.toList.sorted)
+ }
+ }
def start_build(
db: SQL.Database,
@@ -399,14 +408,23 @@
old_time, old_command_timings, build_uuid))
}
- def read_sessions_domain(db: SQL.Database): Set[String] =
+ def read_sessions_domain(db: SQL.Database, build_uuid: String = ""): Set[String] =
db.execute_query_statement(
- Sessions.table.select(List(Sessions.name)),
+ Sessions.table.select(List(Sessions.name),
+ sql = if_proper(build_uuid, Sessions.build_uuid.where_equal(build_uuid))),
Set.from[String], res => res.string(Sessions.name))
- def read_sessions(db: SQL.Database, names: Iterable[String] = Nil): State.Sessions =
+ def read_sessions(db: SQL.Database,
+ names: Iterable[String] = Nil,
+ build_uuid: String = ""
+ ): State.Sessions =
db.execute_query_statement(
- Sessions.table.select(sql = if_proper(names, Sessions.name.where_member(names))),
+ Sessions.table.select(
+ sql =
+ SQL.where_and(
+ if_proper(names, Sessions.name.member(names)),
+ if_proper(build_uuid, Sessions.build_uuid.equal(build_uuid)))
+ ),
Map.from[String, Build_Job.Session_Context],
{ res =>
val name = res.string(Sessions.name)
@@ -423,7 +441,7 @@
}
)
- def update_sessions(db:SQL.Database, sessions: State.Sessions): Boolean = {
+ def update_sessions(db: SQL.Database, sessions: State.Sessions): Boolean = {
val old_sessions = read_sessions_domain(db)
val insert = sessions.iterator.filterNot(p => old_sessions.contains(p._1)).toList
@@ -780,6 +798,9 @@
state.set_serial(serial)
}
}
+
+ def read_builds(db: SQL.Database): List[Build] =
+ Data.transaction_lock(db, create = true) { Data.read_builds(db) }
}
@@ -802,30 +823,40 @@
/* progress backed by database */
- private val _database: Option[SQL.Database] =
- store.maybe_open_build_database(Build_Process.Data.database)
+ private val _database_server: Option[SQL.Database] =
+ try { store.maybe_open_database_server() }
+ catch { case exn: Throwable => close(); throw exn }
+
+ private val _build_database: Option[SQL.Database] =
+ try { store.maybe_open_build_database() }
+ catch { case exn: Throwable => close(); throw exn }
private val _host_database: Option[SQL.Database] =
- store.maybe_open_build_database(Host.Data.database)
+ try { store.maybe_open_build_database(path = Host.Data.database) }
+ catch { case exn: Throwable => close(); throw exn }
protected val (progress, worker_uuid) = synchronized {
- _database match {
+ _build_database match {
case None => (build_progress, UUID.random().toString)
case Some(db) =>
- val progress_db = store.open_build_database(Progress.Data.database)
- val progress =
- new Database_Progress(progress_db, build_progress,
- hostname = hostname,
- context_uuid = build_uuid)
- (progress, progress.agent_uuid)
+ try {
+ val progress_db = store.open_build_database(Progress.Data.database)
+ val progress =
+ new Database_Progress(progress_db, build_progress,
+ hostname = hostname,
+ context_uuid = build_uuid)
+ (progress, progress.agent_uuid)
+ }
+ catch { case exn: Throwable => close(); throw exn }
}
}
protected val log: Logger = Logger.make_system_log(progress, build_options)
def close(): Unit = synchronized {
- _database.foreach(_.close())
- _host_database.foreach(_.close())
+ Option(_database_server).flatten.foreach(_.close())
+ Option(_build_database).flatten.foreach(_.close())
+ Option(_host_database).flatten.foreach(_.close())
progress match {
case db_progress: Database_Progress =>
db_progress.exit()
@@ -839,7 +870,7 @@
private var _state: Build_Process.State = Build_Process.State()
protected def synchronized_database[A](body: => A): A = synchronized {
- _database match {
+ _build_database match {
case None => body
case Some(db) =>
Build_Process.Data.transaction_lock(db) {
@@ -906,10 +937,8 @@
val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
if (!skipped && !cancelled) {
- using_optional(store.maybe_open_heaps_database()) { database =>
- database.foreach(
- ML_Heap.restore(_, store.output_heap(session_name), cache = store.cache.compress))
- }
+ ML_Heap.restore(_database_server, session_name, store.output_heap(session_name),
+ cache = store.cache.compress)
}
val result_name = (session_name, worker_uuid, build_uuid)
@@ -945,10 +974,10 @@
(if (store_heap) "Building " else "Running ") + session_name +
if_proper(node_info.numa_node, " on " + node_info) + " ...")
- store.clean_output(session_name, init = true)
+ store.clean_output(_database_server, session_name, session_init = true)
val build =
- Build_Job.start_session(build_context, progress, log,
+ Build_Job.start_session(build_context, progress, log, _database_server,
build_deps.background(session_name), input_shasum, node_info)
val job = Build_Process.Job(session_name, worker_uuid, build_uuid, node_info, Some(build))
@@ -964,27 +993,27 @@
!Long_Name.is_qualified(job_name)
protected final def start_build(): Unit = synchronized_database {
- for (db <- _database) {
+ for (db <- _build_database) {
Build_Process.Data.start_build(db, build_uuid, build_context.ml_platform,
build_context.sessions_structure.session_prefs)
}
}
protected final def stop_build(): Unit = synchronized_database {
- for (db <- _database) {
+ for (db <- _build_database) {
Build_Process.Data.stop_build(db, build_uuid)
}
}
protected final def start_worker(): Unit = synchronized_database {
- for (db <- _database) {
+ for (db <- _build_database) {
_state = _state.inc_serial
Build_Process.Data.start_worker(db, worker_uuid, build_uuid, _state.serial)
}
}
protected final def stop_worker(): Unit = synchronized_database {
- for (db <- _database) {
+ for (db <- _build_database) {
Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
}
}
@@ -1060,7 +1089,7 @@
def snapshot(): Build_Process.Snapshot = synchronized_database {
val (builds, workers) =
- _database match {
+ _build_database match {
case None => (Nil, Nil)
case Some(db) =>
(Build_Process.Data.read_builds(db),
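
Note: the process class now holds three optional database handles (_database_server, _build_database, _host_database), each opened during construction. Every open is guarded so that a failure closes whatever was opened before it, and close() goes through Option(...).flatten, which maps a not-yet-initialized (still null) Option field to None. That pattern in isolation, with hypothetical open_server/open_build helpers:

    import java.sql.Connection

    abstract class Database_Handles {
      def open_server(): Option[Connection]
      def open_build(): Option[Connection]

      // A failing open closes the handles opened so far, then re-throws.
      private val database_server: Option[Connection] =
        try { open_server() }
        catch { case exn: Throwable => close(); throw exn }

      private val build_database: Option[Connection] =
        try { open_build() }
        catch { case exn: Throwable => close(); throw exn }

      // Safe even mid-construction: Option(null).flatten == None.
      def close(): Unit = {
        Option(database_server).flatten.foreach(_.close())
        Option(build_database).flatten.foreach(_.close())
      }
    }
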