simplified commands_changed_buffer (in contrast to a8331fb5c959): rely on better performance of Consumer_Thread/Mailbox and more direct Timer (like session_actor.receiver);
authorwenzelm
Thu Apr 24 18:04:18 2014 +0200 (2014-04-24)
changeset 56705937826d702d5
parent 56704 c2f0ddd14747
child 56706 f2f53f7046f4
simplified commands_changed_buffer (in contrast to a8331fb5c959): rely on better performance of Consumer_Thread/Mailbox and more direct Timer (like session_actor.receiver);
src/Pure/PIDE/session.scala
     1.1 --- a/src/Pure/PIDE/session.scala	Thu Apr 24 16:52:17 2014 +0200
     1.2 +++ b/src/Pure/PIDE/session.scala	Thu Apr 24 18:04:18 2014 +0200
     1.3 @@ -152,13 +152,42 @@
     1.4  
     1.5    /** buffered command changes (delay_first discipline) **/
     1.6  
     1.7 -  private val commands_changed_buffer =
     1.8 -    Consumer_Thread.fork[Session.Commands_Changed]("commands_changed_buffer", daemon = true)
     1.9 +  private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
    1.10    {
    1.11 -    case changed => commands_changed.event(changed); true
    1.12 +    object changed
    1.13 +    {
    1.14 +      private var assignment: Boolean = false
    1.15 +      private var nodes: Set[Document.Node.Name] = Set.empty
    1.16 +      private var commands: Set[Command] = Set.empty
    1.17 +
    1.18 +      def flush(): Unit = synchronized {
    1.19 +        if (assignment || !nodes.isEmpty || !commands.isEmpty)
    1.20 +          commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
    1.21 +        assignment = false
    1.22 +        nodes = Set.empty
    1.23 +        commands = Set.empty
    1.24 +      }
    1.25 +
    1.26 +      def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
    1.27 +        assignment |= assign
    1.28 +        for (cmd <- cmds) {
    1.29 +          nodes += cmd.node_name
    1.30 +          commands += cmd
    1.31 +        }
    1.32 +      }
    1.33 +    }
    1.34 +
    1.35 +    val timer = new Timer("commands_changed_buffer", true)
    1.36 +    timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
    1.37 +
    1.38 +    Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
    1.39 +      consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
    1.40 +      finish = () => { timer.cancel(); changed.flush() }
    1.41 +    )
    1.42    }
    1.43  
    1.44  
    1.45 +
    1.46    /** pipelined change parsing **/
    1.47  
    1.48    private case class Text_Edits(
    1.49 @@ -261,6 +290,7 @@
    1.50            buffer = new mutable.ListBuffer[Prover.Message]
    1.51          }
    1.52        }
    1.53 +
    1.54        def invoke(msg: Prover.Message): Unit = synchronized {
    1.55          msg match {
    1.56            case _: Prover.Input =>
    1.57 @@ -287,49 +317,6 @@
    1.58      var prover: Option[Prover] = None
    1.59  
    1.60  
    1.61 -    /* delayed command changes */
    1.62 -
    1.63 -    object delay_commands_changed
    1.64 -    {
    1.65 -      private var changed_assignment: Boolean = false
    1.66 -      private var changed_nodes: Set[Document.Node.Name] = Set.empty
    1.67 -      private var changed_commands: Set[Command] = Set.empty
    1.68 -
    1.69 -      private var flush_time: Option[Time] = None
    1.70 -
    1.71 -      def flush_timeout: Time =
    1.72 -        flush_time match {
    1.73 -          case None => Time.seconds(5.0)
    1.74 -          case Some(time) => (time - Time.now()) max Time.zero
    1.75 -        }
    1.76 -
    1.77 -      def flush()
    1.78 -      {
    1.79 -        if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
    1.80 -          commands_changed_buffer.send(
    1.81 -            Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands))
    1.82 -        changed_assignment = false
    1.83 -        changed_nodes = Set.empty
    1.84 -        changed_commands = Set.empty
    1.85 -        flush_time = None
    1.86 -      }
    1.87 -
    1.88 -      def invoke(assign: Boolean, commands: List[Command])
    1.89 -      {
    1.90 -        changed_assignment |= assign
    1.91 -        for (command <- commands) {
    1.92 -          changed_nodes += command.node_name
    1.93 -          changed_commands += command
    1.94 -        }
    1.95 -        val now = Time.now()
    1.96 -        flush_time match {
    1.97 -          case None => flush_time = Some(now + output_delay)
    1.98 -          case Some(time) => if (now >= time) flush()
    1.99 -        }
   1.100 -      }
   1.101 -    }
   1.102 -
   1.103 -
   1.104      /* raw edits */
   1.105  
   1.106      def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   1.107 @@ -400,7 +387,7 @@
   1.108        {
   1.109          try {
   1.110            val st = global_state.change_result(_.accumulate(state_id, message))
   1.111 -          delay_commands_changed.invoke(false, List(st.command))
   1.112 +          commands_changed_buffer.send((false, List(st.command)))
   1.113          }
   1.114          catch {
   1.115            case _: Document.State.Fail => bad_output()
   1.116 @@ -424,7 +411,7 @@
   1.117                    case Protocol.Assign_Update(id, update) =>
   1.118                      try {
   1.119                        val cmds = global_state.change_result(_.assign(id, update))
   1.120 -                      delay_commands_changed.invoke(true, cmds)
   1.121 +                      commands_changed_buffer.send((true, cmds))
   1.122                      }
   1.123                      catch { case _: Document.State.Fail => bad_output() }
   1.124                    case _ => bad_output()
   1.125 @@ -479,9 +466,7 @@
   1.126      //{{{
   1.127      var finished = false
   1.128      while (!finished) {
   1.129 -      receiveWithin(delay_commands_changed.flush_timeout.ms) {
   1.130 -        case TIMEOUT => delay_commands_changed.flush()
   1.131 -
   1.132 +      receive {
   1.133          case Start(name, args) if prover.isEmpty =>
   1.134            if (phase == Session.Inactive || phase == Session.Failed) {
   1.135              phase = Session.Startup