simplified change_buffer (again, see 937826d702d5): no thread, just timer, rely on asynchronous commands_changed.post;
authorwenzelm
Fri, 25 Apr 2014 13:55:50 +0200
changeset 56719 80eb2192516a
parent 56718 096139bcfadd
child 56720 e1317a26f8c0
simplified change_buffer (again, see 937826d702d5): no thread, just timer, rely on asynchronous commands_changed.post;
src/Pure/PIDE/session.scala
--- a/src/Pure/PIDE/session.scala	Fri Apr 25 13:29:56 2014 +0200
+++ b/src/Pure/PIDE/session.scala	Fri Apr 25 13:55:50 2014 +0200
@@ -182,46 +182,6 @@
 
 
 
-  /** buffered changes: to be dispatched to clients **/
-
-  private case class Received_Change(assignment: Boolean, commands: List[Command])
-
-  private val change_buffer: Consumer_Thread[Received_Change] =
-  {
-    object changed
-    {
-      private var assignment: Boolean = false
-      private var nodes: Set[Document.Node.Name] = Set.empty
-      private var commands: Set[Command] = Set.empty
-
-      def flush(): Unit = synchronized {
-        if (assignment || !nodes.isEmpty || !commands.isEmpty)
-          commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
-        assignment = false
-        nodes = Set.empty
-        commands = Set.empty
-      }
-
-      def invoke(change: Received_Change): Unit = synchronized {
-        assignment |= change.assignment
-        for (command <- change.commands) {
-          nodes += command.node_name
-          commands += command
-        }
-      }
-    }
-
-    val timer = new Timer("change_buffer", true)
-    timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
-
-    Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
-      consume = { case change => changed.invoke(change); true },
-      finish = () => { timer.cancel(); changed.flush() }
-    )
-  }
-
-
-
   /** pipelined change parsing **/
 
   private case class Text_Edits(
@@ -305,6 +265,41 @@
   private case class Update_Options(options: Options)
 
 
+  /* buffered changes */
+
+  private object change_buffer
+  {
+    private var assignment: Boolean = false
+    private var nodes: Set[Document.Node.Name] = Set.empty
+    private var commands: Set[Command] = Set.empty
+
+    def flush(): Unit = synchronized {
+      if (assignment || !nodes.isEmpty || !commands.isEmpty)
+        commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
+      assignment = false
+      nodes = Set.empty
+      commands = Set.empty
+    }
+
+    def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
+      assignment |= assign
+      for (command <- cmds) {
+        nodes += command.node_name
+        commands += command
+      }
+    }
+
+    private val timer = new Timer("change_buffer", true)
+    timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms)
+
+    def shutdown()
+    {
+      timer.cancel()
+      flush()
+    }
+  }
+
+
   /* buffered prover messages */
 
   private object receiver
@@ -443,7 +438,7 @@
       {
         try {
           val st = global_state.change_result(_.accumulate(state_id, message))
-          change_buffer.send(Received_Change(false, List(st.command)))
+          change_buffer.invoke(false, List(st.command))
         }
         catch {
           case _: Document.State.Fail => bad_output()
@@ -467,7 +462,7 @@
                   case Protocol.Assign_Update(id, update) =>
                     try {
                       val cmds = global_state.change_result(_.assign(id, update))
-                      change_buffer.send(Received_Change(true, cmds))
+                      change_buffer.invoke(true, cmds)
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                     postponed_changes.flush()