store exports in session database, with asynchronous / parallel compression;
authorwenzelm
Sun, 06 May 2018 23:03:08 +0200
changeset 68092 888d35a19866
parent 68091 0c7820590236
child 68093 b98c5877b0f3
store exports in session database, with asynchronous / parallel compression;
src/Pure/Thy/export.scala
src/Pure/Tools/build.scala
src/Pure/build-jars
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Thy/export.scala	Sun May 06 23:03:08 2018 +0200
@@ -0,0 +1,119 @@
+/*  Title:      Pure/Thy/export.scala
+    Author:     Makarius
+
+Manage theory exports.
+*/
+
+package isabelle
+
+object Export
+{
+  /* SQL data model */
+
+  object Data
+  {
+    val session_name = SQL.Column.string("session_name").make_primary_key
+    val theory_name = SQL.Column.string("theory_name").make_primary_key
+    val name = SQL.Column.string("name").make_primary_key
+    val compressed = SQL.Column.bool("compressed")
+    val body = SQL.Column.bytes("body")
+
+    val table =
+      SQL.Table("isabelle_exports", List(session_name, theory_name, name, compressed, body))
+
+    def where_equal(session_name: String, theory_name: String): SQL.Source =
+      "WHERE " + Data.session_name.equal(session_name) +
+      " AND " + Data.theory_name.equal(theory_name)
+  }
+
+  def read_names(db: SQL.Database, session_name: String, theory_name: String): List[String] =
+  {
+    val select = Data.table.select(List(Data.name), Data.where_equal(session_name, theory_name))
+    db.using_statement(select)(stmt =>
+      stmt.execute_query().iterator(res => res.string(Data.name)).toList)
+  }
+
+  sealed case class Entry(
+    session_name: String, theory_name: String, name: String, compressed: Boolean, body: Bytes)
+  {
+    override def toString: String = theory_name + ":" + name
+
+    def message(msg: String): String =
+      msg + " " + quote(name) + " for theory " + quote(theory_name)
+
+    lazy val compressed_body: Bytes = if (compressed) body else body.compress()
+    lazy val uncompressed_body: Bytes = if (compressed) body.uncompress() else body
+
+    def write(db: SQL.Database)
+    {
+      db.using_statement(Data.table.insert())(stmt =>
+      {
+        stmt.string(1) = session_name
+        stmt.string(2) = theory_name
+        stmt.string(3) = name
+        stmt.bool(4) = compressed
+        stmt.bytes(5) = body
+        stmt.execute()
+      })
+    }
+  }
+
+  def read_entry(db: SQL.Database, session_name: String, theory_name: String, name: String): Entry =
+  {
+    val select =
+      Data.table.select(List(Data.compressed, Data.body),
+        Data.where_equal(session_name, theory_name) + " AND " + Data.name.equal(name))
+    db.using_statement(select)(stmt =>
+    {
+      val res = stmt.execute_query()
+      if (res.next()) {
+        val compressed = res.bool(Data.compressed)
+        val body = res.bytes(Data.body)
+        Entry(session_name, theory_name, name, compressed, body)
+      }
+      else error(Entry(session_name, theory_name, name, false, Bytes.empty).message("Bad export"))
+    })
+  }
+
+
+  /* database consumer thread */
+
+  def consumer(db: SQL.Database): Consumer = new Consumer(db)
+
+  class Consumer private[Export](db: SQL.Database)
+  {
+    db.create_table(Data.table)
+
+    private val export_errors = Synchronized[List[String]](Nil)
+
+    private val consumer =
+      Consumer_Thread.fork(name = "export")(consume = (future: Future[Entry]) =>
+        {
+          val entry = future.join
+
+          db.transaction {
+            if (read_names(db, entry.session_name, entry.theory_name).contains(entry.name)) {
+              export_errors.change(errs => entry.message("Duplicate export") :: errs)
+            }
+            else entry.write(db)
+          }
+          true
+        })
+
+    def apply(session_name: String, args: Markup.Export.Args, body: Bytes)
+    {
+      consumer.send(
+        if (args.compress)
+          Future.fork(Entry(session_name, args.theory_name, args.name, true, body.compress()))
+        else
+          Future.value(Entry(session_name, args.theory_name, args.name, false, body)))
+    }
+
+    def shutdown(close: Boolean = false): List[String] =
+    {
+      consumer.shutdown()
+      if (close) db.close()
+      export_errors.value.reverse
+    }
+  }
+}
--- a/src/Pure/Tools/build.scala	Sun May 06 23:01:45 2018 +0200
+++ b/src/Pure/Tools/build.scala	Sun May 06 23:03:08 2018 +0200
@@ -195,6 +195,8 @@
     private val graph_file = Isabelle_System.tmp_file("session_graph", "pdf")
     isabelle.graphview.Graph_File.write(options, graph_file, deps(name).session_graph_display)
 
+    private val export_consumer = Export.consumer(SQLite.open_database(store.output_database(name)))
+
     private val future_result: Future[Process_Result] =
       Future.thread("build") {
         val parent = info.parent.getOrElse("")
@@ -286,7 +288,7 @@
                     text <- Library.try_unprefix("\fexport = ", line)
                     (args, body) <-
                       Markup.Export.dest_inline(XML.Decode.properties(YXML.parse_body(text)))
-                  } { } // FIXME
+                  } { export_consumer(name, args, body) }
               },
             progress_limit =
               options.int("process_output_limit") match {
@@ -310,8 +312,14 @@
     def join: Process_Result =
     {
       val result = future_result.join
+      val export_result =
+        export_consumer.shutdown(close = true).map(Output.error_message_text(_)) match {
+          case Nil => result
+          case errs if result.ok => result.copy(rc = 1).errors(errs)
+          case errs => result.errors(errs)
+        }
 
-      if (result.ok)
+      if (export_result.ok)
         Present.finish(progress, store.browser_info, graph_file, info, name)
 
       graph_file.delete
@@ -322,11 +330,11 @@
           case Some(request) => !request.cancel
         }
 
-      if (result.interrupted) {
-        if (was_timeout) result.error(Output.error_message_text("Timeout")).was_timeout
-        else result.error(Output.error_message_text("Interrupt"))
+      if (export_result.interrupted) {
+        if (was_timeout) export_result.error(Output.error_message_text("Timeout")).was_timeout
+        else export_result.error(Output.error_message_text("Interrupt"))
       }
-      else result
+      else export_result
     }
   }
 
--- a/src/Pure/build-jars	Sun May 06 23:01:45 2018 +0200
+++ b/src/Pure/build-jars	Sun May 06 23:03:08 2018 +0200
@@ -129,6 +129,7 @@
   System/system_channel.scala
   System/tty_loop.scala
   Thy/bibtex.scala
+  Thy/export.scala
   Thy/html.scala
   Thy/latex.scala
   Thy/present.scala