--- a/src/Pure/Admin/build_history.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Admin/build_history.scala Thu Feb 22 16:31:58 2024 +0100
@@ -282,7 +282,7 @@
build_out_progress.echo("Reading session build info ...")
val session_build_info =
build_info.finished_sessions.flatMap { session_name =>
- val database = isabelle_output + store.log_db(session_name)
+ val database = isabelle_output + Store.log_db(session_name)
if (database.is_file) {
using(SQLite.open_database(database)) { db =>
@@ -309,8 +309,8 @@
build_out_progress.echo("Reading ML statistics ...")
val ml_statistics =
build_info.finished_sessions.flatMap { session_name =>
- val database = isabelle_output + store.log_db(session_name)
- val log_gz = isabelle_output + store.log_gz(session_name)
+ val database = isabelle_output + Store.log_db(session_name)
+ val log_gz = isabelle_output + Store.log_gz(session_name)
val properties =
if (database.is_file) {
@@ -336,7 +336,7 @@
build_out_progress.echo("Reading error messages ...")
val session_errors =
build_info.failed_sessions.flatMap { session_name =>
- val database = isabelle_output + store.log_db(session_name)
+ val database = isabelle_output + Store.log_db(session_name)
val errors =
if (database.is_file) {
try {
--- a/src/Pure/Build/build.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build.scala Thu Feb 22 16:31:58 2024 +0100
@@ -115,14 +115,15 @@
def build_options(options: Options, build_cluster: Boolean = false): Options = {
val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
- if (build_cluster) options1 + "build_database_server" + "build_database" else options1
+ if (build_cluster) options1 + "build_database" else options1
}
final def build_store(options: Options,
build_cluster: Boolean = false,
cache: Term.Cache = Term.Cache.make()
): Store = {
- val store = Store(build_options(options, build_cluster = build_cluster), cache = cache)
+ val store_options = build_options(options, build_cluster = build_cluster)
+ val store = Store(store_options, build_cluster = build_cluster, cache = cache)
Isabelle_System.make_directory(store.output_dir + Path.basic("log"))
Isabelle_Fonts.init()
store
@@ -504,13 +505,14 @@
def build_process(
options: Options,
+ build_cluster: Boolean = false,
list_builds: Boolean = false,
remove_builds: Boolean = false,
force: Boolean = false,
progress: Progress = new Progress
): Unit = {
val build_engine = Engine(engine_name(options))
- val store = build_engine.build_store(options)
+ val store = build_engine.build_store(options, build_cluster = build_cluster)
using(store.open_server()) { server =>
using_optional(store.maybe_open_build_database(server = server)) { build_database =>
@@ -544,25 +546,28 @@
val isabelle_tool2 = Isabelle_Tool("build_process", "manage session build process",
Scala_Project.here,
{ args =>
+ var build_cluster = false
var force = false
var list_builds = false
var options =
- Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
- List(
- Options.Spec.make("build_database_server"),
- Options.Spec.make("build_database")))
+ Options.init(specs =
+ Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
var remove_builds = false
val getopts = Getopts("""
Usage: isabelle build_process [OPTIONS]
Options are:
+ -C build cluster mode (database server)
-f extra force for option -r
-l list build processes
-o OPTION override Isabelle system OPTION (via NAME=VAL or NAME)
-r remove data from build processes: inactive processes (default)
or all processes (option -f)
+
+ Manage Isabelle build process, notably distributed build cluster (option -C).
""",
+ "C" -> (_ => build_cluster = true),
"f" -> (_ => force = true),
"l" -> (_ => list_builds = true),
"o:" -> (arg => options = options + arg),
@@ -573,8 +578,8 @@
val progress = new Console_Progress()
- build_process(options, list_builds = list_builds, remove_builds = remove_builds,
- force = force, progress = progress)
+ build_process(options, build_cluster = build_cluster, list_builds = list_builds,
+ remove_builds = remove_builds, force = force, progress = progress)
})
@@ -613,7 +618,7 @@
max_jobs: Option[Int] = None
): Results = {
val build_engine = Engine(engine_name(options))
- val store = build_engine.build_store(options)
+ val store = build_engine.build_store(options, build_cluster = true)
val build_options = store.options
using(store.open_server()) { server =>
@@ -648,10 +653,8 @@
val dirs = new mutable.ListBuffer[Path]
var max_jobs: Option[Int] = None
var options =
- Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
- List(
- Options.Spec.make("build_database_server"),
- Options.Spec.make("build_database")))
+ Options.init(specs =
+ Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
var quiet = false
var verbose = false
--- a/src/Pure/Build/build_benchmark.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_benchmark.scala Thu Feb 22 16:31:58 2024 +0100
@@ -51,13 +51,13 @@
val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
val session = sessions(benchmark_session)
- val heaps = session.ancestors.map(store.output_heap)
- ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+ val hierachy = session.ancestors.map(store.output_session(_, store_heap = true))
+ for (db <- database_server) ML_Heap.restore(db, hierachy, cache = store.cache.compress)
val local_options = options + "build_database_server=false" + "build_database=false"
benchmark_requirements(local_options, progress)
- ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+ for (db <- database_server) ML_Heap.restore(db, hierachy, cache = store.cache.compress)
def get_shasum(session_name: String): SHA1.Shasum = {
val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum)
--- a/src/Pure/Build/build_cluster.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_cluster.scala Thu Feb 22 16:31:58 2024 +0100
@@ -189,7 +189,9 @@
remote_isabelle
}
- def init(): Unit = remote_isabelle.init()
+ def init(): Unit =
+ remote_isabelle.init(other_settings =
+ remote_isabelle.init_components() ::: remote_isabelle.debug_settings())
def benchmark(): Unit = {
val script =
--- a/src/Pure/Build/build_job.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_job.scala Thu Feb 22 16:31:58 2024 +0100
@@ -277,7 +277,8 @@
for {
elapsed <- Markup.Elapsed.unapply(props)
elapsed_time = Time.seconds(elapsed)
- if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+ if elapsed_time.is_relevant &&
+ elapsed_time >= options.seconds("command_timing_threshold")
} command_timings += props.filter(Markup.command_timing_property)
}
@@ -292,7 +293,8 @@
if (!progress.stopped) {
val theory_name = snapshot.node_name.theory
val args =
- Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+ Protocol.Export.Args(
+ theory_name = theory_name, name = name, compress = compress)
val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
export_consumer.make_entry(session_name, args, body)
}
@@ -401,8 +403,9 @@
output_sources = info.document_output,
output_pdf = info.document_output)
}
- using(database_context.open_database(session_name, output = true))(session_database =>
- documents.foreach(_.write(session_database.db, session_name)))
+ using(database_context.open_database(session_name, output = true))(
+ session_database =>
+ documents.foreach(_.write(session_database.db, session_name)))
(documents.flatMap(_.log_lines), Nil)
}
}
@@ -450,7 +453,9 @@
errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
errs.map(Protocol.Error_Message_Marker.apply))
}
- else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+ else if (progress.stopped && result1.ok) {
+ result1.copy(rc = Process_Result.RC.interrupt)
+ }
else result1
case Exn.Exn(Exn.Interrupt()) =>
if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
@@ -464,18 +469,17 @@
else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
else result2
+ val store_session =
+ store.output_session(session_name, store_heap = process_result.ok && store_heap)
+
/* output heap */
- val output_shasum = {
- val heap = store.output_heap(session_name)
- if (process_result.ok && store_heap && heap.is_file) {
- val slice = Space.MiB(options.real("build_database_slice")).bytes
- val digest = ML_Heap.store(database_server, session_name, heap, slice)
- SHA1.shasum(digest, session_name)
+ val output_shasum =
+ store_session.heap match {
+ case Some(path) => SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
+ case None => SHA1.no_shasum
}
- else SHA1.no_shasum
- }
val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
@@ -516,6 +520,15 @@
true
}
+ using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
+ heaps_database =>
+ for (db <- database_server orElse heaps_database) {
+ val slice = Space.MiB(options.real("build_database_slice"))
+ ML_Heap.store(db, store_session, slice,
+ cache = store.cache.compress, progress = progress)
+ }
+ }
+
// messages
process_result.err_lines.foreach(progress.echo(_))
@@ -531,11 +544,14 @@
}
else {
progress.echo(
- session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
+ session_name + " FAILED (see also \"isabelle build_log -H Error " +
+ session_name + "\")")
if (!process_result.interrupted) {
val tail = info.options.int("process_output_tail")
- val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
- val prefix = if (log_lines.length == suffix.length) Nil else List("...")
+ val suffix =
+ if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
+ val prefix =
+ if (log_lines.length == suffix.length) Nil else List("...")
progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
}
}
--- a/src/Pure/Build/build_process.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_process.scala Thu Feb 22 16:31:58 2024 +0100
@@ -785,7 +785,7 @@
Running.table,
Results.table)
- val build_uuid_tables =
+ private val build_uuid_tables =
tables.filter(table =>
table.columns.exists(column => column.name == Generic.build_uuid.name))
@@ -857,6 +857,10 @@
try { store.maybe_open_database_server(server = server) }
catch { case exn: Throwable => close(); throw exn }
+ protected val _heaps_database: Option[SQL.Database] =
+ try { store.maybe_open_heaps_database(_database_server, server = server) }
+ catch { case exn: Throwable => close(); throw exn }
+
protected val _build_database: Option[SQL.Database] =
try {
for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -917,7 +921,7 @@
protected def open_build_cluster(): Build_Cluster =
Build_Cluster.make(build_context, progress = build_progress).open()
- protected val _build_cluster =
+ protected val _build_cluster: Build_Cluster =
try {
if (build_context.master && _build_database.isDefined) open_build_cluster()
else Build_Cluster.none
@@ -926,6 +930,7 @@
def close(): Unit = synchronized {
Option(_database_server).flatten.foreach(_.close())
+ Option(_heaps_database).flatten.foreach(_.close())
Option(_build_database).flatten.foreach(_.close())
Option(_host_database).foreach(_.close())
Option(_build_cluster).foreach(_.close())
@@ -1015,8 +1020,12 @@
val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
if (!skipped && !cancelled) {
- val heaps = (session_name :: ancestor_results.map(_.name)).map(store.output_heap)
- ML_Heap.restore(_database_server, heaps, cache = store.cache.compress)
+ for (db <- _database_server orElse _heaps_database) {
+ val hierarchy =
+ (session_name :: ancestor_results.map(_.name))
+ .map(store.output_session(_, store_heap = true))
+ ML_Heap.restore(db, hierarchy, cache = store.cache.compress)
+ }
}
val result_name = (session_name, worker_uuid, build_uuid)
--- a/src/Pure/Build/build_schedule.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_schedule.scala Thu Feb 22 16:31:58 2024 +0100
@@ -1213,6 +1213,11 @@
class Build_Engine extends Build.Engine("build_schedule") {
+ override def build_options(options: Options, build_cluster: Boolean = false): Options = {
+ val options1 = super.build_options(options, build_cluster = build_cluster)
+ if (build_cluster) options1 + "build_database_server" else options1
+ }
+
def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {
val sessions_structure = context.sessions_structure
--- a/src/Pure/Build/export.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/export.scala Thu Feb 22 16:31:58 2024 +0100
@@ -674,8 +674,7 @@
/* export files */
- val store = Store(options)
- export_files(store, session_name, export_dir, progress = progress, export_prune = export_prune,
- export_list = export_list, export_patterns = export_patterns)
+ export_files(Store(options), session_name, export_dir, progress = progress,
+ export_prune = export_prune, export_list = export_list, export_patterns = export_patterns)
})
}
--- a/src/Pure/Build/store.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/store.scala Thu Feb 22 16:31:58 2024 +0100
@@ -11,15 +11,38 @@
object Store {
- def apply(options: Options, cache: Term.Cache = Term.Cache.make()): Store =
- new Store(options, cache)
+ def apply(
+ options: Options,
+ build_cluster: Boolean = false,
+ cache: Term.Cache = Term.Cache.make()
+ ): Store = new Store(options, build_cluster, cache)
+
+
+ /* file names */
+
+ def heap(name: String): Path = Path.basic(name)
+ def log(name: String): Path = Path.basic("log") + Path.basic(name)
+ def log_db(name: String): Path = log(name).db
+ def log_gz(name: String): Path = log(name).gz
/* session */
- sealed case class Session(name: String, heap: Option[Path], log_db: Option[Path]) {
+ final class Session private[Store](
+ val name: String,
+ val heap: Option[Path],
+ val log_db: Option[Path],
+ dirs: List[Path]
+ ) {
+ def log_db_name: String = Store.log_db(name).implode
+
def defined: Boolean = heap.isDefined || log_db.isDefined
+ def the_heap: Path =
+ heap getOrElse
+ error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
+ cat_lines(dirs.map(dir => " " + File.standard_path(dir))))
+
def heap_digest(): Option[SHA1.Digest] =
heap.flatMap(ML_Heap.read_file_digest)
@@ -177,6 +200,15 @@
uuid)
})
+ def read_build_uuid(db: SQL.Database, name: String): String =
+ db.execute_query_statementO[String](
+ Session_Info.table.select(List(Session_Info.uuid),
+ sql = Session_Info.session_name.where_equal(name)),
+ { res =>
+ try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+ catch { case _: SQLException => "" }
+ }).getOrElse("")
+
def write_session_info(
db: SQL.Database,
cache: Compress.Cache,
@@ -236,9 +268,17 @@
)
}
}
+
+ def read_build_uuid(path: Path, session: String): String =
+ try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) }
+ catch { case _: SQLException => "" }
}
-class Store private(val options: Options, val cache: Term.Cache) {
+class Store private(
+ val options: Options,
+ val build_cluster: Boolean,
+ val cache: Term.Cache
+ ) {
store =>
override def toString: String = "Store(output_dir = " + output_dir.absolute + ")"
@@ -265,38 +305,34 @@
/* file names */
- def heap(name: String): Path = Path.basic(name)
- def log(name: String): Path = Path.basic("log") + Path.basic(name)
- def log_db(name: String): Path = log(name).db
- def log_gz(name: String): Path = log(name).gz
-
- def output_heap(name: String): Path = output_dir + heap(name)
- def output_log(name: String): Path = output_dir + log(name)
- def output_log_db(name: String): Path = output_dir + log_db(name)
- def output_log_gz(name: String): Path = output_dir + log_gz(name)
+ def output_heap(name: String): Path = output_dir + Store.heap(name)
+ def output_log(name: String): Path = output_dir + Store.log(name)
+ def output_log_db(name: String): Path = output_dir + Store.log_db(name)
+ def output_log_gz(name: String): Path = output_dir + Store.log_gz(name)
/* session */
def get_session(name: String): Store.Session = {
- val heap = input_dirs.view.map(_ + store.heap(name)).find(_.is_file)
- val log_db = input_dirs.view.map(_ + store.log_db(name)).find(_.is_file)
- Store.Session(name, heap, log_db)
+ val heap = input_dirs.view.map(_ + Store.heap(name)).find(_.is_file)
+ val log_db = input_dirs.view.map(_ + Store.log_db(name)).find(_.is_file)
+ new Store.Session(name, heap, log_db, input_dirs)
+ }
+
+ def output_session(name: String, store_heap: Boolean = false): Store.Session = {
+ val heap = if (store_heap) Some(output_heap(name)) else None
+ val log_db = if (!build_database_server) Some(output_log_db(name)) else None
+ new Store.Session(name, heap, log_db, List(output_dir))
}
/* heap */
- def the_heap(name: String): Path =
- get_session(name).heap getOrElse
- error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
- cat_lines(input_dirs.map(dir => " " + File.standard_path(dir))))
-
def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = {
def get_database: Option[SHA1.Digest] = {
for {
db <- database_server
- digest <- ML_Heap.get_entries(db, List(name)).valuesIterator.nextOption()
+ digest <- ML_Heap.read_digests(db, List(name)).valuesIterator.nextOption()
} yield digest
}
@@ -331,11 +367,23 @@
ssh_port = options.int("build_database_ssh_port"),
ssh_user = options.string("build_database_ssh_user"))
- def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
- if (build_database_server) Some(open_database_server(server = server)) else None
+ def maybe_open_database_server(
+ server: SSH.Server = SSH.no_server,
+ guard: Boolean = build_database_server
+ ): Option[SQL.Database] = {
+ if (guard) Some(open_database_server(server = server)) else None
+ }
+
+ def maybe_open_heaps_database(
+ database_server: Option[SQL.Database],
+ server: SSH.Server = SSH.no_server
+ ): Option[SQL.Database] = {
+ if (database_server.isDefined) None
+ else store.maybe_open_database_server(server = server, guard = build_cluster)
+ }
def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
- if (build_database_server) open_database_server(server = server)
+ if (build_database_server || build_cluster) open_database_server(server = server)
else SQLite.open_database(path, restrict = true)
def maybe_open_build_database(
@@ -359,7 +407,7 @@
else {
(for {
dir <- input_dirs.view
- path = dir + log_db(name) if path.is_file
+ path = dir + Store.log_db(name) if path.is_file
db <- check(SQLite.open_database(path))
} yield db).headOption
}
@@ -383,9 +431,7 @@
): Option[Boolean] = {
val relevant_db =
database_server match {
- case Some(db) =>
- ML_Heap.clean_entry(db, name)
- clean_session_info(db, name)
+ case Some(db) => clean_session_info(db, name)
case None => false
}
@@ -393,7 +439,7 @@
for {
dir <-
(if (system_heaps) List(user_output_dir, system_output_dir) else List(user_output_dir))
- file <- List(heap(name), log_db(name), log(name), log_gz(name))
+ file <- List(Store.heap(name), Store.log_db(name), Store.log(name), Store.log_gz(name))
path = dir + file if path.is_file
} yield path.file.delete
--- a/src/Pure/ML/ml_heap.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/ML/ml_heap.scala Thu Feb 22 16:31:58 2024 +0100
@@ -7,11 +7,6 @@
package isabelle
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.StandardOpenOption
-
-
object ML_Heap {
/** heap file with SHA1 digest **/
@@ -43,6 +38,8 @@
/* SQL data model */
+ sealed case class Log_DB(uuid: String, content: Bytes)
+
object private_data extends SQL.Data("isabelle_heaps") {
override lazy val tables = SQL.Tables(Base.table, Slices.table)
@@ -52,10 +49,24 @@
object Base {
val name = Generic.name
- val size = SQL.Column.long("size")
- val digest = SQL.Column.string("digest")
+ val heap_size = SQL.Column.long("heap_size")
+ val heap_digest = SQL.Column.string("heap_digest")
+ val uuid = SQL.Column.string("uuid")
+ val log_db = SQL.Column.bytes("log_db")
+
+ val table = make_table(List(name, heap_size, heap_digest, uuid, log_db))
+ }
- val table = make_table(List(name, size, digest))
+ object Size {
+ val name = Generic.name
+ val heap = SQL.Column.string("heap")
+ val log_db = SQL.Column.string("log_db")
+
+ val table = make_table(List(name, heap, log_db),
+ body =
+ "SELECT name, pg_size_pretty(heap_size::bigint) as heap, " +
+ " pg_size_pretty(length(log_db)::bigint) as log_db FROM " + Base.table.ident,
+ name = "size")
}
object Slices {
@@ -68,8 +79,8 @@
object Slices_Size {
val name = Generic.name
- val slice = SQL.Column.int("slice").make_primary_key
- val size = SQL.Column.long("size")
+ val slice = Slices.slice
+ val size = SQL.Column.string("size")
val table = make_table(List(name, slice, size),
body = "SELECT name, slice, pg_size_pretty(length(content)::bigint) as size FROM " +
@@ -77,42 +88,38 @@
name = "slices_size")
}
- def get_entries(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = {
- require(names.nonEmpty)
-
+ def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = {
db.execute_query_statement(
- Base.table.select(List(Base.name, Base.digest),
+ Base.table.select(List(Base.name, Base.heap_digest),
sql = Generic.name.where_member(names)),
List.from[(String, String)],
- res => res.string(Base.name) -> res.string(Base.digest)
- ).flatMap({
- case (_, "") => None
- case (name, digest) => Some(name -> SHA1.fake_digest(digest))
+ res => res.string(Base.name) -> res.string(Base.heap_digest)
+ ).collect({
+ case (name, digest) if digest.nonEmpty => name -> SHA1.fake_digest(digest)
}).toMap
}
- def read_entry(db: SQL.Database, name: String): List[Bytes] =
+ def read_slices(db: SQL.Database, name: String): List[Bytes] =
db.execute_query_statement(
Slices.table.select(List(Slices.content),
sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))),
List.from[Bytes], _.bytes(Slices.content))
- def clean_entry(db: SQL.Database, name: String): Unit = {
- for (table <- List(Base.table, Slices.table)) {
- db.execute_statement(table.delete(sql = Base.name.where_equal(name)))
- }
- db.create_view(Slices_Size.table)
- }
-
- def prepare_entry(db: SQL.Database, name: String): Unit =
- db.execute_statement(Base.table.insert(), body =
- { stmt =>
- stmt.string(1) = name
- stmt.long(2) = None
- stmt.string(3) = None
+ def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.uuid, Base.log_db), sql =
+ SQL.where_and(
+ Generic.name.equal(name),
+ if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))),
+ List.from[(String, Bytes)],
+ res => (res.string(Base.uuid), res.bytes(Base.log_db))
+ ).collectFirst(
+ {
+ case (uuid, content) if uuid.nonEmpty && !content.is_empty =>
+ Log_DB(uuid, content)
})
- def write_entry(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
+ def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
db.execute_statement(Slices.table.insert(), body =
{ stmt =>
stmt.string(1) = name
@@ -120,13 +127,43 @@
stmt.bytes(3) = content
})
- def finish_entry(db: SQL.Database, name: String, size: Long, digest: SHA1.Digest): Unit =
+ def clean_entry(db: SQL.Database, name: String): Unit = {
+ for (table <- List(Base.table, Slices.table)) {
+ db.execute_statement(table.delete(sql = Base.name.where_equal(name)))
+ }
+ }
+
+ def init_entry(db: SQL.Database, name: String, log_db: Option[Log_DB] = None): Unit = {
+ clean_entry(db, name)
+ for (table <- List(Size.table, Slices_Size.table)) {
+ db.create_view(table)
+ }
+ db.execute_statement(Base.table.insert(), body =
+ { stmt =>
+ stmt.string(1) = name
+ stmt.long(2) = None
+ stmt.string(3) = None
+ stmt.string(4) = log_db.map(_.uuid)
+ stmt.bytes(5) = log_db.map(_.content)
+ })
+ }
+
+ def finish_entry(
+ db: SQL.Database,
+ name: String,
+ heap_size: Long,
+ heap_digest: Option[SHA1.Digest],
+ log_db: Option[Log_DB]
+ ): Unit =
db.execute_statement(
- Base.table.update(List(Base.size, Base.digest), sql = Base.name.where_equal(name)),
+ Base.table.update(List(Base.heap_size, Base.heap_digest, Base.uuid, Base.log_db),
+ sql = Base.name.where_equal(name)),
body =
{ stmt =>
- stmt.long(1) = size
- stmt.string(2) = digest.toString
+ stmt.long(1) = heap_size
+ stmt.string(2) = heap_digest.map(_.toString)
+ stmt.string(3) = log_db.map(_.uuid)
+ stmt.bytes(4) = log_db.map(_.content)
})
}
@@ -135,89 +172,127 @@
private_data.clean_entry(db, session_name)
}
- def get_entries(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
- private_data.transaction_lock(db, create = true, label = "ML_Heap.get_entries") {
- private_data.get_entries(db, names)
+ def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
+ if (names.isEmpty) Map.empty
+ else {
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
+ private_data.read_digests(db, names)
+ }
}
def store(
- database: Option[SQL.Database],
- session_name: String,
- heap: Path,
- slice: Long,
- cache: Compress.Cache = Compress.Cache.none
- ): SHA1.Digest = {
- val digest = write_file_digest(heap)
- database match {
- case None =>
- case Some(db) =>
- val size = File.size(heap) - sha1_prefix.length - SHA1.digest_length
+ db: SQL.Database,
+ session: Store.Session,
+ slice: Space,
+ cache: Compress.Cache = Compress.Cache.none,
+ progress: Progress = new Progress
+ ): Unit = {
+ val log_db =
+ for {
+ path <- session.log_db
+ uuid <- proper_string(Store.read_build_uuid(path, session.name))
+ } yield Log_DB(uuid, Bytes.read(path))
- val slices = (size.toDouble / slice.toDouble).ceil.toInt
- val step = (size.toDouble / slices.toDouble).ceil.toLong
+ val heap_digest = session.heap.map(write_file_digest)
+ val heap_size =
+ session.heap match {
+ case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length
+ case None => 0L
+ }
- try {
- private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
- private_data.prepare_entry(db, session_name)
- }
+ val slice_size = slice.bytes max Space.MiB(1).bytes
+ val slices = (heap_size.toDouble / slice_size.toDouble).ceil.toInt
+
+ try {
+ if (slices == 0 && log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...")
+
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
+ private_data.init_entry(db, session.name, log_db = if (slices == 0) log_db else None)
+ }
- for (i <- 0 until slices) {
- val j = i + 1
- val offset = step * i
- val limit = if (j < slices) step * j else size
- val content =
- Bytes.read_file(heap, offset = offset, limit = limit)
- .compress(cache = cache)
- private_data.transaction_lock(db, label = "ML_Heap.store2") {
- private_data.write_entry(db, session_name, i, content)
- }
- }
-
- private_data.transaction_lock(db, label = "ML_Heap.store3") {
- private_data.finish_entry(db, session_name, size, digest)
+ if (slices > 0) {
+ progress.echo("Storing " + session.name + " ...")
+ val step = (heap_size.toDouble / slices.toDouble).ceil.toLong
+ for (i <- 0 until slices) {
+ val j = i + 1
+ val offset = step * i
+ val limit = if (j < slices) step * j else heap_size
+ val content =
+ Bytes.read_file(session.the_heap, offset = offset, limit = limit)
+ .compress(cache = cache)
+ private_data.transaction_lock(db, label = "ML_Heap.store2") {
+ private_data.write_slice(db, session.name, i, content)
}
}
- catch { case exn: Throwable =>
- private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
- private_data.clean_entry(db, session_name)
- }
- throw exn
+
+ if (log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...")
+
+ private_data.transaction_lock(db, label = "ML_Heap.store3") {
+ private_data.finish_entry(db, session.name, heap_size, heap_digest, log_db)
}
+ }
}
- digest
+ catch { case exn: Throwable =>
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
+ private_data.clean_entry(db, session.name)
+ }
+ throw exn
+ }
}
def restore(
- database: Option[SQL.Database],
- heaps: List[Path],
- cache: Compress.Cache = Compress.Cache.none
+ db: SQL.Database,
+ sessions: List[Store.Session],
+ cache: Compress.Cache = Compress.Cache.none,
+ progress: Progress = new Progress
): Unit = {
- database match {
- case Some(db) if heaps.nonEmpty =>
- private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
- val db_digests = private_data.get_entries(db, heaps.map(_.file_name))
- for (heap <- heaps) {
- val session_name = heap.file_name
- val file_digest = read_file_digest(heap)
- val db_digest = db_digests.get(session_name)
- if (db_digest.isDefined && db_digest != file_digest) {
- val base_dir = Isabelle_System.make_directory(heap.expand.dir)
- Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
- Bytes.write(tmp, Bytes.empty)
- for (slice <- private_data.read_entry(db, session_name)) {
- Bytes.append(tmp, slice.uncompress(cache = cache))
- }
- val digest = write_file_digest(tmp)
- if (db_digest.get == digest) {
- Isabelle_System.chmod("a+r", tmp)
- Isabelle_System.move_file(tmp, heap)
- }
- else error("Incoherent content for session heap " + quote(session_name))
+ if (sessions.exists(_.defined)) {
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
+ /* heap */
+
+ val defined_heaps =
+ for (session <- sessions; heap <- session.heap)
+ yield session.name -> heap
+
+ val db_digests = private_data.read_digests(db, defined_heaps.map(_._1))
+
+ for ((session_name, heap) <- defined_heaps) {
+ val file_digest = read_file_digest(heap)
+ val db_digest = db_digests.get(session_name)
+ if (db_digest.isDefined && db_digest != file_digest) {
+ progress.echo("Restoring " + session_name + " ...")
+
+ val base_dir = Isabelle_System.make_directory(heap.expand.dir)
+ Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
+ Bytes.write(tmp, Bytes.empty)
+ for (slice <- private_data.read_slices(db, session_name)) {
+ Bytes.append(tmp, slice.uncompress(cache = cache))
}
+ val digest = write_file_digest(tmp)
+ if (db_digest.get == digest) {
+ Isabelle_System.chmod("a+r", tmp)
+ Isabelle_System.move_file(tmp, heap)
+ }
+ else error("Incoherent content for session heap " + heap)
}
}
}
- case _ =>
+
+
+ /* log_db */
+
+ for (session <- sessions; path <- session.log_db) {
+ val file_uuid = Store.read_build_uuid(path, session.name)
+ private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
+ case Some(log_db) if file_uuid.isEmpty =>
+ progress.echo("Restoring " + session.log_db_name + " ...")
+ Isabelle_System.make_directory(path.expand.dir)
+ Bytes.write(path, log_db.content)
+ case Some(_) => error("Incoherent content for session database " + path)
+ case None =>
+ }
+ }
+ }
}
}
}
--- a/src/Pure/ML/ml_process.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/ML/ml_process.scala Thu Feb 22 16:31:58 2024 +0100
@@ -24,7 +24,7 @@
session_background.sessions_structure.selection(logic_name).
build_requirements(List(logic_name)).
- map(store.the_heap)
+ map(name => store.get_session(name).the_heap)
}
def apply(
--- a/src/Pure/System/other_isabelle.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/System/other_isabelle.scala Thu Feb 22 16:31:58 2024 +0100
@@ -147,6 +147,14 @@
else error("Cannot proceed with existing user settings file: " + etc_settings)
}
+ def debug_settings(): List[String] = {
+ val debug = System.getProperty("isabelle.debug", "") == "true"
+ if (debug) {
+ List("ISABELLE_JAVA_SYSTEM_OPTIONS=\"$ISABELLE_JAVA_SYSTEM_OPTIONS -Disabelle.debug=true\"")
+ }
+ else Nil
+ }
+
/* init */
--- a/src/Pure/Tools/profiling.scala Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Tools/profiling.scala Thu Feb 22 16:31:58 2024 +0100
@@ -99,7 +99,7 @@
locales = session.locales,
locale_thms = session.locale_thms,
global_thms = session.global_thms,
- heap_size = File.space(store.the_heap(session_name)),
+ heap_size = File.space(store.get_session(session_name).the_heap),
thys_id_size = session.sizeof_thys_id,
thms_id_size = session.sizeof_thms_id,
terms_size = session.sizeof_terms,