buffer prover messages to prevent overloading of session_actor input channel -- which is critical due to synchronous messages wrt. GUI thread;
authorwenzelm
Tue, 06 Sep 2011 11:18:19 +0200
changeset 44733 329320fc88df
parent 44732 c58b69d888ac
child 44734 7313e2db3d39
buffer prover messages to prevent overloading of session_actor input channel -- which is critical due to synchronous messages wrt. GUI thread;
src/Pure/System/isabelle_process.scala
src/Pure/System/session.scala
--- a/src/Pure/System/isabelle_process.scala	Tue Sep 06 10:27:04 2011 +0200
+++ b/src/Pure/System/isabelle_process.scala	Tue Sep 06 11:18:19 2011 +0200
@@ -33,7 +33,7 @@
       ('H' : Int) -> Markup.RAW)
   }
 
-  abstract class Message
+  sealed abstract class Message
 
   class Input(name: String, args: List[String]) extends Message
   {
--- a/src/Pure/System/session.scala	Tue Sep 06 10:27:04 2011 +0200
+++ b/src/Pure/System/session.scala	Tue Sep 06 11:18:19 2011 +0200
@@ -7,8 +7,11 @@
 
 package isabelle
 
+
 import java.lang.System
+import java.util.{Timer, TimerTask}
 
+import scala.collection.mutable
 import scala.actors.TIMEOUT
 import scala.actors.Actor._
 
@@ -37,6 +40,7 @@
 {
   /* real time parameters */  // FIXME properties or settings (!?)
 
+  val message_delay = Time.seconds(0.01)  // prover messages
   val input_delay = Time.seconds(0.3)  // user input (e.g. text edits, cursor movement)
   val output_delay = Time.seconds(0.1)  // prover output (markup, common messages)
   val update_delay = Time.seconds(0.5)  // GUI layout updates
@@ -52,7 +56,7 @@
   val assignments = new Event_Bus[Session.Assignment.type]
   val commands_changed = new Event_Bus[Session.Commands_Changed]
   val phase_changed = new Event_Bus[Session.Phase]
-  val raw_messages = new Event_Bus[Isabelle_Process.Message]
+  val raw_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
 
 
 
@@ -141,16 +145,45 @@
     doc_edits: List[Document.Edit_Command],
     previous: Document.Version,
     version: Document.Version)
+  private case class Messages(msgs: List[Isabelle_Process.Message])
 
   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   {
     val this_actor = self
-    def receiver(msg: Isabelle_Process.Message) { this_actor ! msg }
-    var prover: Option[Isabelle_Process with Isar_Document] = None
 
     var prune_next = System.currentTimeMillis() + prune_delay.ms
 
 
+    /* buffered prover messages */
+
+    object receiver
+    {
+      private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
+
+      def flush(): Unit = synchronized {
+        if (!buffer.isEmpty) {
+          val msgs = buffer.toList
+          this_actor ! Messages(msgs)
+          buffer = new mutable.ListBuffer[Isabelle_Process.Message]
+        }
+      }
+      def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
+        buffer += msg
+        msg match {
+          case result: Isabelle_Process.Result if result.is_raw => flush()
+          case _ =>
+        }
+      }
+
+      private val timer = new Timer("session_actor.receiver", true)
+      timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+      def cancel() { timer.cancel() }
+    }
+
+    var prover: Option[Isabelle_Process with Isar_Document] = None
+
+
     /* delayed command changes */
 
     object commands_changed_delay
@@ -372,7 +405,8 @@
         case Start(timeout, args) if prover.isEmpty =>
           if (phase == Session.Inactive || phase == Session.Failed) {
             phase = Session.Startup
-            prover = Some(new Isabelle_Process(timeout, receiver _, args:_*) with Isar_Document)
+            prover =
+              Some(new Isabelle_Process(timeout, receiver.invoke _, args:_*) with Isar_Document)
           }
 
         case Stop =>
@@ -384,6 +418,7 @@
             phase = Session.Inactive
           }
           finished = true
+          receiver.cancel()
           reply(())
 
         case Interrupt if prover.isDefined =>
@@ -409,12 +444,15 @@
         case change: Change_Node if prover.isDefined =>
           handle_change(change)
 
-        case input: Isabelle_Process.Input =>
-          raw_messages.event(input)
+        case Messages(msgs) =>
+          msgs foreach {
+            case input: Isabelle_Process.Input =>
+              raw_messages.event(input)
 
-        case result: Isabelle_Process.Result =>
-          handle_result(result)
-          raw_messages.event(result)
+            case result: Isabelle_Process.Result =>
+              handle_result(result)
+              raw_messages.event(result)
+          }
 
         case bad => System.err.println("session_actor: ignoring bad message " + bad)
       }