# HG changeset patch
# User wenzelm
# Date 1708615918 -3600
# Node ID 611587256801c0c104b92f3d5b234f32ba20c7fb
# Parent  c172eecba85d49b637f124fe9869d142ee948d61
# Parent  e8122e84aa5896e080649fc356a06bb9d993163e
merged

diff -r c172eecba85d -r 611587256801 src/Pure/Admin/build_history.scala
--- a/src/Pure/Admin/build_history.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Admin/build_history.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -282,7 +282,7 @@
     build_out_progress.echo("Reading session build info ...")
     val session_build_info =
       build_info.finished_sessions.flatMap { session_name =>
-        val database = isabelle_output + store.log_db(session_name)
+        val database = isabelle_output + Store.log_db(session_name)

         if (database.is_file) {
           using(SQLite.open_database(database)) { db =>
@@ -309,8 +309,8 @@
     build_out_progress.echo("Reading ML statistics ...")
     val ml_statistics =
       build_info.finished_sessions.flatMap { session_name =>
-        val database = isabelle_output + store.log_db(session_name)
-        val log_gz = isabelle_output + store.log_gz(session_name)
+        val database = isabelle_output + Store.log_db(session_name)
+        val log_gz = isabelle_output + Store.log_gz(session_name)

         val properties =
           if (database.is_file) {
@@ -336,7 +336,7 @@
     build_out_progress.echo("Reading error messages ...")
     val session_errors =
       build_info.failed_sessions.flatMap { session_name =>
-        val database = isabelle_output + store.log_db(session_name)
+        val database = isabelle_output + Store.log_db(session_name)
         val errors =
           if (database.is_file) {
             try {
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build.scala
--- a/src/Pure/Build/build.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -115,14 +115,15 @@

     def build_options(options: Options, build_cluster: Boolean = false): Options = {
       val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
-      if (build_cluster) options1 + "build_database_server" + "build_database" else options1
+      if (build_cluster) options1 + "build_database" else options1
     }

     final def build_store(options: Options,
       build_cluster: Boolean = false,
       cache: Term.Cache = Term.Cache.make()
     ): Store = {
-      val store = Store(build_options(options, build_cluster = build_cluster), cache = cache)
+      val store_options = build_options(options, build_cluster = build_cluster)
+      val store = Store(store_options, build_cluster = build_cluster, cache = cache)
       Isabelle_System.make_directory(store.output_dir + Path.basic("log"))
       Isabelle_Fonts.init()
       store
@@ -504,13 +505,14 @@

   def build_process(
     options: Options,
+    build_cluster: Boolean = false,
     list_builds: Boolean = false,
     remove_builds: Boolean = false,
     force: Boolean = false,
     progress: Progress = new Progress
   ): Unit = {
     val build_engine = Engine(engine_name(options))
-    val store = build_engine.build_store(options)
+    val store = build_engine.build_store(options, build_cluster = build_cluster)

     using(store.open_server()) { server =>
       using_optional(store.maybe_open_build_database(server = server)) { build_database =>
@@ -544,25 +546,28 @@
   val isabelle_tool2 =
     Isabelle_Tool("build_process", "manage session build process", Scala_Project.here,
       { args =>
+        var build_cluster = false
         var force = false
         var list_builds = false
         var options =
-          Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-            List(
-              Options.Spec.make("build_database_server"),
-              Options.Spec.make("build_database")))
+          Options.init(specs =
+            Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
         var remove_builds = false

         val getopts = Getopts("""
 Usage: isabelle build_process [OPTIONS]

   Options are:
+    -C           build cluster mode (database server)
     -f           extra force for option -r
     -l           list build processes
     -o OPTION    override Isabelle system OPTION (via NAME=VAL or NAME)
     -r           remove data from build processes: inactive processes (default)
                  or all processes (option -f)
+
+  Manage Isabelle build process, notably distributed build cluster (option -C).
 """,
+          "C" -> (_ => build_cluster = true),
           "f" -> (_ => force = true),
           "l" -> (_ => list_builds = true),
           "o:" -> (arg => options = options + arg),
@@ -573,8 +578,8 @@

         val progress = new Console_Progress()

-        build_process(options, list_builds = list_builds, remove_builds = remove_builds,
-          force = force, progress = progress)
+        build_process(options, build_cluster = build_cluster, list_builds = list_builds,
+          remove_builds = remove_builds, force = force, progress = progress)
       })
@@ -613,7 +618,7 @@
     max_jobs: Option[Int] = None
   ): Results = {
     val build_engine = Engine(engine_name(options))
-    val store = build_engine.build_store(options)
+    val store = build_engine.build_store(options, build_cluster = true)
     val build_options = store.options

     using(store.open_server()) { server =>
@@ -648,10 +653,8 @@
         val dirs = new mutable.ListBuffer[Path]
         var max_jobs: Option[Int] = None
         var options =
-          Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-            List(
-              Options.Spec.make("build_database_server"),
-              Options.Spec.make("build_database")))
+          Options.init(specs =
+            Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
         var quiet = false
         var verbose = false
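Note on the option layering above: the base Build.Engine now adds only "build_database" in cluster mode, while the "build_schedule" engine (further down in this patch) re-adds "build_database_server" via its build_options override. A minimal standalone sketch of that layering, modelling Options as a plain Set[String] (a stand-in type for illustration only; the real type is isabelle.Options):

    // Sketch of the two-level option layering in cluster mode.
    object OptionLayering {
      class Engine {
        def build_options(options: Set[String], build_cluster: Boolean = false): Set[String] = {
          val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
          if (build_cluster) options1 + "build_database" else options1
        }
      }

      class Build_Engine extends Engine {
        override def build_options(options: Set[String], build_cluster: Boolean = false): Set[String] = {
          val options1 = super.build_options(options, build_cluster = build_cluster)
          if (build_cluster) options1 + "build_database_server" else options1
        }
      }

      def main(args: Array[String]): Unit =
        // cluster mode ends up with both database options enabled
        println(new Build_Engine().build_options(Set.empty, build_cluster = true))
    }

The corresponding command-line entry point is the new "isabelle build_process -C" mode shown in the usage text above.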
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build_benchmark.scala
--- a/src/Pure/Build/build_benchmark.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_benchmark.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -51,13 +51,13 @@
       val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
       val session = sessions(benchmark_session)

-      val heaps = session.ancestors.map(store.output_heap)
-      ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+      val hierarchy = session.ancestors.map(store.output_session(_, store_heap = true))
+      for (db <- database_server) ML_Heap.restore(db, hierarchy, cache = store.cache.compress)

       val local_options = options + "build_database_server=false" + "build_database=false"

       benchmark_requirements(local_options, progress)
-      ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+      for (db <- database_server) ML_Heap.restore(db, hierarchy, cache = store.cache.compress)

       def get_shasum(session_name: String): SHA1.Shasum = {
         val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum)
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build_cluster.scala
--- a/src/Pure/Build/build_cluster.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_cluster.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -189,7 +189,9 @@
     remote_isabelle
   }

-  def init(): Unit = remote_isabelle.init()
+  def init(): Unit =
+    remote_isabelle.init(other_settings =
+      remote_isabelle.init_components() ::: remote_isabelle.debug_settings())

   def benchmark(): Unit = {
     val script =
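Note on the restore calls above: ML_Heap.restore now takes a definite SQL.Database, so the Option[SQL.Database] is unpacked at the call site with a for-comprehension, and sessions travel as Store.Session values rather than raw heap paths. A condensed sketch of that calling convention, using stand-in types (hypothetical, for illustration):

    object RestorePattern {
      final case class Session(name: String)    // stand-in for isabelle.Store.Session
      final class Database(val url: String)     // stand-in for isabelle.SQL.Database

      // stand-in for ML_Heap.restore: requires a definite database
      def restore(db: Database, sessions: List[Session]): Unit =
        sessions.foreach(s => println("restore " + s.name + " from " + db.url))

      def main(args: Array[String]): Unit = {
        val database_server: Option[Database] = Some(new Database("server"))
        val hierarchy = List("Pure", "HOL").map(Session.apply)
        // no call at all when database_server is None, one call when Some(db)
        for (db <- database_server) restore(db, hierarchy)
      }
    }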
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build_job.scala
--- a/src/Pure/Build/build_job.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_job.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -277,7 +277,8 @@
             for {
               elapsed <- Markup.Elapsed.unapply(props)
               elapsed_time = Time.seconds(elapsed)
-              if elapsed_time.is_relevant && elapsed_time >= options.seconds("command_timing_threshold")
+              if elapsed_time.is_relevant &&
+                elapsed_time >= options.seconds("command_timing_threshold")
             } command_timings += props.filter(Markup.command_timing_property)
           }
@@ -292,7 +293,8 @@
           if (!progress.stopped) {
             val theory_name = snapshot.node_name.theory
             val args =
-              Protocol.Export.Args(theory_name = theory_name, name = name, compress = compress)
+              Protocol.Export.Args(
+                theory_name = theory_name, name = name, compress = compress)
             val body = Bytes(Symbol.encode(YXML.string_of_body(xml)))
             export_consumer.make_entry(session_name, args, body)
           }
@@ -401,8 +403,9 @@
                 output_sources = info.document_output,
                 output_pdf = info.document_output)
             }
-            using(database_context.open_database(session_name, output = true))(session_database =>
-              documents.foreach(_.write(session_database.db, session_name)))
+            using(database_context.open_database(session_name, output = true))(
+              session_database =>
+                documents.foreach(_.write(session_database.db, session_name)))
             (documents.flatMap(_.log_lines), Nil)
           }
         }
@@ -450,7 +453,9 @@
                 errs.flatMap(s => split_lines(Output.error_message_text(s))) :::
                   errs.map(Protocol.Error_Message_Marker.apply))
             }
-            else if (progress.stopped && result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
+            else if (progress.stopped && result1.ok) {
+              result1.copy(rc = Process_Result.RC.interrupt)
+            }
             else result1
           case Exn.Exn(Exn.Interrupt()) =>
             if (result1.ok) result1.copy(rc = Process_Result.RC.interrupt)
@@ -464,18 +469,17 @@
         else if (result2.interrupted) result2.error(Output.error_message_text("Interrupt"))
         else result2

+      val store_session =
+        store.output_session(session_name, store_heap = process_result.ok && store_heap)
+

       /* output heap */

-      val output_shasum = {
-        val heap = store.output_heap(session_name)
-        if (process_result.ok && store_heap && heap.is_file) {
-          val slice = Space.MiB(options.real("build_database_slice")).bytes
-          val digest = ML_Heap.store(database_server, session_name, heap, slice)
-          SHA1.shasum(digest, session_name)
+      val output_shasum =
+        store_session.heap match {
+          case Some(path) => SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
+          case None => SHA1.no_shasum
         }
-        else SHA1.no_shasum
-      }

       val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
@@ -516,6 +520,15 @@
           true
         }

+      using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
+        heaps_database =>
+          for (db <- database_server orElse heaps_database) {
+            val slice = Space.MiB(options.real("build_database_slice"))
+            ML_Heap.store(db, store_session, slice,
+              cache = store.cache.compress, progress = progress)
+          }
+      }
+
       // messages
       process_result.err_lines.foreach(progress.echo(_))
@@ -531,11 +544,14 @@
       }
       else {
         progress.echo(
-          session_name + " FAILED (see also \"isabelle build_log -H Error " + session_name + "\")")
+          session_name + " FAILED (see also \"isabelle build_log -H Error " +
+          session_name + "\")")
         if (!process_result.interrupted) {
           val tail = info.options.int("process_output_tail")
-          val suffix = if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
-          val prefix = if (log_lines.length == suffix.length) Nil else List("...")
+          val suffix =
+            if (tail == 0) log_lines else log_lines.drop(log_lines.length - tail max 0)
+          val prefix =
+            if (log_lines.length == suffix.length) Nil else List("...")
           progress.echo(Library.trim_line(cat_lines(prefix ::: suffix)))
         }
       }
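Note: output_shasum above is now derived from the optional heap of store_session instead of re-checking the file system. A standalone sketch of the idea, with java.security.MessageDigest standing in for isabelle.SHA1 (names hypothetical; the real write_file_digest also records the digest inside the heap file):

    import java.nio.file.{Files, Path}
    import java.security.MessageDigest

    object HeapShasum {
      def sha1_hex(bytes: Array[Byte]): String =
        MessageDigest.getInstance("SHA-1").digest(bytes).map("%02x".format(_)).mkString

      // Some(path): digest the heap file; None: empty shasum (cf. SHA1.no_shasum)
      def output_shasum(heap: Option[Path], session_name: String): String =
        heap match {
          case Some(path) => sha1_hex(Files.readAllBytes(path)) + " " + session_name
          case None => ""
        }

      def main(args: Array[String]): Unit =
        println(output_shasum(args.headOption.map(p => Path.of(p)), "Pure"))
    }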
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build_process.scala
--- a/src/Pure/Build/build_process.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_process.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -785,7 +785,7 @@
         Running.table,
         Results.table)

-    val build_uuid_tables =
+    private val build_uuid_tables =
       tables.filter(table =>
         table.columns.exists(column => column.name == Generic.build_uuid.name))
@@ -857,6 +857,10 @@
     try { store.maybe_open_database_server(server = server) }
     catch { case exn: Throwable => close(); throw exn }

+  protected val _heaps_database: Option[SQL.Database] =
+    try { store.maybe_open_heaps_database(_database_server, server = server) }
+    catch { case exn: Throwable => close(); throw exn }
+
   protected val _build_database: Option[SQL.Database] =
     try {
       for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -917,7 +921,7 @@
   protected def open_build_cluster(): Build_Cluster =
     Build_Cluster.make(build_context, progress = build_progress).open()

-  protected val _build_cluster =
+  protected val _build_cluster: Build_Cluster =
     try {
       if (build_context.master && _build_database.isDefined) open_build_cluster()
       else Build_Cluster.none
@@ -926,6 +930,7 @@
   def close(): Unit = synchronized {
     Option(_database_server).flatten.foreach(_.close())
+    Option(_heaps_database).flatten.foreach(_.close())
     Option(_build_database).flatten.foreach(_.close())
     Option(_host_database).foreach(_.close())
     Option(_build_cluster).foreach(_.close())
@@ -1015,8 +1020,12 @@
       val cancelled = progress.stopped || !ancestor_results.forall(_.ok)

       if (!skipped && !cancelled) {
-        val heaps = (session_name :: ancestor_results.map(_.name)).map(store.output_heap)
-        ML_Heap.restore(_database_server, heaps, cache = store.cache.compress)
+        for (db <- _database_server orElse _heaps_database) {
+          val hierarchy =
+            (session_name :: ancestor_results.map(_.name))
+              .map(store.output_session(_, store_heap = true))
+          ML_Heap.restore(db, hierarchy, cache = store.cache.compress)
+        }
       }

       val result_name = (session_name, worker_uuid, build_uuid)
diff -r c172eecba85d -r 611587256801 src/Pure/Build/build_schedule.scala
--- a/src/Pure/Build/build_schedule.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/build_schedule.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -1213,6 +1213,11 @@


   class Build_Engine extends Build.Engine("build_schedule") {
+    override def build_options(options: Options, build_cluster: Boolean = false): Options = {
+      val options1 = super.build_options(options, build_cluster = build_cluster)
+      if (build_cluster) options1 + "build_database_server" else options1
+    }
+
     def scheduler(timing_data: Timing_Data, context: Build.Context): Scheduler = {
       val sessions_structure = context.sessions_structure
diff -r c172eecba85d -r 611587256801 src/Pure/Build/export.scala
--- a/src/Pure/Build/export.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/export.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -674,8 +674,7 @@

         /* export files */

-        val store = Store(options)
-        export_files(store, session_name, export_dir, progress = progress, export_prune = export_prune,
-          export_list = export_list, export_patterns = export_patterns)
+        export_files(Store(options), session_name, export_dir, progress = progress,
+          export_prune = export_prune, export_list = export_list, export_patterns = export_patterns)
       })
 }
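Note on the database fields in build_process.scala above: each optional database is acquired in declaration order, a failing acquisition closes whatever was opened before rethrowing, and close() later releases all of them. A minimal sketch of this acquire-or-unwind pattern (stand-in Database type, hypothetical):

    object AcquireOrUnwind {
      final class Database(val name: String) extends AutoCloseable {
        def close(): Unit = println("close " + name)
      }

      class Resources {
        protected val _database_server: Option[Database] =
          try { Some(new Database("server")) }
          catch { case exn: Throwable => close(); throw exn }

        protected val _heaps_database: Option[Database] =
          try { Some(new Database("heaps")) }
          catch { case exn: Throwable => close(); throw exn }

        def close(): Unit = {
          // Option(...) guards fields that are still null if construction failed early
          Option(_database_server).flatten.foreach(_.close())
          Option(_heaps_database).flatten.foreach(_.close())
        }
      }

      def main(args: Array[String]): Unit = (new Resources).close()
    }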
diff -r c172eecba85d -r 611587256801 src/Pure/Build/store.scala
--- a/src/Pure/Build/store.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Build/store.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -11,15 +11,38 @@

 object Store {
-  def apply(options: Options, cache: Term.Cache = Term.Cache.make()): Store =
-    new Store(options, cache)
+  def apply(
+    options: Options,
+    build_cluster: Boolean = false,
+    cache: Term.Cache = Term.Cache.make()
+  ): Store = new Store(options, build_cluster, cache)
+
+
+  /* file names */
+
+  def heap(name: String): Path = Path.basic(name)
+  def log(name: String): Path = Path.basic("log") + Path.basic(name)
+  def log_db(name: String): Path = log(name).db
+  def log_gz(name: String): Path = log(name).gz


   /* session */

-  sealed case class Session(name: String, heap: Option[Path], log_db: Option[Path]) {
+  final class Session private[Store](
+    val name: String,
+    val heap: Option[Path],
+    val log_db: Option[Path],
+    dirs: List[Path]
+  ) {
+    def log_db_name: String = Store.log_db(name).implode
+
     def defined: Boolean = heap.isDefined || log_db.isDefined

+    def the_heap: Path =
+      heap getOrElse
+        error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
+          cat_lines(dirs.map(dir => "  " + File.standard_path(dir))))
+
     def heap_digest(): Option[SHA1.Digest] =
       heap.flatMap(ML_Heap.read_file_digest)
@@ -177,6 +200,15 @@
         uuid)
       })

+    def read_build_uuid(db: SQL.Database, name: String): String =
+      db.execute_query_statementO[String](
+        Session_Info.table.select(List(Session_Info.uuid),
+          sql = Session_Info.session_name.where_equal(name)),
+        { res =>
+          try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+          catch { case _: SQLException => "" }
+        }).getOrElse("")
+
     def write_session_info(
       db: SQL.Database,
       cache: Compress.Cache,
@@ -236,9 +268,17 @@
       )
     }
   }
+
+  def read_build_uuid(path: Path, session: String): String =
+    try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) }
+    catch { case _: SQLException => "" }
 }

-class Store private(val options: Options, val cache: Term.Cache) {
+class Store private(
+    val options: Options,
+    val build_cluster: Boolean,
+    val cache: Term.Cache
+  ) {
   store =>

   override def toString: String = "Store(output_dir = " + output_dir.absolute + ")"
@@ -265,38 +305,34 @@

   /* file names */

-  def heap(name: String): Path = Path.basic(name)
-  def log(name: String): Path = Path.basic("log") + Path.basic(name)
-  def log_db(name: String): Path = log(name).db
-  def log_gz(name: String): Path = log(name).gz
-
-  def output_heap(name: String): Path = output_dir + heap(name)
-  def output_log(name: String): Path = output_dir + log(name)
-  def output_log_db(name: String): Path = output_dir + log_db(name)
-  def output_log_gz(name: String): Path = output_dir + log_gz(name)
+  def output_heap(name: String): Path = output_dir + Store.heap(name)
+  def output_log(name: String): Path = output_dir + Store.log(name)
+  def output_log_db(name: String): Path = output_dir + Store.log_db(name)
+  def output_log_gz(name: String): Path = output_dir + Store.log_gz(name)


   /* session */

   def get_session(name: String): Store.Session = {
-    val heap = input_dirs.view.map(_ + store.heap(name)).find(_.is_file)
-    val log_db = input_dirs.view.map(_ + store.log_db(name)).find(_.is_file)
-    Store.Session(name, heap, log_db)
+    val heap = input_dirs.view.map(_ + Store.heap(name)).find(_.is_file)
+    val log_db = input_dirs.view.map(_ + Store.log_db(name)).find(_.is_file)
+    new Store.Session(name, heap, log_db, input_dirs)
+  }
+
+  def output_session(name: String, store_heap: Boolean = false): Store.Session = {
+    val heap = if (store_heap) Some(output_heap(name)) else None
+    val log_db = if (!build_database_server) Some(output_log_db(name)) else None
+    new Store.Session(name, heap, log_db, List(output_dir))
   }


   /* heap */

-  def the_heap(name: String): Path =
-    get_session(name).heap getOrElse
-      error("Missing heap image for session " + quote(name) + " -- expected in:\n" +
-        cat_lines(input_dirs.map(dir => "  " + File.standard_path(dir))))
-
   def heap_shasum(database_server: Option[SQL.Database], name: String): SHA1.Shasum = {
     def get_database: Option[SHA1.Digest] = {
       for {
         db <- database_server
-        digest <- ML_Heap.get_entries(db, List(name)).valuesIterator.nextOption()
+        digest <- ML_Heap.read_digests(db, List(name)).valuesIterator.nextOption()
       } yield digest
     }
@@ -331,11 +367,23 @@
       ssh_port = options.int("build_database_ssh_port"),
       ssh_user = options.string("build_database_ssh_user"))

-  def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
-    if (build_database_server) Some(open_database_server(server = server)) else None
+  def maybe_open_database_server(
+    server: SSH.Server = SSH.no_server,
+    guard: Boolean = build_database_server
+  ): Option[SQL.Database] = {
+    if (guard) Some(open_database_server(server = server)) else None
+  }
+
+  def maybe_open_heaps_database(
+    database_server: Option[SQL.Database],
+    server: SSH.Server = SSH.no_server
+  ): Option[SQL.Database] = {
+    if (database_server.isDefined) None
+    else store.maybe_open_database_server(server = server, guard = build_cluster)
+  }

   def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
-    if (build_database_server) open_database_server(server = server)
+    if (build_database_server || build_cluster) open_database_server(server = server)
     else SQLite.open_database(path, restrict = true)

   def maybe_open_build_database(
@@ -359,7 +407,7 @@
     else {
       (for {
         dir <- input_dirs.view
-        path = dir + log_db(name) if path.is_file
+        path = dir + Store.log_db(name) if path.is_file
         db <- check(SQLite.open_database(path))
       } yield db).headOption
     }
@@ -383,9 +431,7 @@
   ): Option[Boolean] = {
     val relevant_db =
       database_server match {
-        case Some(db) =>
-          ML_Heap.clean_entry(db, name)
-          clean_session_info(db, name)
+        case Some(db) => clean_session_info(db, name)
         case None => false
       }
@@ -393,7 +439,7 @@
     for {
       dir <-
         (if (system_heaps) List(user_output_dir, system_output_dir) else List(user_output_dir))
-      file <- List(heap(name), log_db(name), log(name), log_gz(name))
+      file <- List(Store.heap(name), Store.log_db(name), Store.log(name), Store.log_gz(name))
       path = dir + file if path.is_file
     } yield path.file.delete
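Note: Store now distinguishes input sessions (get_session searches all input_dirs) from output sessions (output_session points at output_dir only), and maybe_open_heaps_database opens the PostgreSQL server connection only for a cluster build that has no regular database server configured. A Boolean sketch of that guard, condensed from the code above:

    object HeapsDatabaseGuard {
      // heaps database only for: cluster mode without a database server
      def open_heaps_database(database_server: Boolean, build_cluster: Boolean): Boolean =
        !database_server && build_cluster

      def main(args: Array[String]): Unit =
        for (server <- List(false, true); cluster <- List(false, true))
          println(s"server=$server cluster=$cluster -> heaps_db=${open_heaps_database(server, cluster)}")
    }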
diff -r c172eecba85d -r 611587256801 src/Pure/ML/ml_heap.scala
--- a/src/Pure/ML/ml_heap.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/ML/ml_heap.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -7,11 +7,6 @@

 package isabelle

-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.StandardOpenOption
-
-
 object ML_Heap {
   /** heap file with SHA1 digest **/
@@ -43,6 +38,8 @@

   /* SQL data model */

+  sealed case class Log_DB(uuid: String, content: Bytes)
+
   object private_data extends SQL.Data("isabelle_heaps") {
     override lazy val tables = SQL.Tables(Base.table, Slices.table)
@@ -52,10 +49,24 @@
     object Base {
       val name = Generic.name
-      val size = SQL.Column.long("size")
-      val digest = SQL.Column.string("digest")
+      val heap_size = SQL.Column.long("heap_size")
+      val heap_digest = SQL.Column.string("heap_digest")
+      val uuid = SQL.Column.string("uuid")
+      val log_db = SQL.Column.bytes("log_db")
+
+      val table = make_table(List(name, heap_size, heap_digest, uuid, log_db))
+    }

-      val table = make_table(List(name, size, digest))
+    object Size {
+      val name = Generic.name
+      val heap = SQL.Column.string("heap")
+      val log_db = SQL.Column.string("log_db")
+
+      val table = make_table(List(name, heap, log_db),
+        body =
+          "SELECT name, pg_size_pretty(heap_size::bigint) as heap, " +
+          "  pg_size_pretty(length(log_db)::bigint) as log_db FROM " + Base.table.ident,
+        name = "size")
     }

     object Slices {
@@ -68,8 +79,8 @@
     object Slices_Size {
       val name = Generic.name
-      val slice = SQL.Column.int("slice").make_primary_key
-      val size = SQL.Column.long("size")
+      val slice = Slices.slice
+      val size = SQL.Column.string("size")

       val table = make_table(List(name, slice, size),
         body =
           "SELECT name, slice, pg_size_pretty(length(content)::bigint) as size FROM " +
           Slices.table.ident,
         name = "slices_size")
     }

-    def get_entries(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = {
-      require(names.nonEmpty)
-
+    def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] = {
       db.execute_query_statement(
-        Base.table.select(List(Base.name, Base.digest),
+        Base.table.select(List(Base.name, Base.heap_digest),
           sql = Generic.name.where_member(names)),
         List.from[(String, String)],
-        res => res.string(Base.name) -> res.string(Base.digest)
-      ).flatMap({
-        case (_, "") => None
-        case (name, digest) => Some(name -> SHA1.fake_digest(digest))
+        res => res.string(Base.name) -> res.string(Base.heap_digest)
+      ).collect({
+        case (name, digest) if digest.nonEmpty => name -> SHA1.fake_digest(digest)
       }).toMap
     }

-    def read_entry(db: SQL.Database, name: String): List[Bytes] =
+    def read_slices(db: SQL.Database, name: String): List[Bytes] =
       db.execute_query_statement(
         Slices.table.select(List(Slices.content),
           sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))),
         List.from[Bytes], _.bytes(Slices.content))

-    def clean_entry(db: SQL.Database, name: String): Unit = {
-      for (table <- List(Base.table, Slices.table)) {
-        db.execute_statement(table.delete(sql = Base.name.where_equal(name)))
-      }
-      db.create_view(Slices_Size.table)
-    }
-
-    def prepare_entry(db: SQL.Database, name: String): Unit =
-      db.execute_statement(Base.table.insert(), body =
-        { stmt =>
-          stmt.string(1) = name
-          stmt.long(2) = None
-          stmt.string(3) = None
+    def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.uuid, Base.log_db), sql =
+          SQL.where_and(
+            Generic.name.equal(name),
+            if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))),
+        List.from[(String, Bytes)],
+        res => (res.string(Base.uuid), res.bytes(Base.log_db))
+      ).collectFirst(
+        {
+          case (uuid, content) if uuid.nonEmpty && !content.is_empty =>
+            Log_DB(uuid, content)
         })

-    def write_entry(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
+    def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
       db.execute_statement(Slices.table.insert(), body =
         { stmt =>
           stmt.string(1) = name
@@ -120,13 +127,43 @@
           stmt.int(2) = slice
           stmt.bytes(3) = content
         })

-    def finish_entry(db: SQL.Database, name: String, size: Long, digest: SHA1.Digest): Unit =
+    def clean_entry(db: SQL.Database, name: String): Unit = {
+      for (table <- List(Base.table, Slices.table)) {
+        db.execute_statement(table.delete(sql = Base.name.where_equal(name)))
+      }
+    }
+
+    def init_entry(db: SQL.Database, name: String, log_db: Option[Log_DB] = None): Unit = {
+      clean_entry(db, name)
+      for (table <- List(Size.table, Slices_Size.table)) {
+        db.create_view(table)
+      }
+      db.execute_statement(Base.table.insert(), body =
+        { stmt =>
+          stmt.string(1) = name
+          stmt.long(2) = None
+          stmt.string(3) = None
+          stmt.string(4) = log_db.map(_.uuid)
+          stmt.bytes(5) = log_db.map(_.content)
+        })
+    }
+
+    def finish_entry(
+      db: SQL.Database,
+      name: String,
+      heap_size: Long,
+      heap_digest: Option[SHA1.Digest],
+      log_db: Option[Log_DB]
+    ): Unit =
       db.execute_statement(
-        Base.table.update(List(Base.size, Base.digest), sql = Base.name.where_equal(name)),
+        Base.table.update(List(Base.heap_size, Base.heap_digest, Base.uuid, Base.log_db),
+          sql = Base.name.where_equal(name)),
         body =
         { stmt =>
-          stmt.long(1) = size
-          stmt.string(2) = digest.toString
+          stmt.long(1) = heap_size
+          stmt.string(2) = heap_digest.map(_.toString)
+          stmt.string(3) = log_db.map(_.uuid)
+          stmt.bytes(4) = log_db.map(_.content)
         })
   }
@@ -135,89 +172,127 @@
     private_data.clean_entry(db, session_name)
   }

-  def get_entries(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
-    private_data.transaction_lock(db, create = true, label = "ML_Heap.get_entries") {
-      private_data.get_entries(db, names)
+  def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
+    if (names.isEmpty) Map.empty
+    else {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
+        private_data.read_digests(db, names)
+      }
     }

   def store(
-    database: Option[SQL.Database],
-    session_name: String,
-    heap: Path,
-    slice: Long,
-    cache: Compress.Cache = Compress.Cache.none
-  ): SHA1.Digest = {
-    val digest = write_file_digest(heap)
-    database match {
-      case None =>
-      case Some(db) =>
-        val size = File.size(heap) - sha1_prefix.length - SHA1.digest_length
+    db: SQL.Database,
+    session: Store.Session,
+    slice: Space,
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
+  ): Unit = {
+    val log_db =
+      for {
+        path <- session.log_db
+        uuid <- proper_string(Store.read_build_uuid(path, session.name))
+      } yield Log_DB(uuid, Bytes.read(path))

-        val slices = (size.toDouble / slice.toDouble).ceil.toInt
-        val step = (size.toDouble / slices.toDouble).ceil.toLong
+    val heap_digest = session.heap.map(write_file_digest)
+    val heap_size =
+      session.heap match {
+        case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length
+        case None => 0L
+      }

-        try {
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
-            private_data.prepare_entry(db, session_name)
-          }
+    val slice_size = slice.bytes max Space.MiB(1).bytes
+    val slices = (heap_size.toDouble / slice_size.toDouble).ceil.toInt
+
+    try {
+      if (slices == 0 && log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...")
+
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
+        private_data.init_entry(db, session.name, log_db = if (slices == 0) log_db else None)
+      }

-          for (i <- 0 until slices) {
-            val j = i + 1
-            val offset = step * i
-            val limit = if (j < slices) step * j else size
-            val content =
-              Bytes.read_file(heap, offset = offset, limit = limit)
-                .compress(cache = cache)
-            private_data.transaction_lock(db, label = "ML_Heap.store2") {
-              private_data.write_entry(db, session_name, i, content)
-            }
-          }
-
-          private_data.transaction_lock(db, label = "ML_Heap.store3") {
-            private_data.finish_entry(db, session_name, size, digest)
+      if (slices > 0) {
+        progress.echo("Storing " + session.name + " ...")
+        val step = (heap_size.toDouble / slices.toDouble).ceil.toLong
+        for (i <- 0 until slices) {
+          val j = i + 1
+          val offset = step * i
+          val limit = if (j < slices) step * j else heap_size
+          val content =
+            Bytes.read_file(session.the_heap, offset = offset, limit = limit)
+              .compress(cache = cache)
+          private_data.transaction_lock(db, label = "ML_Heap.store2") {
+            private_data.write_slice(db, session.name, i, content)
           }
         }
-        catch { case exn: Throwable =>
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
-            private_data.clean_entry(db, session_name)
-          }
-          throw exn
+
+        if (log_db.isDefined) progress.echo("Storing " + session.log_db_name + " ...")
+
+        private_data.transaction_lock(db, label = "ML_Heap.store3") {
+          private_data.finish_entry(db, session.name, heap_size, heap_digest, log_db)
         }
+      }
     }
-    digest
+    catch { case exn: Throwable =>
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
+        private_data.clean_entry(db, session.name)
+      }
+      throw exn
+    }
   }

   def restore(
-    database: Option[SQL.Database],
-    heaps: List[Path],
-    cache: Compress.Cache = Compress.Cache.none
+    db: SQL.Database,
+    sessions: List[Store.Session],
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
   ): Unit = {
-    database match {
-      case Some(db) if heaps.nonEmpty =>
-        private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
-          val db_digests = private_data.get_entries(db, heaps.map(_.file_name))
-          for (heap <- heaps) {
-            val session_name = heap.file_name
-            val file_digest = read_file_digest(heap)
-            val db_digest = db_digests.get(session_name)
-            if (db_digest.isDefined && db_digest != file_digest) {
-              val base_dir = Isabelle_System.make_directory(heap.expand.dir)
-              Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
-                Bytes.write(tmp, Bytes.empty)
-                for (slice <- private_data.read_entry(db, session_name)) {
-                  Bytes.append(tmp, slice.uncompress(cache = cache))
-                }
-                val digest = write_file_digest(tmp)
-                if (db_digest.get == digest) {
-                  Isabelle_System.chmod("a+r", tmp)
-                  Isabelle_System.move_file(tmp, heap)
-                }
-                else error("Incoherent content for session heap " + quote(session_name))
+    if (sessions.exists(_.defined)) {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
+        /* heap */
+
+        val defined_heaps =
+          for (session <- sessions; heap <- session.heap)
+            yield session.name -> heap
+
+        val db_digests = private_data.read_digests(db, defined_heaps.map(_._1))
+
+        for ((session_name, heap) <- defined_heaps) {
+          val file_digest = read_file_digest(heap)
+          val db_digest = db_digests.get(session_name)
+          if (db_digest.isDefined && db_digest != file_digest) {
+            progress.echo("Restoring " + session_name + " ...")
+
+            val base_dir = Isabelle_System.make_directory(heap.expand.dir)
+            Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
+              Bytes.write(tmp, Bytes.empty)
+              for (slice <- private_data.read_slices(db, session_name)) {
+                Bytes.append(tmp, slice.uncompress(cache = cache))
              }
+              val digest = write_file_digest(tmp)
+              if (db_digest.get == digest) {
+                Isabelle_System.chmod("a+r", tmp)
+                Isabelle_System.move_file(tmp, heap)
+              }
+              else error("Incoherent content for session heap " + heap)
            }
          }
        }
-      case _ =>
+
+
+        /* log_db */
+
+        for (session <- sessions; path <- session.log_db) {
+          val file_uuid = Store.read_build_uuid(path, session.name)
+          private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
+            case Some(log_db) if file_uuid.isEmpty =>
+              progress.echo("Restoring " + session.log_db_name + " ...")
+              Isabelle_System.make_directory(path.expand.dir)
+              Bytes.write(path, log_db.content)
+            case Some(_) => error("Incoherent content for session database " + path)
+            case None =>
+          }
+        }
+      }
    }
  }
 }
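Note on the slice arithmetic in ML_Heap.store above: a heap of heap_size bytes is cut into ceil(heap_size / slice_size) slices of step = ceil(heap_size / slices) bytes each, the last slice taking the remainder; slices == 0 (no heap) means only the log_db payload is written. A standalone sketch of the computation:

    object SliceArithmetic {
      // (offset, limit) pairs, mirroring the loop bounds in ML_Heap.store
      def slices(heap_size: Long, slice_size: Long): List[(Long, Long)] = {
        val n = (heap_size.toDouble / slice_size.toDouble).ceil.toInt
        if (n == 0) Nil
        else {
          val step = (heap_size.toDouble / n.toDouble).ceil.toLong
          List.tabulate(n) { i =>
            val j = i + 1
            (step * i, if (j < n) step * j else heap_size)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val MiB = 1024L * 1024
        // e.g. a 250 MiB heap with 100 MiB slices yields 3 slices
        slices(250 * MiB, 100 * MiB).foreach(println)
      }
    }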
diff -r c172eecba85d -r 611587256801 src/Pure/ML/ml_process.scala
--- a/src/Pure/ML/ml_process.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/ML/ml_process.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -24,7 +24,7 @@

       session_background.sessions_structure.selection(logic_name).
         build_requirements(List(logic_name)).
-        map(store.the_heap)
+        map(name => store.get_session(name).the_heap)
     }

   def apply(
diff -r c172eecba85d -r 611587256801 src/Pure/System/other_isabelle.scala
--- a/src/Pure/System/other_isabelle.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/System/other_isabelle.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -147,6 +147,14 @@
     else error("Cannot proceed with existing user settings file: " + etc_settings)
   }

+  def debug_settings(): List[String] = {
+    val debug = System.getProperty("isabelle.debug", "") == "true"
+    if (debug) {
+      List("ISABELLE_JAVA_SYSTEM_OPTIONS=\"$ISABELLE_JAVA_SYSTEM_OPTIONS -Disabelle.debug=true\"")
+    }
+    else Nil
+  }
+

   /* init */
diff -r c172eecba85d -r 611587256801 src/Pure/Tools/profiling.scala
--- a/src/Pure/Tools/profiling.scala	Wed Feb 21 16:19:36 2024 +0000
+++ b/src/Pure/Tools/profiling.scala	Thu Feb 22 16:31:58 2024 +0100
@@ -99,7 +99,7 @@
       locales = session.locales,
       locale_thms = session.locale_thms,
       global_thms = session.global_thms,
-      heap_size = File.space(store.the_heap(session_name)),
+      heap_size = File.space(store.get_session(session_name).the_heap),
       thys_id_size = session.sizeof_thys_id,
       thms_id_size = session.sizeof_thms_id,
       terms_size = session.sizeof_terms,
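Note: debug_settings() above propagates the JVM property isabelle.debug of the running process into the settings of another Isabelle installation (used by Build_Cluster.Session.init earlier in this patch). The check can be exercised in isolation:

    object DebugSettings {
      // same test as in other_isabelle.scala: the property must be literally "true"
      def debug_settings(): List[String] = {
        val debug = System.getProperty("isabelle.debug", "") == "true"
        if (debug)
          List("ISABELLE_JAVA_SYSTEM_OPTIONS=\"$ISABELLE_JAVA_SYSTEM_OPTIONS -Disabelle.debug=true\"")
        else Nil
      }

      def main(args: Array[String]): Unit =
        // run with: scala -Disabelle.debug=true DebugSettings.scala
        debug_settings().foreach(println)
    }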