# HG changeset patch
# User wenzelm
# Date 1708540613 -3600
# Node ID 1fa1b32b037901bec1ca7fc3bb803e29395e4f33
# Parent  df1059ea8846f1dabf56dbf726f0a4adec1dc266
build local log_db, with store/restore via optional database server;
tuned messages;

diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/Build/build.scala
--- a/src/Pure/Build/build.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -115,7 +115,7 @@
 
   def build_options(options: Options, build_cluster: Boolean = false): Options = {
     val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
-    if (build_cluster) options1 + "build_database_server" + "build_database" else options1
+    if (build_cluster) options1 + "build_database" else options1
   }
 
   final def build_store(options: Options,
@@ -550,10 +550,8 @@
 
     var force = false
     var list_builds = false
     var options =
-      Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-        List(
-          Options.Spec.make("build_database_server"),
-          Options.Spec.make("build_database")))
+      Options.init(specs =
+        Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
     var remove_builds = false
     val getopts = Getopts("""
@@ -653,10 +651,8 @@
 
     val dirs = new mutable.ListBuffer[Path]
    var max_jobs: Option[Int] = None
     var options =
-      Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-        List(
-          Options.Spec.make("build_database_server"),
-          Options.Spec.make("build_database")))
+      Options.init(specs =
+        Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
     var quiet = false
     var verbose = false
diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/Build/build_benchmark.scala
--- a/src/Pure/Build/build_benchmark.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -51,13 +51,13 @@
       val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
       val session = sessions(benchmark_session)
 
-      val heaps = session.ancestors.map(store.output_heap)
-      ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+      val hierarchy = session.ancestors.map(store.output_session(_, store_heap = true))
+      ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
 
       val local_options = options + "build_database_server=false" + "build_database=false"
 
       benchmark_requirements(local_options, progress)
-      ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+      ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
 
       def get_shasum(session_name: String): SHA1.Shasum = {
         val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum)
diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/Build/build_job.scala
--- a/src/Pure/Build/build_job.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_job.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -120,6 +120,7 @@
       val options = Host.node_options(info.options, node_info)
 
       val store = build_context.store
+      val store_session = store.output_session(session_name, store_heap = store_heap)
 
       using_optional(store.maybe_open_database_server(server = server)) {
         database_server =>
@@ -467,15 +468,12 @@
 
       /* output heap */
 
-      val output_shasum = {
-        val heap = store.output_heap(session_name)
-        if (process_result.ok && store_heap && heap.is_file) {
-          val slice = Space.MiB(options.real("build_database_slice"))
-          val digest = ML_Heap.store(database_server, session_name, heap, slice)
-          SHA1.shasum(digest, session_name)
+      val output_shasum =
+        store_session.heap match {
+          case Some(path) if process_result.ok =>
+            SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
+          case _ => SHA1.no_shasum
         }
-        else SHA1.no_shasum
-      }
 
       val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
 
@@ -516,6 +514,17 @@
           true
         }
 
+      using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
+        heaps_database =>
+          for (db <- database_server orElse heaps_database) {
+            ML_Heap.clean_entry(db, session_name)
+            if (process_result.ok) {
+              val slice = Space.MiB(options.real("build_database_slice"))
+              ML_Heap.store(db, store_session, slice, progress = progress)
+            }
+          }
+      }
+
       // messages
       process_result.err_lines.foreach(progress.echo(_))
 
diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/Build/build_process.scala
--- a/src/Pure/Build/build_process.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -857,6 +857,10 @@
     try { store.maybe_open_database_server(server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
+  protected val _heaps_database: Option[SQL.Database] =
+    try { store.maybe_open_heaps_database(_database_server, server = server) }
+    catch { case exn: Throwable => close(); throw exn }
+
   protected val _build_database: Option[SQL.Database] =
     try {
       for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -926,6 +930,7 @@
 
   def close(): Unit = synchronized {
     Option(_database_server).flatten.foreach(_.close())
+    Option(_heaps_database).flatten.foreach(_.close())
     Option(_build_database).flatten.foreach(_.close())
     Option(_host_database).foreach(_.close())
     Option(_build_cluster).foreach(_.close())
@@ -1015,8 +1020,11 @@
       val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
 
       if (!skipped && !cancelled) {
-        val heaps = (session_name :: ancestor_results.map(_.name)).map(store.output_heap)
-        ML_Heap.restore(_database_server, heaps, cache = store.cache.compress)
+        val hierarchy =
+          (session_name :: ancestor_results.map(_.name))
+            .map(store.output_session(_, store_heap = true))
+        ML_Heap.restore(_database_server orElse _heaps_database,
+          hierarchy, cache = store.cache.compress)
       }
 
       val result_name = (session_name, worker_uuid, build_uuid)
diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/Build/store.scala
--- a/src/Pure/Build/store.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/store.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -190,6 +190,15 @@
             uuid)
         })
 
+    def read_build_uuid(db: SQL.Database, name: String): String =
+      db.execute_query_statementO[String](
+        Session_Info.table.select(List(Session_Info.uuid),
+          sql = Session_Info.session_name.where_equal(name)),
+        { res =>
+          try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+          catch { case _: SQLException => "" }
+        }).getOrElse("")
+
     def write_session_info(
       db: SQL.Database,
       cache: Compress.Cache,
@@ -249,6 +258,10 @@
         )
       }
     }
+
+  def read_build_uuid(path: Path, session: String): String =
+    try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) }
+    catch { case _: SQLException => "" }
 }
 
 class Store private(
@@ -301,6 +314,12 @@
     new Store.Session(name, heap, log_db, input_dirs)
   }
 
+  def output_session(name: String, store_heap: Boolean = false): Store.Session = {
+    val heap = if (store_heap) Some(output_heap(name)) else None
+    val log_db = if (!build_database_server) Some(output_log_db(name)) else None
+    new Store.Session(name, heap, log_db, List(output_dir))
+  }
+
 
   /* heap */
 
@@ -343,8 +362,20 @@
       ssh_port = options.int("build_database_ssh_port"),
       ssh_user = options.string("build_database_ssh_user"))
 
-  def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
-    if (build_database_server) Some(open_database_server(server = server)) else None
+  def maybe_open_database_server(
+    server: SSH.Server = SSH.no_server,
+    guard: Boolean = build_database_server
+  ): Option[SQL.Database] = {
+    if (guard) Some(open_database_server(server = server)) else None
+  }
+
+  def maybe_open_heaps_database(
+    database_server: Option[SQL.Database],
+    server: SSH.Server = SSH.no_server
+  ): Option[SQL.Database] = {
+    if (database_server.isDefined) None
+    else store.maybe_open_database_server(server = server, guard = build_cluster)
+  }
 
   def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
     if (build_database_server || build_cluster) open_database_server(server = server)
@@ -395,9 +426,7 @@
   ): Option[Boolean] = {
     val relevant_db =
       database_server match {
-        case Some(db) =>
-          ML_Heap.clean_entry(db, name)
-          clean_session_info(db, name)
+        case Some(db) => clean_session_info(db, name)
         case None => false
       }
 
diff -r df1059ea8846 -r 1fa1b32b0379 src/Pure/ML/ml_heap.scala
--- a/src/Pure/ML/ml_heap.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/ML/ml_heap.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -43,6 +43,8 @@
 
   /* SQL data model */
 
+  sealed case class Log_DB(uuid: String, content: Bytes)
+
   object private_data extends SQL.Data("isabelle_heaps") {
     override lazy val tables = SQL.Tables(Base.table, Slices.table)
 
@@ -54,8 +56,10 @@
       val name = Generic.name
      val size = SQL.Column.long("size")
       val digest = SQL.Column.string("digest")
+      val uuid = SQL.Column.string("uuid")
+      val log_db = SQL.Column.bytes("log_db")
 
-      val table = make_table(List(name, size, digest))
+      val table = make_table(List(name, size, digest, uuid, log_db))
     }
 
     object Slices {
@@ -97,6 +101,20 @@
           sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))),
         List.from[Bytes], _.bytes(Slices.content))
 
+    def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.uuid, Base.log_db), sql =
+          SQL.where_and(
+            Generic.name.equal(name),
+            if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))),
+        List.from[(String, Bytes)],
+        res => (res.string(Base.uuid), res.bytes(Base.log_db))
+      ).collectFirst(
+        {
+          case (uuid, content) if uuid.nonEmpty && !content.is_empty =>
+            Log_DB(uuid, content)
+        })
+
     def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
       db.execute_statement(Slices.table.insert(), body =
         { stmt =>
@@ -118,15 +136,26 @@
           stmt.string(1) = name
          stmt.long(2) = None
           stmt.string(3) = None
+          stmt.string(4) = None
+          stmt.bytes(5) = None
         })
 
-    def finish_entry(db: SQL.Database, name: String, size: Long, digest: SHA1.Digest): Unit =
+    def finish_entry(
+      db: SQL.Database,
+      name: String,
+      size: Long,
+      opt_digest: Option[SHA1.Digest],
+      opt_log_db: Option[Log_DB]
+    ): Unit =
       db.execute_statement(
-        Base.table.update(List(Base.size, Base.digest), sql = Base.name.where_equal(name)),
+        Base.table.update(List(Base.size, Base.digest, Base.uuid, Base.log_db),
+          sql = Base.name.where_equal(name)),
         body =
         { stmt =>
           stmt.long(1) = size
-          stmt.string(2) = digest.toString
+          stmt.string(2) = opt_digest.map(_.toString)
+          stmt.string(3) = opt_log_db.map(_.uuid)
+          stmt.bytes(4) = opt_log_db.map(_.content)
         })
   }
 
@@ -136,72 +165,97 @@
     }
 
   def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
-    private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
-      private_data.read_digests(db, names)
+    if (names.isEmpty) Map.empty
+    else {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
+        private_data.read_digests(db, names)
+      }
     }
 
   def store(
-    database: Option[SQL.Database],
-    session_name: String,
-    heap: Path,
+    db: SQL.Database,
+    session: Store.Session,
     slice: Space,
-    cache: Compress.Cache = Compress.Cache.none
-  ): SHA1.Digest = {
-    val digest = write_file_digest(heap)
-    database match {
-      case None =>
-      case Some(db) =>
-        val size = File.size(heap) - sha1_prefix.length - SHA1.digest_length
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
+  ): Unit = {
+    val size =
+      session.heap match {
+        case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length
+        case None => 0L
+      }
 
-        val slice_size = slice.bytes max Space.MiB(1).bytes
-        val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
-        val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
+    val slice_size = slice.bytes max Space.MiB(1).bytes
+    val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
+    val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
 
-        try {
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
-            private_data.init_entry(db, session_name)
-          }
+    try {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
+        private_data.init_entry(db, session.name)
+      }
 
-          for (i <- 0 until slices) {
-            val j = i + 1
-            val offset = step * i
-            val limit = if (j < slices) step * j else size
-            val content =
-              Bytes.read_file(heap, offset = offset, limit = limit)
-                .compress(cache = cache)
-            private_data.transaction_lock(db, label = "ML_Heap.store2") {
-              private_data.write_slice(db, session_name, i, content)
-            }
-          }
+      if (slices > 0) progress.echo("Storing " + session.name + " ...")
+      for (i <- 0 until slices) {
+        val j = i + 1
+        val offset = step * i
+        val limit = if (j < slices) step * j else size
+        val content =
+          Bytes.read_file(session.the_heap, offset = offset, limit = limit)
+            .compress(cache = cache)
+        private_data.transaction_lock(db, label = "ML_Heap.store2") {
+          private_data.write_slice(db, session.name, i, content)
+        }
+      }
+
+      val opt_digest =
+        for {
+          path <- session.heap
+          digest <- read_file_digest(path)
+        } yield digest
 
-          private_data.transaction_lock(db, label = "ML_Heap.store3") {
-            private_data.finish_entry(db, session_name, size, digest)
-          }
-        }
-        catch { case exn: Throwable =>
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
-            private_data.clean_entry(db, session_name)
-          }
-          throw exn
-        }
+      val opt_log_db =
+        for {
+          path <- session.log_db
+          uuid <- proper_string(Store.read_build_uuid(path, session.name))
+        } yield Log_DB(uuid, Bytes.read(path))
+
+      if (opt_log_db.isDefined) progress.echo("Storing " + session.name + ".db ...")
+
+      private_data.transaction_lock(db, label = "ML_Heap.store3") {
+        private_data.finish_entry(db, session.name, size, opt_digest, opt_log_db)
+      }
     }
-    digest
+    catch { case exn: Throwable =>
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
+        private_data.clean_entry(db, session.name)
+      }
+      throw exn
+    }
   }
 
   def restore(
     database: Option[SQL.Database],
-    heaps: List[Path],
-    cache: Compress.Cache = Compress.Cache.none
+    sessions: List[Store.Session],
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
   ): Unit = {
     database match {
-      case Some(db) if heaps.nonEmpty =>
+      case Some(db) if sessions.exists(_.defined) =>
         private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
-          val db_digests = private_data.read_digests(db, heaps.map(_.file_name))
-          for (heap <- heaps) {
-            val session_name = heap.file_name
+          /* heap */
+
+          val defined_heaps =
+            for (session <- sessions; heap <- session.heap)
+              yield session.name -> heap
+
+          val db_digests = private_data.read_digests(db, defined_heaps.map(_._1))
+
+          for ((session_name, heap) <- defined_heaps) {
            val file_digest = read_file_digest(heap)
             val db_digest = db_digests.get(session_name)
             if (db_digest.isDefined && db_digest != file_digest) {
+              progress.echo("Restoring " + session_name + " ...")
+
               val base_dir = Isabelle_System.make_directory(heap.expand.dir)
               Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
                 Bytes.write(tmp, Bytes.empty)
@@ -213,10 +267,25 @@
                   Isabelle_System.chmod("a+r", tmp)
                   Isabelle_System.move_file(tmp, heap)
                 }
-                else error("Incoherent content for session heap " + quote(session_name))
+                else error("Incoherent content for session heap " + heap)
               }
             }
           }
+
+
+          /* log_db */
+
+          for (session <- sessions; path <- session.log_db) {
+            val file_uuid = Store.read_build_uuid(path, session.name)
+            private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
+              case Some(log_db) if file_uuid.isEmpty =>
+                progress.echo("Restoring " + session.name + ".db ...")
+                Isabelle_System.make_directory(path.expand.dir)
+                Bytes.write(path, log_db.content)
+              case Some(_) => error("Incoherent content for session database " + path)
+              case None =>
+            }
+          }
         }
       case _ =>
     }
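
Review note: the new log_db handling in ML_Heap.restore reduces to a small decision
rule. The following self-contained Scala sketch models it under hypothetical names
(StoredEntry, decide, etc. are not part of the Isabelle sources): a stored log_db is
written back only when no local copy exists; a local copy with a different build uuid
is reported as incoherent; a matching uuid is a no-op.

  object LogDbRestoreModel {
    // Stored entry in the heaps database: build uuid plus log_db content.
    final case class StoredEntry(uuid: String, content: String)

    sealed trait Action
    case object Skip extends Action
    final case class Restore(content: String) extends Action
    final case class Incoherent(msg: String) extends Action

    // Model of private_data.read_log_db: yield the stored entry only if its
    // uuid is non-empty and differs from old_uuid (cf. the SQL "uuid <> ..." filter).
    def read(stored: Option[StoredEntry], old_uuid: String): Option[StoredEntry] =
      stored.filter(e => e.uuid.nonEmpty && e.uuid != old_uuid)

    // Model of the match in the /* log_db */ part of ML_Heap.restore.
    def decide(stored: Option[StoredEntry], local_uuid: String): Action =
      read(stored, old_uuid = local_uuid) match {
        case Some(e) if local_uuid.isEmpty => Restore(e.content)
        case Some(_) => Incoherent("local log_db has a different build uuid")
        case None => Skip
      }

    def main(args: Array[String]): Unit = {
      val db = Some(StoredEntry("uuid-1", "log_db bytes"))
      assert(decide(db, local_uuid = "") == Restore("log_db bytes"))     // no local file: restore
      assert(decide(db, local_uuid = "uuid-1") == Skip)                  // up to date: no-op
      assert(decide(db, local_uuid = "uuid-0").isInstanceOf[Incoherent]) // mismatch: error
      println("ok")
    }
  }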