build local log_db, with store/restore via optional database server;
tuned messages;
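Each session's log_db now lives next to its heap in the local store, and both can be
synchronized with an optional database server: ML_Heap.store uploads the heap slices
together with the log_db blob and its build uuid, ML_Heap.restore fetches them back on
demand. A minimal sketch of the intended round-trip, assuming a Store value "store" and
an open connection "db" (both names and the session name are illustrative):

  val session = store.output_session("HOL", store_heap = true)
  ML_Heap.store(db, session, slice = Space.MiB(100))  // heap slices + log_db
  ML_Heap.restore(Some(db), List(session), cache = store.cache.compress)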
--- a/src/Pure/Build/build.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build.scala Wed Feb 21 19:36:53 2024 +0100
@@ -115,7 +115,7 @@
def build_options(options: Options, build_cluster: Boolean = false): Options = {
val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
- if (build_cluster) options1 + "build_database_server" + "build_database" else options1
+ if (build_cluster) options1 + "build_database" else options1
}
final def build_store(options: Options,
@@ -550,10 +550,8 @@
var force = false
var list_builds = false
var options =
- Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
- List(
- Options.Spec.make("build_database_server"),
- Options.Spec.make("build_database")))
+ Options.init(specs =
+ Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
var remove_builds = false
val getopts = Getopts("""
@@ -653,10 +651,8 @@
val dirs = new mutable.ListBuffer[Path]
var max_jobs: Option[Int] = None
var options =
- Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
- List(
- Options.Spec.make("build_database_server"),
- Options.Spec.make("build_database")))
+ Options.init(specs =
+ Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
var quiet = false
var verbose = false
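With build_database_server no longer forced here, a cluster build merely enables
build_database; the server connection is established separately via
Store.maybe_open_heaps_database (see store.scala below). A sketch of the resulting
option setup, assuming that a bare option name sets a boolean option to true, as with
"isabelle build -o":

  val options =
    Options.init(specs =
      Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
  assert(options.bool("build_database"))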
--- a/src/Pure/Build/build_benchmark.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala Wed Feb 21 19:36:53 2024 +0100
@@ -51,13 +51,13 @@
val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
val session = sessions(benchmark_session)
- val heaps = session.ancestors.map(store.output_heap)
- ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+ val hierarchy = session.ancestors.map(store.output_session(_, store_heap = true))
+ ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
val local_options = options + "build_database_server=false" + "build_database=false"
benchmark_requirements(local_options, progress)
- ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+ ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
def get_shasum(session_name: String): SHA1.Shasum = {
val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum)
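The benchmark now passes Store.Session values instead of bare heap paths. A
Store.Session bundles the optional heap and log_db locations; a sketch of its shape
under this changeset (illustrative session name):

  val s = store.output_session("Pure", store_heap = true)
  s.heap    // Some(path): the heap is read/written at this location
  s.log_db  // Some(path) iff no regular database server is configured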
--- a/src/Pure/Build/build_job.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_job.scala Wed Feb 21 19:36:53 2024 +0100
@@ -120,6 +120,7 @@
val options = Host.node_options(info.options, node_info)
val store = build_context.store
+ val store_session = store.output_session(session_name, store_heap = store_heap)
using_optional(store.maybe_open_database_server(server = server)) { database_server =>
@@ -467,15 +468,12 @@
/* output heap */
- val output_shasum = {
- val heap = store.output_heap(session_name)
- if (process_result.ok && store_heap && heap.is_file) {
- val slice = Space.MiB(options.real("build_database_slice"))
- val digest = ML_Heap.store(database_server, session_name, heap, slice)
- SHA1.shasum(digest, session_name)
+ val output_shasum =
+ store_session.heap match {
+ case Some(path) if process_result.ok =>
+ SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
+ case _ => SHA1.no_shasum
}
- else SHA1.no_shasum
- }
val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
@@ -516,6 +514,17 @@
true
}
+ using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
+ heaps_database =>
+ for (db <- database_server orElse heaps_database) {
+ ML_Heap.clean_entry(db, session_name)
+ if (process_result.ok) {
+ val slice = Space.MiB(options.real("build_database_slice"))
+ ML_Heap.store(db, store_session, slice, progress = progress)
+ }
+ }
+ }
+
// messages
process_result.err_lines.foreach(progress.echo(_))
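Taken together, the new output handling first digests the local heap and only afterwards
pushes heap slices plus log_db through whichever database connection is available. The
consolidated code path, assembled from the hunks above:

  // 1. shasum of the freshly written heap, if any
  val output_shasum =
    store_session.heap match {
      case Some(path) if process_result.ok =>
        SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
      case _ => SHA1.no_shasum
    }

  // 2. upload: regular database server, else the cluster heaps database
  for (db <- database_server orElse heaps_database) {
    ML_Heap.clean_entry(db, session_name)
    if (process_result.ok) {
      val slice = Space.MiB(options.real("build_database_slice"))
      ML_Heap.store(db, store_session, slice, progress = progress)
    }
  }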
--- a/src/Pure/Build/build_process.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_process.scala Wed Feb 21 19:36:53 2024 +0100
@@ -857,6 +857,10 @@
try { store.maybe_open_database_server(server = server) }
catch { case exn: Throwable => close(); throw exn }
+ protected val _heaps_database: Option[SQL.Database] =
+ try { store.maybe_open_heaps_database(_database_server, server = server) }
+ catch { case exn: Throwable => close(); throw exn }
+
protected val _build_database: Option[SQL.Database] =
try {
for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -926,6 +930,7 @@
def close(): Unit = synchronized {
Option(_database_server).flatten.foreach(_.close())
+ Option(_heaps_database).flatten.foreach(_.close())
Option(_build_database).flatten.foreach(_.close())
Option(_host_database).foreach(_.close())
Option(_build_cluster).foreach(_.close())
@@ -1015,8 +1020,11 @@
val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
if (!skipped && !cancelled) {
- val heaps = (session_name :: ancestor_results.map(_.name)).map(store.output_heap)
- ML_Heap.restore(_database_server, heaps, cache = store.cache.compress)
+ val hierarchy =
+ (session_name :: ancestor_results.map(_.name))
+ .map(store.output_session(_, store_heap = true))
+ ML_Heap.restore(_database_server orElse _heaps_database,
+ hierarchy, cache = store.cache.compress)
}
val result_name = (session_name, worker_uuid, build_uuid)
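Restore in the build process now tries the regular database server first and falls back
to the dedicated heaps database; with neither connection, sessions remain purely local.
A condensed sketch of the selection, with hierarchy as built above:

  // prefer the regular server; _heaps_database is only open when the
  // server is absent and this is a cluster build
  val db: Option[SQL.Database] = _database_server orElse _heaps_database
  ML_Heap.restore(db, hierarchy, cache = store.cache.compress)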
--- a/src/Pure/Build/store.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/store.scala Wed Feb 21 19:36:53 2024 +0100
@@ -190,6 +190,15 @@
uuid)
})
+ def read_build_uuid(db: SQL.Database, name: String): String =
+ db.execute_query_statementO[String](
+ Session_Info.table.select(List(Session_Info.uuid),
+ sql = Session_Info.session_name.where_equal(name)),
+ { res =>
+ try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+ catch { case _: SQLException => "" }
+ }).getOrElse("")
+
def write_session_info(
db: SQL.Database,
cache: Compress.Cache,
@@ -249,6 +258,10 @@
)
}
}
+
+ def read_build_uuid(path: Path, session: String): String =
+ try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) }
+ catch { case _: SQLException => "" }
}
class Store private(
@@ -301,6 +314,12 @@
new Store.Session(name, heap, log_db, input_dirs)
}
+ def output_session(name: String, store_heap: Boolean = false): Store.Session = {
+ val heap = if (store_heap) Some(output_heap(name)) else None
+ val log_db = if (!build_database_server) Some(output_log_db(name)) else None
+ new Store.Session(name, heap, log_db, List(output_dir))
+ }
+
/* heap */
@@ -343,8 +362,20 @@
ssh_port = options.int("build_database_ssh_port"),
ssh_user = options.string("build_database_ssh_user"))
- def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
- if (build_database_server) Some(open_database_server(server = server)) else None
+ def maybe_open_database_server(
+ server: SSH.Server = SSH.no_server,
+ guard: Boolean = build_database_server
+ ): Option[SQL.Database] = {
+ if (guard) Some(open_database_server(server = server)) else None
+ }
+
+ def maybe_open_heaps_database(
+ database_server: Option[SQL.Database],
+ server: SSH.Server = SSH.no_server
+ ): Option[SQL.Database] = {
+ if (database_server.isDefined) None
+ else store.maybe_open_database_server(server = server, guard = build_cluster)
+ }
def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
if (build_database_server || build_cluster) open_database_server(server = server)
@@ -395,9 +426,7 @@
): Option[Boolean] = {
val relevant_db =
database_server match {
- case Some(db) =>
- ML_Heap.clean_entry(db, name)
- clean_session_info(db, name)
+ case Some(db) => clean_session_info(db, name)
case None => false
}
--- a/src/Pure/ML/ml_heap.scala Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/ML/ml_heap.scala Wed Feb 21 19:36:53 2024 +0100
@@ -43,6 +43,8 @@
/* SQL data model */
+ sealed case class Log_DB(uuid: String, content: Bytes)
+
object private_data extends SQL.Data("isabelle_heaps") {
override lazy val tables = SQL.Tables(Base.table, Slices.table)
@@ -54,8 +56,10 @@
val name = Generic.name
val size = SQL.Column.long("size")
val digest = SQL.Column.string("digest")
+ val uuid = SQL.Column.string("uuid")
+ val log_db = SQL.Column.bytes("log_db")
- val table = make_table(List(name, size, digest))
+ val table = make_table(List(name, size, digest, uuid, log_db))
}
object Slices {
@@ -97,6 +101,20 @@
sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))),
List.from[Bytes], _.bytes(Slices.content))
+ def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] =
+ db.execute_query_statement(
+ Base.table.select(List(Base.uuid, Base.log_db), sql =
+ SQL.where_and(
+ Generic.name.equal(name),
+ if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))),
+ List.from[(String, Bytes)],
+ res => (res.string(Base.uuid), res.bytes(Base.log_db))
+ ).collectFirst(
+ {
+ case (uuid, content) if uuid.nonEmpty && !content.is_empty =>
+ Log_DB(uuid, content)
+ })
+
def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
db.execute_statement(Slices.table.insert(), body =
{ stmt =>
@@ -118,15 +136,26 @@
stmt.string(1) = name
stmt.long(2) = None
stmt.string(3) = None
+ stmt.string(4) = None
+ stmt.bytes(5) = None
})
- def finish_entry(db: SQL.Database, name: String, size: Long, digest: SHA1.Digest): Unit =
+ def finish_entry(
+ db: SQL.Database,
+ name: String,
+ size: Long,
+ opt_digest: Option[SHA1.Digest],
+ opt_log_db: Option[Log_DB]
+ ): Unit =
db.execute_statement(
- Base.table.update(List(Base.size, Base.digest), sql = Base.name.where_equal(name)),
+ Base.table.update(List(Base.size, Base.digest, Base.uuid, Base.log_db),
+ sql = Base.name.where_equal(name)),
body =
{ stmt =>
stmt.long(1) = size
- stmt.string(2) = digest.toString
+ stmt.string(2) = opt_digest.map(_.toString)
+ stmt.string(3) = opt_log_db.map(_.uuid)
+ stmt.bytes(4) = opt_log_db.map(_.content)
})
}
@@ -136,72 +165,97 @@
}
def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
- private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
- private_data.read_digests(db, names)
+ if (names.isEmpty) Map.empty
+ else {
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
+ private_data.read_digests(db, names)
+ }
}
def store(
- database: Option[SQL.Database],
- session_name: String,
- heap: Path,
+ db: SQL.Database,
+ session: Store.Session,
slice: Space,
- cache: Compress.Cache = Compress.Cache.none
- ): SHA1.Digest = {
- val digest = write_file_digest(heap)
- database match {
- case None =>
- case Some(db) =>
- val size = File.size(heap) - sha1_prefix.length - SHA1.digest_length
+ cache: Compress.Cache = Compress.Cache.none,
+ progress: Progress = new Progress
+ ): Unit = {
+ val size =
+ session.heap match {
+ case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length
+ case None => 0L
+ }
- val slice_size = slice.bytes max Space.MiB(1).bytes
- val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
- val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
+ val slice_size = slice.bytes max Space.MiB(1).bytes
+ val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
+ val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
- try {
- private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
- private_data.init_entry(db, session_name)
- }
+ try {
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
+ private_data.init_entry(db, session.name)
+ }
- for (i <- 0 until slices) {
- val j = i + 1
- val offset = step * i
- val limit = if (j < slices) step * j else size
- val content =
- Bytes.read_file(heap, offset = offset, limit = limit)
- .compress(cache = cache)
- private_data.transaction_lock(db, label = "ML_Heap.store2") {
- private_data.write_slice(db, session_name, i, content)
- }
- }
+ if (slices > 0) progress.echo("Storing " + session.name + " ...")
+ for (i <- 0 until slices) {
+ val j = i + 1
+ val offset = step * i
+ val limit = if (j < slices) step * j else size
+ val content =
+ Bytes.read_file(session.the_heap, offset = offset, limit = limit)
+ .compress(cache = cache)
+ private_data.transaction_lock(db, label = "ML_Heap.store2") {
+ private_data.write_slice(db, session.name, i, content)
+ }
+ }
+
+ val opt_digest =
+ for {
+ path <- session.heap
+ digest <- read_file_digest(path)
+ } yield digest
- private_data.transaction_lock(db, label = "ML_Heap.store3") {
- private_data.finish_entry(db, session_name, size, digest)
- }
- }
- catch { case exn: Throwable =>
- private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
- private_data.clean_entry(db, session_name)
- }
- throw exn
- }
+ val opt_log_db =
+ for {
+ path <- session.log_db
+ uuid <- proper_string(Store.read_build_uuid(path, session.name))
+ } yield Log_DB(uuid, Bytes.read(path))
+
+ if (opt_log_db.isDefined) progress.echo("Storing " + session.name + ".db ...")
+
+ private_data.transaction_lock(db, label = "ML_Heap.store3") {
+ private_data.finish_entry(db, session.name, size, opt_digest, opt_log_db)
+ }
}
- digest
+ catch { case exn: Throwable =>
+ private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
+ private_data.clean_entry(db, session.name)
+ }
+ throw exn
+ }
}
def restore(
database: Option[SQL.Database],
- heaps: List[Path],
- cache: Compress.Cache = Compress.Cache.none
+ sessions: List[Store.Session],
+ cache: Compress.Cache = Compress.Cache.none,
+ progress: Progress = new Progress
): Unit = {
database match {
- case Some(db) if heaps.nonEmpty =>
+ case Some(db) if sessions.exists(_.defined) =>
private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
- val db_digests = private_data.read_digests(db, heaps.map(_.file_name))
- for (heap <- heaps) {
- val session_name = heap.file_name
+ /* heap */
+
+ val defined_heaps =
+ for (session <- sessions; heap <- session.heap)
+ yield session.name -> heap
+
+ val db_digests = private_data.read_digests(db, defined_heaps.map(_._1))
+
+ for ((session_name, heap) <- defined_heaps) {
val file_digest = read_file_digest(heap)
val db_digest = db_digests.get(session_name)
if (db_digest.isDefined && db_digest != file_digest) {
+ progress.echo("Restoring " + session_name + " ...")
+
val base_dir = Isabelle_System.make_directory(heap.expand.dir)
Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
Bytes.write(tmp, Bytes.empty)
@@ -213,10 +267,25 @@
Isabelle_System.chmod("a+r", tmp)
Isabelle_System.move_file(tmp, heap)
}
- else error("Incoherent content for session heap " + quote(session_name))
+ else error("Incoherent content for session heap " + heap)
}
}
}
+
+
+ /* log_db */
+
+ for (session <- sessions; path <- session.log_db) {
+ val file_uuid = Store.read_build_uuid(path, session.name)
+ private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
+ case Some(log_db) if file_uuid.isEmpty =>
+ progress.echo("Restoring " + session.name + ".db ...")
+ Isabelle_System.make_directory(path.expand.dir)
+ Bytes.write(path, log_db.content)
+ case Some(_) => error("Incoherent content for session database " + path)
+ case None =>
+ }
+ }
}
case _ =>
}
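In summary, the log_db round-trip piggybacks on the heap tables: store uploads the local
log_db only when it carries a proper build uuid, and restore downloads it only when no
usable local file exists, treating a uuid mismatch of an existing file as an error. A
condensed sketch of both directions, as they appear inside ML_Heap.store/restore
(db and session assumed in scope):

  // store: read the uuid from the local log_db; skip upload if improper
  val opt_log_db =
    for {
      path <- session.log_db
      uuid <- proper_string(Store.read_build_uuid(path, session.name))
    } yield Log_DB(uuid, Bytes.read(path))

  // restore: file_uuid == "" means no usable local file, so download;
  // an existing file whose uuid differs from the stored one is incoherent
  for (path <- session.log_db) {
    val file_uuid = Store.read_build_uuid(path, session.name)
    private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
      case Some(log_db) if file_uuid.isEmpty =>
        Isabelle_System.make_directory(path.expand.dir)
        Bytes.write(path, log_db.content)
      case Some(_) => error("Incoherent content for session database " + path)
      case None =>
    }
  }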