simplified commands_changed_buffer (in contrast to a8331fb5c959): rely on better performance of Consumer_Thread/Mailbox and more direct Timer (like session_actor.receiver);
authorwenzelm
Thu, 24 Apr 2014 18:04:18 +0200
changeset 56705 937826d702d5
parent 56704 c2f0ddd14747
child 56706 f2f53f7046f4
simplified commands_changed_buffer (in contrast to a8331fb5c959): rely on better performance of Consumer_Thread/Mailbox and more direct Timer (like session_actor.receiver);
src/Pure/PIDE/session.scala
--- a/src/Pure/PIDE/session.scala	Thu Apr 24 16:52:17 2014 +0200
+++ b/src/Pure/PIDE/session.scala	Thu Apr 24 18:04:18 2014 +0200
@@ -152,13 +152,42 @@
 
   /** buffered command changes (delay_first discipline) **/
 
-  private val commands_changed_buffer =
-    Consumer_Thread.fork[Session.Commands_Changed]("commands_changed_buffer", daemon = true)
+  private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
   {
-    case changed => commands_changed.event(changed); true
+    object changed
+    {
+      private var assignment: Boolean = false
+      private var nodes: Set[Document.Node.Name] = Set.empty
+      private var commands: Set[Command] = Set.empty
+
+      def flush(): Unit = synchronized {
+        if (assignment || !nodes.isEmpty || !commands.isEmpty)
+          commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
+        assignment = false
+        nodes = Set.empty
+        commands = Set.empty
+      }
+
+      def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
+        assignment |= assign
+        for (cmd <- cmds) {
+          nodes += cmd.node_name
+          commands += cmd
+        }
+      }
+    }
+
+    val timer = new Timer("commands_changed_buffer", true)
+    timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
+
+    Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
+      consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
+      finish = () => { timer.cancel(); changed.flush() }
+    )
   }
 
 
+
   /** pipelined change parsing **/
 
   private case class Text_Edits(
@@ -261,6 +290,7 @@
           buffer = new mutable.ListBuffer[Prover.Message]
         }
       }
+
       def invoke(msg: Prover.Message): Unit = synchronized {
         msg match {
           case _: Prover.Input =>
@@ -287,49 +317,6 @@
     var prover: Option[Prover] = None
 
 
-    /* delayed command changes */
-
-    object delay_commands_changed
-    {
-      private var changed_assignment: Boolean = false
-      private var changed_nodes: Set[Document.Node.Name] = Set.empty
-      private var changed_commands: Set[Command] = Set.empty
-
-      private var flush_time: Option[Time] = None
-
-      def flush_timeout: Time =
-        flush_time match {
-          case None => Time.seconds(5.0)
-          case Some(time) => (time - Time.now()) max Time.zero
-        }
-
-      def flush()
-      {
-        if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
-          commands_changed_buffer.send(
-            Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands))
-        changed_assignment = false
-        changed_nodes = Set.empty
-        changed_commands = Set.empty
-        flush_time = None
-      }
-
-      def invoke(assign: Boolean, commands: List[Command])
-      {
-        changed_assignment |= assign
-        for (command <- commands) {
-          changed_nodes += command.node_name
-          changed_commands += command
-        }
-        val now = Time.now()
-        flush_time match {
-          case None => flush_time = Some(now + output_delay)
-          case Some(time) => if (now >= time) flush()
-        }
-      }
-    }
-
-
     /* raw edits */
 
     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
@@ -400,7 +387,7 @@
       {
         try {
           val st = global_state.change_result(_.accumulate(state_id, message))
-          delay_commands_changed.invoke(false, List(st.command))
+          commands_changed_buffer.send((false, List(st.command)))
         }
         catch {
           case _: Document.State.Fail => bad_output()
@@ -424,7 +411,7 @@
                   case Protocol.Assign_Update(id, update) =>
                     try {
                       val cmds = global_state.change_result(_.assign(id, update))
-                      delay_commands_changed.invoke(true, cmds)
+                      commands_changed_buffer.send((true, cmds))
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
@@ -479,9 +466,7 @@
     //{{{
     var finished = false
     while (!finished) {
-      receiveWithin(delay_commands_changed.flush_timeout.ms) {
-        case TIMEOUT => delay_commands_changed.flush()
-
+      receive {
         case Start(name, args) if prover.isEmpty =>
           if (phase == Session.Inactive || phase == Session.Failed) {
             phase = Session.Startup