more robust (synchronous) management of Export.Entry: Future.fork happens inside the data structure;
tuned;
--- a/src/Pure/PIDE/session.scala Mon May 07 17:20:39 2018 +0200
+++ b/src/Pure/PIDE/session.scala Mon May 07 17:37:03 2018 +0200
@@ -191,7 +191,6 @@
private case object Stop
private case class Cancel_Exec(exec_id: Document_ID.Exec)
private case class Protocol_Command(name: String, args: List[String])
- private case class Add_Export(args: Markup.Export.Args, bytes: Bytes, output: Prover.Output)
private case class Update_Options(options: Options)
private case object Consolidate_Execution
private case object Prune_History
@@ -401,26 +400,23 @@
/* prover output */
- def bad_output(output: Prover.Output)
- {
- if (verbose)
- Output.warning("Ignoring bad prover output: " + output.message.toString)
- }
-
- def change_command(f: Document.State => (Command.State, Document.State), output: Prover.Output)
- {
- try {
- val st = global_state.change_result(f)
- change_buffer.invoke(false, List(st.command))
- }
- catch { case _: Document.State.Fail => bad_output(output) }
- }
-
def handle_output(output: Prover.Output)
//{{{
{
- def accumulate(state_id: Document_ID.Generic, message: XML.Elem): Unit =
- change_command(_.accumulate(state_id, message, xml_cache), output)
+ def bad_output()
+ {
+ if (verbose)
+ Output.warning("Ignoring bad prover output: " + output.message.toString)
+ }
+
+ def change_command(f: Document.State => (Command.State, Document.State))
+ {
+ try {
+ val st = global_state.change_result(f)
+ change_buffer.invoke(false, List(st.command))
+ }
+ catch { case _: Document.State.Fail => bad_output() }
+ }
output match {
case msg: Prover.Protocol_Output =>
@@ -432,17 +428,16 @@
case Protocol.Command_Timing(state_id, timing) if prover.defined =>
val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
- accumulate(state_id, xml_cache.elem(message))
+ change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))
case Protocol.Theory_Timing(_, _) =>
// FIXME
case Markup.Export(args)
if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
- if (args.compress) {
- Future.fork { manager.send(Add_Export(args, msg.bytes.compress(), output)) }
- }
- else manager.send(Add_Export(args, msg.bytes, output))
+ val id = Value.Long.unapply(args.id.get).get
+ val entry = (args.serial, Export.make_entry("", args, msg.bytes))
+ change_command(_.add_export(id, entry))
case Markup.Assign_Update =>
msg.text match {
@@ -452,8 +447,8 @@
change_buffer.invoke(true, cmds)
manager.send(Session.Change_Flush)
}
- catch { case _: Document.State.Fail => bad_output(output) }
- case _ => bad_output(output)
+ catch { case _: Document.State.Fail => bad_output() }
+ case _ => bad_output()
}
delay_prune.invoke()
@@ -464,8 +459,8 @@
global_state.change(_.removed_versions(removed))
manager.send(Session.Change_Flush)
}
- catch { case _: Document.State.Fail => bad_output(output) }
- case _ => bad_output(output)
+ catch { case _: Document.State.Fail => bad_output() }
+ case _ => bad_output()
}
case Markup.ML_Statistics(props) =>
@@ -474,13 +469,13 @@
case Markup.Task_Statistics(props) =>
// FIXME
- case _ => bad_output(output)
+ case _ => bad_output()
}
}
case _ =>
output.properties match {
case Position.Id(state_id) =>
- accumulate(state_id, output.message)
+ change_command(_.accumulate(state_id, output.message, xml_cache))
case _ if output.is_init =>
prover.get.options(session_options)
@@ -562,11 +557,6 @@
case Protocol_Command(name, args) if prover.defined =>
prover.get.protocol_command(name, args:_*)
- case Add_Export(args, bytes, output) =>
- val id = Value.Long.parse(args.id.get)
- val entry = (args.serial, Export.make_entry("", args, bytes))
- change_command(_.add_export(id, entry), output)
-
case change: Session.Change if prover.defined =>
val state = global_state.value
if (!state.removing_versions && state.is_assigned(change.previous))
--- a/src/Pure/Thy/export.scala Mon May 07 17:20:39 2018 +0200
+++ b/src/Pure/Thy/export.scala Mon May 07 17:37:03 2018 +0200
@@ -34,25 +34,27 @@
}
sealed case class Entry(
- session_name: String, theory_name: String, name: String, compressed: Boolean, body: Bytes)
+ session_name: String,
+ theory_name: String,
+ name: String,
+ compressed: Boolean,
+ body: Future[Bytes])
{
override def toString: String = theory_name + ":" + name
def message(msg: String): String =
msg + " " + quote(name) + " for theory " + quote(theory_name)
- lazy val compressed_body: Bytes = if (compressed) body else body.compress()
- lazy val uncompressed_body: Bytes = if (compressed) body.uncompress() else body
-
def write(db: SQL.Database)
{
+ val bytes = body.join
db.using_statement(Data.table.insert())(stmt =>
{
stmt.string(1) = session_name
stmt.string(2) = theory_name
stmt.string(3) = name
stmt.bool(4) = compressed
- stmt.bytes(5) = body
+ stmt.bytes(5) = bytes
stmt.execute()
})
}
@@ -60,8 +62,8 @@
def make_entry(session_name: String, args: Markup.Export.Args, body: Bytes): Entry =
{
- val bytes = if (args.compress) body.compress() else body
- Entry(session_name, args.theory_name, args.name, args.compress, bytes)
+ Entry(session_name, args.theory_name, args.name, args.compress,
+ if (args.compress) Future.fork(body.compress()) else Future.value(body))
}
def read_entry(db: SQL.Database, session_name: String, theory_name: String, name: String): Entry =
@@ -75,9 +77,9 @@
if (res.next()) {
val compressed = res.bool(Data.compressed)
val body = res.bytes(Data.body)
- Entry(session_name, theory_name, name, compressed, body)
+ Entry(session_name, theory_name, name, compressed, Future.value(body))
}
- else error(Entry(session_name, theory_name, name, false, Bytes.empty).message("Bad export"))
+ else error(Entry(session_name, theory_name, name, false, Future.value(Bytes.empty)).message("Bad export"))
})
}
@@ -93,10 +95,9 @@
private val export_errors = Synchronized[List[String]](Nil)
private val consumer =
- Consumer_Thread.fork(name = "export")(consume = (future: Future[Entry]) =>
+ Consumer_Thread.fork(name = "export")(consume = (entry: Entry) =>
{
- val entry = future.join
-
+ entry.body.join
db.transaction {
if (read_names(db, entry.session_name, entry.theory_name).contains(entry.name)) {
export_errors.change(errs => entry.message("Duplicate export") :: errs)
@@ -106,14 +107,8 @@
true
})
- def apply(session_name: String, args: Markup.Export.Args, body: Bytes)
- {
- consumer.send(
- if (args.compress)
- Future.fork(make_entry(session_name, args, body))
- else
- Future.value(make_entry(session_name, args, body)))
- }
+ def apply(session_name: String, args: Markup.Export.Args, body: Bytes): Unit =
+ consumer.send(make_entry(session_name, args, body))
def shutdown(close: Boolean = false): List[String] =
{