simplified commands_changed_buffer (in contrast to a8331fb5c959): rely on better performance of Consumer_Thread/Mailbox and more direct Timer (like session_actor.receiver);
--- a/src/Pure/PIDE/session.scala Thu Apr 24 16:52:17 2014 +0200
+++ b/src/Pure/PIDE/session.scala Thu Apr 24 18:04:18 2014 +0200
@@ -152,13 +152,42 @@
/** buffered command changes (delay_first discipline) **/
- private val commands_changed_buffer =
- Consumer_Thread.fork[Session.Commands_Changed]("commands_changed_buffer", daemon = true)
+ private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
{
- case changed => commands_changed.event(changed); true
+ object changed
+ {
+ private var assignment: Boolean = false
+ private var nodes: Set[Document.Node.Name] = Set.empty
+ private var commands: Set[Command] = Set.empty
+
+ def flush(): Unit = synchronized {
+ if (assignment || !nodes.isEmpty || !commands.isEmpty)
+ commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
+ assignment = false
+ nodes = Set.empty
+ commands = Set.empty
+ }
+
+ def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
+ assignment |= assign
+ for (cmd <- cmds) {
+ nodes += cmd.node_name
+ commands += cmd
+ }
+ }
+ }
+
+ val timer = new Timer("commands_changed_buffer", true)
+ timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
+
+ Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
+ consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
+ finish = () => { timer.cancel(); changed.flush() }
+ )
}
+
/** pipelined change parsing **/
private case class Text_Edits(
@@ -261,6 +290,7 @@
buffer = new mutable.ListBuffer[Prover.Message]
}
}
+
def invoke(msg: Prover.Message): Unit = synchronized {
msg match {
case _: Prover.Input =>
@@ -287,49 +317,6 @@
var prover: Option[Prover] = None
- /* delayed command changes */
-
- object delay_commands_changed
- {
- private var changed_assignment: Boolean = false
- private var changed_nodes: Set[Document.Node.Name] = Set.empty
- private var changed_commands: Set[Command] = Set.empty
-
- private var flush_time: Option[Time] = None
-
- def flush_timeout: Time =
- flush_time match {
- case None => Time.seconds(5.0)
- case Some(time) => (time - Time.now()) max Time.zero
- }
-
- def flush()
- {
- if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
- commands_changed_buffer.send(
- Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands))
- changed_assignment = false
- changed_nodes = Set.empty
- changed_commands = Set.empty
- flush_time = None
- }
-
- def invoke(assign: Boolean, commands: List[Command])
- {
- changed_assignment |= assign
- for (command <- commands) {
- changed_nodes += command.node_name
- changed_commands += command
- }
- val now = Time.now()
- flush_time match {
- case None => flush_time = Some(now + output_delay)
- case Some(time) => if (now >= time) flush()
- }
- }
- }
-
-
/* raw edits */
def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
@@ -400,7 +387,7 @@
{
try {
val st = global_state.change_result(_.accumulate(state_id, message))
- delay_commands_changed.invoke(false, List(st.command))
+ commands_changed_buffer.send((false, List(st.command)))
}
catch {
case _: Document.State.Fail => bad_output()
@@ -424,7 +411,7 @@
case Protocol.Assign_Update(id, update) =>
try {
val cmds = global_state.change_result(_.assign(id, update))
- delay_commands_changed.invoke(true, cmds)
+ commands_changed_buffer.send((true, cmds))
}
catch { case _: Document.State.Fail => bad_output() }
case _ => bad_output()
@@ -479,9 +466,7 @@
//{{{
var finished = false
while (!finished) {
- receiveWithin(delay_commands_changed.flush_timeout.ms) {
- case TIMEOUT => delay_commands_changed.flush()
-
+ receive {
case Start(name, args) if prover.isEmpty =>
if (phase == Session.Inactive || phase == Session.Failed) {
phase = Session.Startup