more robust (synchronous) management of Export.Entry: Future.fork happens inside the data structure;
authorwenzelm
Mon, 07 May 2018 17:37:03 +0200
changeset 68103 c5764b8b2a87
parent 68102 813b5d0904c6
child 68104 3795f67716e6
more robust (synchronous) management of Export.Entry: Future.fork happens inside the data structure; tuned;
src/Pure/PIDE/session.scala
src/Pure/Thy/export.scala
--- a/src/Pure/PIDE/session.scala	Mon May 07 17:20:39 2018 +0200
+++ b/src/Pure/PIDE/session.scala	Mon May 07 17:37:03 2018 +0200
@@ -191,7 +191,6 @@
   private case object Stop
   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   private case class Protocol_Command(name: String, args: List[String])
-  private case class Add_Export(args: Markup.Export.Args, bytes: Bytes, output: Prover.Output)
   private case class Update_Options(options: Options)
   private case object Consolidate_Execution
   private case object Prune_History
@@ -401,26 +400,23 @@
 
     /* prover output */
 
-    def bad_output(output: Prover.Output)
-    {
-      if (verbose)
-        Output.warning("Ignoring bad prover output: " + output.message.toString)
-    }
-
-    def change_command(f: Document.State => (Command.State, Document.State), output: Prover.Output)
-    {
-      try {
-        val st = global_state.change_result(f)
-        change_buffer.invoke(false, List(st.command))
-      }
-      catch { case _: Document.State.Fail => bad_output(output) }
-    }
-
     def handle_output(output: Prover.Output)
     //{{{
     {
-      def accumulate(state_id: Document_ID.Generic, message: XML.Elem): Unit =
-        change_command(_.accumulate(state_id, message, xml_cache), output)
+      def bad_output()
+      {
+        if (verbose)
+          Output.warning("Ignoring bad prover output: " + output.message.toString)
+      }
+
+      def change_command(f: Document.State => (Command.State, Document.State))
+      {
+        try {
+          val st = global_state.change_result(f)
+          change_buffer.invoke(false, List(st.command))
+        }
+        catch { case _: Document.State.Fail => bad_output() }
+      }
 
       output match {
         case msg: Prover.Protocol_Output =>
@@ -432,17 +428,16 @@
 
               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
-                accumulate(state_id, xml_cache.elem(message))
+                change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))
 
               case Protocol.Theory_Timing(_, _) =>
                 // FIXME
 
               case Markup.Export(args)
               if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
-                if (args.compress) {
-                  Future.fork { manager.send(Add_Export(args, msg.bytes.compress(), output)) }
-                }
-                else manager.send(Add_Export(args, msg.bytes, output))
+                val id = Value.Long.unapply(args.id.get).get
+                val entry = (args.serial, Export.make_entry("", args, msg.bytes))
+                change_command(_.add_export(id, entry))
 
               case Markup.Assign_Update =>
                 msg.text match {
@@ -452,8 +447,8 @@
                       change_buffer.invoke(true, cmds)
                       manager.send(Session.Change_Flush)
                     }
-                    catch { case _: Document.State.Fail => bad_output(output) }
-                  case _ => bad_output(output)
+                    catch { case _: Document.State.Fail => bad_output() }
+                  case _ => bad_output()
                 }
                 delay_prune.invoke()
 
@@ -464,8 +459,8 @@
                       global_state.change(_.removed_versions(removed))
                       manager.send(Session.Change_Flush)
                     }
-                    catch { case _: Document.State.Fail => bad_output(output) }
-                  case _ => bad_output(output)
+                    catch { case _: Document.State.Fail => bad_output() }
+                  case _ => bad_output()
                 }
 
               case Markup.ML_Statistics(props) =>
@@ -474,13 +469,13 @@
               case Markup.Task_Statistics(props) =>
                 // FIXME
 
