# HG changeset patch # User wenzelm # Date 1525707423 -7200 # Node ID c5764b8b2a87ef56d679785dfc46403402fe4efd # Parent 813b5d0904c66ce96cfcb463daa20fc89b85f821 more robust (synchronous) management of Export.Entry: Future.fork happens inside the data structure; tuned; diff -r 813b5d0904c6 -r c5764b8b2a87 src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala Mon May 07 17:20:39 2018 +0200 +++ b/src/Pure/PIDE/session.scala Mon May 07 17:37:03 2018 +0200 @@ -191,7 +191,6 @@ private case object Stop private case class Cancel_Exec(exec_id: Document_ID.Exec) private case class Protocol_Command(name: String, args: List[String]) - private case class Add_Export(args: Markup.Export.Args, bytes: Bytes, output: Prover.Output) private case class Update_Options(options: Options) private case object Consolidate_Execution private case object Prune_History @@ -401,26 +400,23 @@ /* prover output */ - def bad_output(output: Prover.Output) - { - if (verbose) - Output.warning("Ignoring bad prover output: " + output.message.toString) - } - - def change_command(f: Document.State => (Command.State, Document.State), output: Prover.Output) - { - try { - val st = global_state.change_result(f) - change_buffer.invoke(false, List(st.command)) - } - catch { case _: Document.State.Fail => bad_output(output) } - } - def handle_output(output: Prover.Output) //{{{ { - def accumulate(state_id: Document_ID.Generic, message: XML.Elem): Unit = - change_command(_.accumulate(state_id, message, xml_cache), output) + def bad_output() + { + if (verbose) + Output.warning("Ignoring bad prover output: " + output.message.toString) + } + + def change_command(f: Document.State => (Command.State, Document.State)) + { + try { + val st = global_state.change_result(f) + change_buffer.invoke(false, List(st.command)) + } + catch { case _: Document.State.Fail => bad_output() } + } output match { case msg: Prover.Protocol_Output => @@ -432,17 +428,16 @@ case Protocol.Command_Timing(state_id, timing) if prover.defined => val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) - accumulate(state_id, xml_cache.elem(message)) + change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache)) case Protocol.Theory_Timing(_, _) => // FIXME case Markup.Export(args) if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined => - if (args.compress) { - Future.fork { manager.send(Add_Export(args, msg.bytes.compress(), output)) } - } - else manager.send(Add_Export(args, msg.bytes, output)) + val id = Value.Long.unapply(args.id.get).get + val entry = (args.serial, Export.make_entry("", args, msg.bytes)) + change_command(_.add_export(id, entry)) case Markup.Assign_Update => msg.text match { @@ -452,8 +447,8 @@ change_buffer.invoke(true, cmds) manager.send(Session.Change_Flush) } - catch { case _: Document.State.Fail => bad_output(output) } - case _ => bad_output(output) + catch { case _: Document.State.Fail => bad_output() } + case _ => bad_output() } delay_prune.invoke() @@ -464,8 +459,8 @@ global_state.change(_.removed_versions(removed)) manager.send(Session.Change_Flush) } - catch { case _: Document.State.Fail => bad_output(output) } - case _ => bad_output(output) + catch { case _: Document.State.Fail => bad_output() } + case _ => bad_output() } case Markup.ML_Statistics(props) => @@ -474,13 +469,13 @@ case Markup.Task_Statistics(props) => // FIXME - case _ => bad_output(output) + case _ => bad_output() } } case _ => output.properties match { case Position.Id(state_id) => - accumulate(state_id, output.message) + change_command(_.accumulate(state_id, output.message, xml_cache)) case _ if output.is_init => prover.get.options(session_options) @@ -562,11 +557,6 @@ case Protocol_Command(name, args) if prover.defined => prover.get.protocol_command(name, args:_*) - case Add_Export(args, bytes, output) => - val id = Value.Long.parse(args.id.get) - val entry = (args.serial, Export.make_entry("", args, bytes)) - change_command(_.add_export(id, entry), output) - case change: Session.Change if prover.defined => val state = global_state.value if (!state.removing_versions && state.is_assigned(change.previous)) diff -r 813b5d0904c6 -r c5764b8b2a87 src/Pure/Thy/export.scala --- a/src/Pure/Thy/export.scala Mon May 07 17:20:39 2018 +0200 +++ b/src/Pure/Thy/export.scala Mon May 07 17:37:03 2018 +0200 @@ -34,25 +34,27 @@ } sealed case class Entry( - session_name: String, theory_name: String, name: String, compressed: Boolean, body: Bytes) + session_name: String, + theory_name: String, + name: String, + compressed: Boolean, + body: Future[Bytes]) { override def toString: String = theory_name + ":" + name def message(msg: String): String = msg + " " + quote(name) + " for theory " + quote(theory_name) - lazy val compressed_body: Bytes = if (compressed) body else body.compress() - lazy val uncompressed_body: Bytes = if (compressed) body.uncompress() else body - def write(db: SQL.Database) { + val bytes = body.join db.using_statement(Data.table.insert())(stmt => { stmt.string(1) = session_name stmt.string(2) = theory_name stmt.string(3) = name stmt.bool(4) = compressed - stmt.bytes(5) = body + stmt.bytes(5) = bytes stmt.execute() }) } @@ -60,8 +62,8 @@ def make_entry(session_name: String, args: Markup.Export.Args, body: Bytes): Entry = { - val bytes = if (args.compress) body.compress() else body - Entry(session_name, args.theory_name, args.name, args.compress, bytes) + Entry(session_name, args.theory_name, args.name, args.compress, + if (args.compress) Future.fork(body.compress()) else Future.value(body)) } def read_entry(db: SQL.Database, session_name: String, theory_name: String, name: String): Entry = @@ -75,9 +77,9 @@ if (res.next()) { val compressed = res.bool(Data.compressed) val body = res.bytes(Data.body) - Entry(session_name, theory_name, name, compressed, body) + Entry(session_name, theory_name, name, compressed, Future.value(body)) } - else error(Entry(session_name, theory_name, name, false, Bytes.empty).message("Bad export")) + else error(Entry(session_name, theory_name, name, false, Future.value(Bytes.empty)).message("Bad export")) }) } @@ -93,10 +95,9 @@ private val export_errors = Synchronized[List[String]](Nil) private val consumer = - Consumer_Thread.fork(name = "export")(consume = (future: Future[Entry]) => + Consumer_Thread.fork(name = "export")(consume = (entry: Entry) => { - val entry = future.join - + entry.body.join db.transaction { if (read_names(db, entry.session_name, entry.theory_name).contains(entry.name)) { export_errors.change(errs => entry.message("Duplicate export") :: errs) @@ -106,14 +107,8 @@ true }) - def apply(session_name: String, args: Markup.Export.Args, body: Bytes) - { - consumer.send( - if (args.compress) - Future.fork(make_entry(session_name, args, body)) - else - Future.value(make_entry(session_name, args, body))) - } + def apply(session_name: String, args: Markup.Export.Args, body: Bytes): Unit = + consumer.send(make_entry(session_name, args, body)) def shutdown(close: Boolean = false): List[String] = {