author  wenzelm
date    Wed, 21 Feb 2024 19:36:53 +0100
changeset 79682 1fa1b32b0379
parent 79681 df1059ea8846
child 79683 ade429ddb1fc

build local log_db, with store/restore via optional database server;
tuned messages;

src/Pure/Build/build.scala
src/Pure/Build/build_benchmark.scala
src/Pure/Build/build_job.scala
src/Pure/Build/build_process.scala
src/Pure/Build/store.scala
src/Pure/ML/ml_heap.scala
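
In outline, the changeset bundles a session's heap and its local log_db into one Store.Session record and moves both through an optional database server. A minimal sketch of the round trip, using only identifiers visible in this diff; the Options/Store setup and the session name "HOL" are illustrative, not part of the change:

    import isabelle._

    val options = Options.init()
    val store = Store(options)
    val session = store.output_session("HOL", store_heap = true)

    using_optional(store.maybe_open_database_server()) { database_server =>
      for (db <- database_server) {
        // upload heap slices plus the log_db blob, keyed by its build uuid
        ML_Heap.store(db, session, Space.MiB(options.real("build_database_slice")))
      }
      // on another machine: re-create heap and log_db files if they differ
      ML_Heap.restore(database_server, List(session), cache = store.cache.compress)
    }
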
--- a/src/Pure/Build/build.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -115,7 +115,7 @@
 
     def build_options(options: Options, build_cluster: Boolean = false): Options = {
       val options1 = options + "completion_limit=0" + "editor_tracing_messages=0"
-      if (build_cluster) options1 + "build_database_server" + "build_database" else options1
+      if (build_cluster) options1 + "build_database" else options1
     }
 
     final def build_store(options: Options,
@@ -550,10 +550,8 @@
       var force = false
       var list_builds = false
       var options =
-        Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-          List(
-            Options.Spec.make("build_database_server"),
-            Options.Spec.make("build_database")))
+        Options.init(specs =
+          Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
       var remove_builds = false
 
       val getopts = Getopts("""
@@ -653,10 +651,8 @@
       val dirs = new mutable.ListBuffer[Path]
       var max_jobs: Option[Int] = None
       var options =
-        Options.init(specs = Options.Spec.ISABELLE_BUILD_OPTIONS :::
-          List(
-            Options.Spec.make("build_database_server"),
-            Options.Spec.make("build_database")))
+        Options.init(specs =
+          Options.Spec.ISABELLE_BUILD_OPTIONS ::: List(Options.Spec.make("build_database")))
       var quiet = false
       var verbose = false
 
--- a/src/Pure/Build/build_benchmark.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_benchmark.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -51,13 +51,13 @@
         val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
         val session = sessions(benchmark_session)
 
-        val heaps = session.ancestors.map(store.output_heap)
-        ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+        val hierarchy = session.ancestors.map(store.output_session(_, store_heap = true))
+        ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
 
         val local_options = options + "build_database_server=false" + "build_database=false"
 
         benchmark_requirements(local_options, progress)
-        ML_Heap.restore(database_server, heaps, cache = store.cache.compress)
+        ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
 
         def get_shasum(session_name: String): SHA1.Shasum = {
           val ancestor_shasums = sessions(session_name).ancestors.map(get_shasum)
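
Restore now goes through Store.Session values rather than bare heap paths, so the same call can also carry each session's log_db; the second restore after benchmark_requirements re-establishes the server's heaps, which the local requirements build may have overwritten. The idiom, condensed (identifiers as in the surrounding diff):

    // Map each ancestor to a Store.Session requesting its heap, then restore
    // whatever the database server holds (a no-op for up-to-date files).
    val hierarchy = session.ancestors.map(store.output_session(_, store_heap = true))
    ML_Heap.restore(database_server, hierarchy, cache = store.cache.compress)
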
--- a/src/Pure/Build/build_job.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_job.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -120,6 +120,7 @@
         val options = Host.node_options(info.options, node_info)
 
         val store = build_context.store
+        val store_session = store.output_session(session_name, store_heap = store_heap)
 
         using_optional(store.maybe_open_database_server(server = server)) { database_server =>
 
@@ -467,15 +468,12 @@
 
           /* output heap */
 
-          val output_shasum = {
-            val heap = store.output_heap(session_name)
-            if (process_result.ok && store_heap && heap.is_file) {
-              val slice = Space.MiB(options.real("build_database_slice"))
-              val digest = ML_Heap.store(database_server, session_name, heap, slice)
-              SHA1.shasum(digest, session_name)
+          val output_shasum =
+            store_session.heap match {
+              case Some(path) if process_result.ok =>
+                SHA1.shasum(ML_Heap.write_file_digest(path), session_name)
+              case _ => SHA1.no_shasum
             }
-            else SHA1.no_shasum
-          }
 
           val log_lines = process_result.out_lines.filterNot(Protocol_Message.Marker.test)
 
@@ -516,6 +514,17 @@
               true
             }
 
+          using_optional(store.maybe_open_heaps_database(database_server, server = server)) {
+            heaps_database =>
+              for (db <- database_server orElse heaps_database) {
+                ML_Heap.clean_entry(db, session_name)
+                if (process_result.ok) {
+                  val slice = Space.MiB(options.real("build_database_slice"))
+                  ML_Heap.store(db, store_session, slice, progress = progress)
+                }
+              }
+          }
+
           // messages
           process_result.err_lines.foreach(progress.echo(_))
 
--- a/src/Pure/Build/build_process.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/build_process.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -857,6 +857,10 @@
     try { store.maybe_open_database_server(server = server) }
     catch { case exn: Throwable => close(); throw exn }
 
+  protected val _heaps_database: Option[SQL.Database] =
+    try { store.maybe_open_heaps_database(_database_server, server = server) }
+    catch { case exn: Throwable => close(); throw exn }
+
   protected val _build_database: Option[SQL.Database] =
     try {
       for (db <- store.maybe_open_build_database(server = server)) yield {
@@ -926,6 +930,7 @@
 
   def close(): Unit = synchronized {
     Option(_database_server).flatten.foreach(_.close())
+    Option(_heaps_database).flatten.foreach(_.close())
     Option(_build_database).flatten.foreach(_.close())
     Option(_host_database).foreach(_.close())
     Option(_build_cluster).foreach(_.close())
@@ -1015,8 +1020,11 @@
     val cancelled = progress.stopped || !ancestor_results.forall(_.ok)
 
     if (!skipped && !cancelled) {
-      val heaps = (session_name :: ancestor_results.map(_.name)).map(store.output_heap)
-      ML_Heap.restore(_database_server, heaps, cache = store.cache.compress)
+      val hierarchy =
+        (session_name :: ancestor_results.map(_.name))
+          .map(store.output_session(_, store_heap = true))
+      ML_Heap.restore(_database_server orElse _heaps_database,
+        hierarchy, cache = store.cache.compress)
     }
 
     val result_name = (session_name, worker_uuid, build_uuid)
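
Build_Process thus keeps at most one connection for heap/log_db traffic: _database_server when build_database_server is enabled, otherwise _heaps_database for cluster builds, otherwise none. A comment-only sketch of the selection (values illustrative):

    // Connection used for heap/log_db storage, per the definitions above:
    //   build_database_server enabled   => _database_server = Some(db), _heaps_database = None
    //   cluster build without a server  => _database_server = None, _heaps_database = Some(db)
    //   plain local build               => both None; heaps and log_dbs stay as files
    val heaps_db: Option[SQL.Database] = _database_server orElse _heaps_database
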
--- a/src/Pure/Build/store.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/Build/store.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -190,6 +190,15 @@
             uuid)
         })
 
+    def read_build_uuid(db: SQL.Database, name: String): String =
+      db.execute_query_statementO[String](
+        Session_Info.table.select(List(Session_Info.uuid),
+          sql = Session_Info.session_name.where_equal(name)),
+        { res =>
+            try { Option(res.string(Session_Info.uuid)).getOrElse("") }
+            catch { case _: SQLException => "" }
+        }).getOrElse("")
+
     def write_session_info(
       db: SQL.Database,
       cache: Compress.Cache,
@@ -249,6 +258,10 @@
       )
     }
   }
+
+  def read_build_uuid(path: Path, session: String): String =
+    try { using(SQLite.open_database(path))(private_data.read_build_uuid(_, session)) }
+    catch { case _: SQLException => "" }
 }
 
 class Store private(
@@ -301,6 +314,12 @@
     new Store.Session(name, heap, log_db, input_dirs)
   }
 
+  def output_session(name: String, store_heap: Boolean = false): Store.Session = {
+    val heap = if (store_heap) Some(output_heap(name)) else None
+    val log_db = if (!build_database_server) Some(output_log_db(name)) else None
+    new Store.Session(name, heap, log_db, List(output_dir))
+  }
+
 
   /* heap */
 
@@ -343,8 +362,20 @@
       ssh_port = options.int("build_database_ssh_port"),
       ssh_user = options.string("build_database_ssh_user"))
 
-  def maybe_open_database_server(server: SSH.Server = SSH.no_server): Option[SQL.Database] =
-    if (build_database_server) Some(open_database_server(server = server)) else None
+  def maybe_open_database_server(
+    server: SSH.Server = SSH.no_server,
+    guard: Boolean = build_database_server
+  ): Option[SQL.Database] = {
+    if (guard) Some(open_database_server(server = server)) else None
+  }
+
+  def maybe_open_heaps_database(
+    database_server: Option[SQL.Database],
+    server: SSH.Server = SSH.no_server
+  ): Option[SQL.Database] = {
+    if (database_server.isDefined) None
+    else maybe_open_database_server(server = server, guard = build_cluster)
+  }
 
   def open_build_database(path: Path, server: SSH.Server = SSH.no_server): SQL.Database =
     if (build_database_server || build_cluster) open_database_server(server = server)
@@ -395,9 +426,7 @@
   ): Option[Boolean] = {
     val relevant_db =
       database_server match {
-        case Some(db) =>
-          ML_Heap.clean_entry(db, name)
-          clean_session_info(db, name)
+        case Some(db) => clean_session_info(db, name)
         case None => false
       }
 
--- a/src/Pure/ML/ml_heap.scala	Wed Feb 21 11:43:30 2024 +0100
+++ b/src/Pure/ML/ml_heap.scala	Wed Feb 21 19:36:53 2024 +0100
@@ -43,6 +43,8 @@
 
   /* SQL data model */
 
+  sealed case class Log_DB(uuid: String, content: Bytes)
+
   object private_data extends SQL.Data("isabelle_heaps") {
     override lazy val tables = SQL.Tables(Base.table, Slices.table)
 
@@ -54,8 +56,10 @@
       val name = Generic.name
       val size = SQL.Column.long("size")
       val digest = SQL.Column.string("digest")
+      val uuid = SQL.Column.string("uuid")
+      val log_db = SQL.Column.bytes("log_db")
 
-      val table = make_table(List(name, size, digest))
+      val table = make_table(List(name, size, digest, uuid, log_db))
     }
 
     object Slices {
@@ -97,6 +101,20 @@
           sql = Generic.name.where_equal(name) + SQL.order_by(List(Slices.slice))),
         List.from[Bytes], _.bytes(Slices.content))
 
+    def read_log_db(db: SQL.Database, name: String, old_uuid: String = ""): Option[Log_DB] =
+      db.execute_query_statement(
+        Base.table.select(List(Base.uuid, Base.log_db), sql =
+          SQL.where_and(
+            Generic.name.equal(name),
+            if_proper(old_uuid, Base.uuid.ident + " <> " + SQL.string(old_uuid)))),
+        List.from[(String, Bytes)],
+        res => (res.string(Base.uuid), res.bytes(Base.log_db))
+      ).collectFirst(
+        {
+          case (uuid, content) if uuid.nonEmpty && !content.is_empty =>
+            Log_DB(uuid, content)
+        })
+
     def write_slice(db: SQL.Database, name: String, slice: Int, content: Bytes): Unit =
       db.execute_statement(Slices.table.insert(), body =
       { stmt =>
@@ -118,15 +136,26 @@
           stmt.string(1) = name
           stmt.long(2) = None
           stmt.string(3) = None
+          stmt.string(4) = None
+          stmt.bytes(5) = None
         })
 
-    def finish_entry(db: SQL.Database, name: String, size: Long, digest: SHA1.Digest): Unit =
+    def finish_entry(
+      db: SQL.Database,
+      name: String,
+      size: Long,
+      opt_digest: Option[SHA1.Digest],
+      opt_log_db: Option[Log_DB]
+    ): Unit =
       db.execute_statement(
-        Base.table.update(List(Base.size, Base.digest), sql = Base.name.where_equal(name)),
+        Base.table.update(List(Base.size, Base.digest, Base.uuid, Base.log_db),
+          sql = Base.name.where_equal(name)),
         body =
           { stmt =>
             stmt.long(1) = size
-            stmt.string(2) = digest.toString
+            stmt.string(2) = opt_digest.map(_.toString)
+            stmt.string(3) = opt_log_db.map(_.uuid)
+            stmt.bytes(4) = opt_log_db.map(_.content)
           })
   }
 
@@ -136,72 +165,97 @@
     }
 
   def read_digests(db: SQL.Database, names: Iterable[String]): Map[String, SHA1.Digest] =
-    private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
-      private_data.read_digests(db, names)
+    if (names.isEmpty) Map.empty
+    else {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.read_digests") {
+        private_data.read_digests(db, names)
+      }
     }
 
   def store(
-    database: Option[SQL.Database],
-    session_name: String,
-    heap: Path,
+    db: SQL.Database,
+    session: Store.Session,
     slice: Space,
-    cache: Compress.Cache = Compress.Cache.none
-  ): SHA1.Digest = {
-    val digest = write_file_digest(heap)
-    database match {
-      case None =>
-      case Some(db) =>
-        val size = File.size(heap) - sha1_prefix.length - SHA1.digest_length
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
+  ): Unit = {
+    val size =
+      session.heap match {
+        case Some(heap) => File.size(heap) - sha1_prefix.length - SHA1.digest_length
+        case None => 0L
+      }
 
-        val slice_size = slice.bytes max Space.MiB(1).bytes
-        val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
-        val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
+    val slice_size = slice.bytes max Space.MiB(1).bytes
+    val slices = (size.toDouble / slice_size.toDouble).ceil.toInt
+    val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
 
-        try {
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
-            private_data.init_entry(db, session_name)
-          }
+    try {
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store1") {
+        private_data.init_entry(db, session.name)
+      }
 
-          for (i <- 0 until slices) {
-            val j = i + 1
-            val offset = step * i
-            val limit = if (j < slices) step * j else size
-            val content =
-              Bytes.read_file(heap, offset = offset, limit = limit)
-                .compress(cache = cache)
-            private_data.transaction_lock(db, label = "ML_Heap.store2") {
-              private_data.write_slice(db, session_name, i, content)
-            }
-          }
+      if (slices > 0) progress.echo("Storing " + session.name + " ...")
+      for (i <- 0 until slices) {
+        val j = i + 1
+        val offset = step * i
+        val limit = if (j < slices) step * j else size
+        val content =
+          Bytes.read_file(session.the_heap, offset = offset, limit = limit)
+            .compress(cache = cache)
+        private_data.transaction_lock(db, label = "ML_Heap.store2") {
+          private_data.write_slice(db, session.name, i, content)
+        }
+      }
+
+      val opt_digest =
+        for {
+          path <- session.heap
+          digest <- read_file_digest(path)
+        } yield digest
 
-          private_data.transaction_lock(db, label = "ML_Heap.store3") {
-            private_data.finish_entry(db, session_name, size, digest)
-          }
-        }
-        catch { case exn: Throwable =>
-          private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
-            private_data.clean_entry(db, session_name)
-          }
-          throw exn
-        }
+      val opt_log_db =
+        for {
+          path <- session.log_db
+          uuid <- proper_string(Store.read_build_uuid(path, session.name))
+        } yield Log_DB(uuid, Bytes.read(path))
+
+      if (opt_log_db.isDefined) progress.echo("Storing " + session.name + ".db ...")
+
+      private_data.transaction_lock(db, label = "ML_Heap.store3") {
+        private_data.finish_entry(db, session.name, size, opt_digest, opt_log_db)
+      }
     }
-    digest
+    catch { case exn: Throwable =>
+      private_data.transaction_lock(db, create = true, label = "ML_Heap.store4") {
+        private_data.clean_entry(db, session.name)
+      }
+      throw exn
+    }
   }
 
   def restore(
     database: Option[SQL.Database],
-    heaps: List[Path],
-    cache: Compress.Cache = Compress.Cache.none
+    sessions: List[Store.Session],
+    cache: Compress.Cache = Compress.Cache.none,
+    progress: Progress = new Progress
   ): Unit = {
     database match {
-      case Some(db) if heaps.nonEmpty =>
+      case Some(db) if sessions.exists(_.defined) =>
         private_data.transaction_lock(db, create = true, label = "ML_Heap.restore") {
-          val db_digests = private_data.read_digests(db, heaps.map(_.file_name))
-          for (heap <- heaps) {
-            val session_name = heap.file_name
+          /* heap */
+
+          val defined_heaps =
+            for (session <- sessions; heap <- session.heap)
+              yield session.name -> heap
+
+          val db_digests = private_data.read_digests(db, defined_heaps.map(_._1))
+
+          for ((session_name, heap) <- defined_heaps) {
             val file_digest = read_file_digest(heap)
             val db_digest = db_digests.get(session_name)
             if (db_digest.isDefined && db_digest != file_digest) {
+              progress.echo("Restoring " + session_name + " ...")
+
               val base_dir = Isabelle_System.make_directory(heap.expand.dir)
               Isabelle_System.with_tmp_file(session_name + "_", base_dir = base_dir.file) { tmp =>
                 Bytes.write(tmp, Bytes.empty)
@@ -213,10 +267,25 @@
                   Isabelle_System.chmod("a+r", tmp)
                   Isabelle_System.move_file(tmp, heap)
                 }
-                else error("Incoherent content for session heap " + quote(session_name))
+                else error("Incoherent content for session heap " + heap)
               }
             }
           }
+
+
+          /* log_db */
+
+          for (session <- sessions; path <- session.log_db) {
+            val file_uuid = Store.read_build_uuid(path, session.name)
+            private_data.read_log_db(db, session.name, old_uuid = file_uuid) match {
+              case Some(log_db) if file_uuid.isEmpty =>
+                progress.echo("Restoring " + session.name + ".db ...")
+                Isabelle_System.make_directory(path.expand.dir)
+                Bytes.write(path, log_db.content)
+              case Some(_) => error("Incoherent content for session database " + path)
+              case None =>
+            }
+          }
         }
       case _ =>
     }
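
ML_Heap.store splits the heap payload into roughly equal chunks of at most build_database_slice MiB (floored at 1 MiB). A worked example of that arithmetic in plain Scala, independent of the Isabelle libraries; the 250 MiB payload and the 100 MiB slice setting are invented for illustration:

    val size = 250L * 1024 * 1024                                  // heap payload: 250 MiB
    val slice_size = (100L * 1024 * 1024) max (1L * 1024 * 1024)   // build_database_slice = 100
    val slices = (size.toDouble / slice_size.toDouble).ceil.toInt  // ceil(250 / 100) = 3
    val step = if (slices > 0) (size.toDouble / slices.toDouble).ceil.toLong else 0L
    // step = 87381334 bytes; slice i covers [step * i, min(step * (i + 1), size)),
    // so the last slice is clamped to size and may be slightly smaller.
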