-              case _ => bad_output(output)
+              case _ => bad_output()
             }
           }
         case _ =>
           output.properties match {
             case Position.Id(state_id) =>
-              accumulate(state_id, output.message)
+              change_command(_.accumulate(state_id, output.message, xml_cache))
 
             case _ if output.is_init =>
               prover.get.options(session_options)
@@ -562,11 +557,6 @@
           case Protocol_Command(name, args) if prover.defined =>
             prover.get.protocol_command(name, args:_*)
 
-          case Add_Export(args, bytes, output) =>
-            val id = Value.Long.parse(args.id.get)
-            val entry = (args.serial, Export.make_entry("", args, bytes))
-            change_command(_.add_export(id, entry), output)
-
           case change: Session.Change if prover.defined =>
             val state = global_state.value
             if (!state.removing_versions && state.is_assigned(change.previous))
--- a/src/Pure/Thy/export.scala	Mon May 07 17:20:39 2018 +0200
+++ b/src/Pure/Thy/export.scala	Mon May 07 17:37:03 2018 +0200
@@ -34,25 +34,27 @@
   }
 
   sealed case class Entry(
-    session_name: String, theory_name: String, name: String, compressed: Boolean, body: Bytes)
+    session_name: String,
+    theory_name: String,
+    name: String,
+    compressed: Boolean,
+    body: Future[Bytes])
   {
     override def toString: String = theory_name + ":" + name
 
     def message(msg: String): String =
       msg + " " + quote(name) + " for theory " + quote(theory_name)
 
-    lazy val compressed_body: Bytes = if (compressed) body else body.compress()
-    lazy val uncompressed_body: Bytes = if (compressed) body.uncompress() else body
-
     def write(db: SQL.Database)
     {
+      val bytes = body.join
       db.using_statement(Data.table.insert())(stmt =>
       {
         stmt.string(1) = session_name
         stmt.string(2) = theory_name
         stmt.string(3) = name
         stmt.bool(4) = compressed
-        stmt.bytes(5) = body
+        stmt.bytes(5) = bytes
         stmt.execute()
       })
     }
@@ -60,8 +62,8 @@
 
   def make_entry(session_name: String, args: Markup.Export.Args, body: Bytes): Entry =
   {
-    val bytes = if (args.compress) body.compress() else body
-    Entry(session_name, args.theory_name, args.name, args.compress, bytes)
+    Entry(session_name, args.theory_name, args.name, args.compress,
+      if (args.compress) Future.fork(body.compress()) else Future.value(body))
   }
 
   def read_entry(db: SQL.Database, session_name: String, theory_name: String, name: String): Entry =
@@ -75,9 +77,9 @@
       if (res.next()) {
         val compressed = res.bool(Data.compressed)
         val body = res.bytes(Data.body)
-        Entry(session_name, theory_name, name, compressed, body)
+        Entry(session_name, theory_name, name, compressed, Future.value(body))
       }
-      else error(Entry(session_name, theory_name, name, false, Bytes.empty).message("Bad export"))
+      else error(Entry(session_name, theory_name, name, false, Future.value(Bytes.empty)).message("Bad export"))
     })
   }
 
@@ -93,10 +95,9 @@
     private val export_errors = Synchronized[List[String]](Nil)
 
     private val consumer =
-      Consumer_Thread.fork(name = "export")(consume = (future: Future[Entry]) =>
+      Consumer_Thread.fork(name = "export")(consume = (entry: Entry) =>
         {
-          val entry = future.join
-
+          entry.body.join
           db.transaction {
             if (read_names(db, entry.session_name, entry.theory_name).contains(entry.name)) {
               export_errors.change(errs => entry.message("Duplicate export") :: errs)
@@ -106,14 +107,8 @@
           true
         })
 
-    def apply(session_name: String, args: Markup.Export.Args, body: Bytes)
-    {
-      consumer.send(
-        if (args.compress)
-          Future.fork(make_entry(session_name, args, body))
-        else
-          Future.value(make_entry(session_name, args, body)))
-    }
+    def apply(session_name: String, args: Markup.Export.Args, body: Bytes): Unit =
+      consumer.send(make_entry(session_name, args, body))
 
     def shutdown(close: Boolean = false): List[String] =
     {