# HG changeset patch # User wenzelm # Date 1398426950 -7200 # Node ID 80eb2192516a74c72d8552c687693180d4c0ba8f # Parent 096139bcfaddc60bfdc06cc3b5a7fc3efddceaa4 simplified change_buffer (again, see 937826d702d5): no thread, just timer, rely on asynchronous commands_changed.post; diff -r 096139bcfadd -r 80eb2192516a src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala Fri Apr 25 13:29:56 2014 +0200 +++ b/src/Pure/PIDE/session.scala Fri Apr 25 13:55:50 2014 +0200 @@ -182,46 +182,6 @@ - /** buffered changes: to be dispatched to clients **/ - - private case class Received_Change(assignment: Boolean, commands: List[Command]) - - private val change_buffer: Consumer_Thread[Received_Change] = - { - object changed - { - private var assignment: Boolean = false - private var nodes: Set[Document.Node.Name] = Set.empty - private var commands: Set[Command] = Set.empty - - def flush(): Unit = synchronized { - if (assignment || !nodes.isEmpty || !commands.isEmpty) - commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) - assignment = false - nodes = Set.empty - commands = Set.empty - } - - def invoke(change: Received_Change): Unit = synchronized { - assignment |= change.assignment - for (command <- change.commands) { - nodes += command.node_name - commands += command - } - } - } - - val timer = new Timer("change_buffer", true) - timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms) - - Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)( - consume = { case change => changed.invoke(change); true }, - finish = () => { timer.cancel(); changed.flush() } - ) - } - - - /** pipelined change parsing **/ private case class Text_Edits( @@ -305,6 +265,41 @@ private case class Update_Options(options: Options) + /* buffered changes */ + + private object change_buffer + { + private var assignment: Boolean = false + private var nodes: Set[Document.Node.Name] = Set.empty + private var commands: Set[Command] = Set.empty + + def flush(): Unit = synchronized { + if (assignment || !nodes.isEmpty || !commands.isEmpty) + commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) + assignment = false + nodes = Set.empty + commands = Set.empty + } + + def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized { + assignment |= assign + for (command <- cmds) { + nodes += command.node_name + commands += command + } + } + + private val timer = new Timer("change_buffer", true) + timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms) + + def shutdown() + { + timer.cancel() + flush() + } + } + + /* buffered prover messages */ private object receiver @@ -443,7 +438,7 @@ { try { val st = global_state.change_result(_.accumulate(state_id, message)) - change_buffer.send(Received_Change(false, List(st.command))) + change_buffer.invoke(false, List(st.command)) } catch { case _: Document.State.Fail => bad_output() @@ -467,7 +462,7 @@ case Protocol.Assign_Update(id, update) => try { val cmds = global_state.change_result(_.assign(id, update)) - change_buffer.send(Received_Change(true, cmds)) + change_buffer.invoke(true, cmds) } catch { case _: Document.State.Fail => bad_output() } postponed_changes.flush()