# HG changeset patch
# User wenzelm
# Date 1525640588 -7200
# Node ID 888d35a198664958a65323638babb05a531283c7
# Parent 0c7820590236240f24aa6bf9c0e9d15ed9ca0594
store exports in session database, with asynchronous / parallel compression;

diff -r 0c7820590236 -r 888d35a19866 src/Pure/Thy/export.scala
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Thy/export.scala Sun May 06 23:03:08 2018 +0200
@@ -0,0 +1,119 @@
/*  Title:      Pure/Thy/export.scala
    Author:     Makarius

Manage theory exports.
*/

package isabelle

object Export
{
  /* SQL data model */

  object Data
  {
    // Composite primary key: (session_name, theory_name, name) identifies one export.
    val session_name = SQL.Column.string("session_name").make_primary_key
    val theory_name = SQL.Column.string("theory_name").make_primary_key
    val name = SQL.Column.string("name").make_primary_key
    // Whether the stored body bytes are compressed (see Entry.compressed_body).
    val compressed = SQL.Column.bool("compressed")
    val body = SQL.Column.bytes("body")

    val table =
      SQL.Table("isabelle_exports", List(session_name, theory_name, name, compressed, body))

    // SQL WHERE clause restricting rows to the given session/theory pair.
    def where_equal(session_name: String, theory_name: String): SQL.Source =
      "WHERE " + Data.session_name.equal(session_name) +
      " AND " + Data.theory_name.equal(theory_name)
  }

  // Names of all exports stored for the given session/theory.
  def read_names(db: SQL.Database, session_name: String, theory_name: String): List[String] =
  {
    val select = Data.table.select(List(Data.name), Data.where_equal(session_name, theory_name))
    db.using_statement(select)(stmt =>
      stmt.execute_query().iterator(res => res.string(Data.name)).toList)
  }

  // One export item; "compressed" states how "body" is stored, the lazy vals
  // convert on demand (conversion result is cached by lazy val).
  sealed case class Entry(
    session_name: String, theory_name: String, name: String, compressed: Boolean, body: Bytes)
  {
    override def toString: String = theory_name + ":" + name

    // Uniform error/diagnostic message for this entry.
    def message(msg: String): String =
      msg + " " + quote(name) + " for theory " + quote(theory_name)

    lazy val compressed_body: Bytes = if (compressed) body else body.compress()
    lazy val uncompressed_body: Bytes = if (compressed) body.uncompress() else body

    // Insert this entry as one row; body is stored exactly as held here,
    // i.e. compression (if any) must have happened before write.
    def write(db: SQL.Database)
    {
      db.using_statement(Data.table.insert())(stmt =>
      {
        stmt.string(1) = session_name
        stmt.string(2) = theory_name
        stmt.string(3) = name
        stmt.bool(4) = compressed
        stmt.bytes(5) = body
        stmt.execute()
      })
    }
  }

  // Read a single export entry; error() if no such row exists.
  def read_entry(db: SQL.Database, session_name: String, theory_name: String, name: String): Entry =
  {
    val select =
      Data.table.select(List(Data.compressed, Data.body),
        Data.where_equal(session_name, theory_name) + " AND " + Data.name.equal(name))
    db.using_statement(select)(stmt =>
    {
      val res = stmt.execute_query()
      if (res.next()) {
        val compressed = res.bool(Data.compressed)
        val body = res.bytes(Data.body)
        Entry(session_name, theory_name, name, compressed, body)
      }
      else error(Entry(session_name, theory_name, name, false, Bytes.empty).message("Bad export"))
    })
  }


  /* database consumer thread */

  def consumer(db: SQL.Database): Consumer = new Consumer(db)

  // Serializes all writes to db on one Consumer_Thread; compression runs in
  // parallel beforehand (Future in apply), only the DB insert is sequential.
  // NOTE(review): Consumer_Thread / Synchronized / Future are Isabelle/Scala
  // library primitives, not scala.concurrent — semantics per Pure/Concurrent.
  class Consumer private[Export](db: SQL.Database)
  {
    db.create_table(Data.table)

    // Accumulated error messages (e.g. duplicate exports), newest first.
    private val export_errors = Synchronized[List[String]](Nil)

    private val consumer =
      Consumer_Thread.fork(name = "export")(consume = (future: Future[Entry]) =>
        {
          // Wait for (possibly parallel) compression to finish.
          val entry = future.join
          db.transaction {
            // A duplicate (session, theory, name) key is recorded as an error
            // rather than overwriting the existing row.
            if (read_names(db, entry.session_name, entry.theory_name).contains(entry.name)) {
              export_errors.change(errs => entry.message("Duplicate export") :: errs)
            }
            else entry.write(db)
          }
          true
        })

    // Queue one export for storage; compression (if requested) is forked off
    // so the producer is not blocked and multiple bodies compress in parallel.
    def apply(session_name: String, args: Markup.Export.Args, body: Bytes)
    {
      consumer.send(
        if (args.compress)
          Future.fork(Entry(session_name, args.theory_name, args.name, true, body.compress()))
        else
          Future.value(Entry(session_name, args.theory_name, args.name, false, body)))
    }

    // Drain the consumer thread, optionally close the database, and return
    // accumulated errors in the order they occurred.
    def shutdown(close: Boolean = false): List[String] =
    {
      consumer.shutdown()
      if (close) db.close()
      export_errors.value.reverse
    }
  }
}
diff -r 0c7820590236 -r 888d35a19866 src/Pure/Tools/build.scala
--- a/src/Pure/Tools/build.scala Sun May 06 23:01:45 2018 +0200
+++
b/src/Pure/Tools/build.scala Sun May 06 23:03:08 2018 +0200 @@ -195,6 +195,8 @@ private val graph_file = Isabelle_System.tmp_file("session_graph", "pdf") isabelle.graphview.Graph_File.write(options, graph_file, deps(name).session_graph_display) + private val export_consumer = Export.consumer(SQLite.open_database(store.output_database(name))) + private val future_result: Future[Process_Result] = Future.thread("build") { val parent = info.parent.getOrElse("") @@ -286,7 +288,7 @@ text <- Library.try_unprefix("\fexport = ", line) (args, body) <- Markup.Export.dest_inline(XML.Decode.properties(YXML.parse_body(text))) - } { } // FIXME + } { export_consumer(name, args, body) } }, progress_limit = options.int("process_output_limit") match { @@ -310,8 +312,14 @@ def join: Process_Result = { val result = future_result.join + val export_result = + export_consumer.shutdown(close = true).map(Output.error_message_text(_)) match { + case Nil => result + case errs if result.ok => result.copy(rc = 1).errors(errs) + case errs => result.errors(errs) + } - if (result.ok) + if (export_result.ok) Present.finish(progress, store.browser_info, graph_file, info, name) graph_file.delete @@ -322,11 +330,11 @@ case Some(request) => !request.cancel } - if (result.interrupted) { - if (was_timeout) result.error(Output.error_message_text("Timeout")).was_timeout - else result.error(Output.error_message_text("Interrupt")) + if (export_result.interrupted) { + if (was_timeout) export_result.error(Output.error_message_text("Timeout")).was_timeout + else export_result.error(Output.error_message_text("Interrupt")) } - else result + else export_result } } diff -r 0c7820590236 -r 888d35a19866 src/Pure/build-jars --- a/src/Pure/build-jars Sun May 06 23:01:45 2018 +0200 +++ b/src/Pure/build-jars Sun May 06 23:03:08 2018 +0200 @@ -129,6 +129,7 @@ System/system_channel.scala System/tty_loop.scala Thy/bibtex.scala + Thy/export.scala Thy/html.scala Thy/latex.scala Thy/present.scala