converted main session manager to Consumer_Thread: messages need to be consumed immediately, postponed_changes replaces implicit actor mailbox scanning;
authorwenzelm
Thu Apr 24 22:10:00 2014 +0200 (2014-04-24)
changeset 56706f2f53f7046f4
parent 56705 937826d702d5
child 56707 aa4631879df8
converted main session manager to Consumer_Thread: messages need to be consumed immediately, postponed_changes replaces implicit actor mailbox scanning;
src/Pure/PIDE/session.scala
     1.1 --- a/src/Pure/PIDE/session.scala	Thu Apr 24 18:04:18 2014 +0200
     1.2 +++ b/src/Pure/PIDE/session.scala	Thu Apr 24 22:10:00 2014 +0200
     1.3 @@ -12,8 +12,6 @@
     1.4  
     1.5  import scala.collection.mutable
     1.6  import scala.collection.immutable.Queue
     1.7 -import scala.actors.{Actor, TIMEOUT}
     1.8 -import scala.actors.Actor._
     1.9  
    1.10  
    1.11  object Session
    1.12 @@ -150,9 +148,12 @@
    1.13    val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
    1.14  
    1.15  
    1.16 -  /** buffered command changes (delay_first discipline) **/
    1.17 +
    1.18 +  /** buffered changes: to be dispatched to clients **/
    1.19  
    1.20 -  private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
    1.21 +  private case class Received_Change(assignment: Boolean, commands: List[Command])
    1.22 +
    1.23 +  private val change_buffer: Consumer_Thread[Received_Change] =
    1.24    {
    1.25      object changed
    1.26      {
    1.27 @@ -168,20 +169,20 @@
    1.28          commands = Set.empty
    1.29        }
    1.30  
    1.31 -      def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
    1.32 -        assignment |= assign
    1.33 -        for (cmd <- cmds) {
    1.34 -          nodes += cmd.node_name
    1.35 -          commands += cmd
    1.36 +      def invoke(change: Received_Change): Unit = synchronized {
    1.37 +        assignment |= change.assignment
    1.38 +        for (command <- change.commands) {
    1.39 +          nodes += command.node_name
    1.40 +          commands += command
    1.41          }
    1.42        }
    1.43      }
    1.44  
    1.45 -    val timer = new Timer("commands_changed_buffer", true)
    1.46 +    val timer = new Timer("change_buffer", true)
    1.47      timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
    1.48  
    1.49 -    Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
    1.50 -      consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
    1.51 +    Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
    1.52 +      consume = { case change => changed.invoke(change); true },
    1.53        finish = () => { timer.cancel(); changed.flush() }
    1.54      )
    1.55    }
    1.56 @@ -205,13 +206,13 @@
    1.57            resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
    1.58          }
    1.59        version_result.fulfill(change.version)
    1.60 -      session_actor ! change
    1.61 +      manager.send(change)
    1.62        true
    1.63    }
    1.64  
    1.65  
    1.66  
    1.67 -  /** main protocol actor **/
    1.68 +  /** main protocol manager **/
    1.69  
    1.70    /* global state */
    1.71  
    1.72 @@ -261,58 +262,75 @@
    1.73    }
    1.74  
    1.75  
    1.76 -  /* actor messages */
    1.77 +  /* internal messages */
    1.78  
    1.79 -  private case object Stop
    1.80    private case class Start(name: String, args: List[String])
    1.81    private case class Cancel_Exec(exec_id: Document_ID.Exec)
    1.82    private case class Protocol_Command(name: String, args: List[String])
    1.83    private case class Messages(msgs: List[Prover.Message])
    1.84    private case class Update_Options(options: Options)
    1.85  
    1.86 -  private val session_actor: Actor = Simple_Thread.actor("session_actor", daemon = true)
    1.87 +
    1.88 +  /* buffered prover messages */
    1.89 +
    1.90 +  private object receiver
    1.91    {
    1.92 -    val this_actor = self
    1.93 +    private var buffer = new mutable.ListBuffer[Prover.Message]
    1.94 +
    1.95 +    private def flush(): Unit = synchronized {
    1.96 +      if (!buffer.isEmpty) {
    1.97 +        val msgs = buffer.toList
    1.98 +        manager.send(Messages(msgs))
    1.99 +        buffer = new mutable.ListBuffer[Prover.Message]
   1.100 +      }
   1.101 +    }
   1.102  
   1.103 -    var prune_next = Time.now() + prune_delay
   1.104 +    def invoke(msg: Prover.Message): Unit = synchronized {
   1.105 +      msg match {
   1.106 +        case _: Prover.Input =>
   1.107 +          buffer += msg
   1.108 +        case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
   1.109 +          flush()
   1.110 +        case output: Prover.Output =>
   1.111 +          buffer += msg
   1.112 +          if (output.is_syslog)
   1.113 +            syslog.change(queue =>
   1.114 +              {
   1.115 +                val queue1 = queue.enqueue(output.message)
   1.116 +                if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   1.117 +              })
   1.118 +      }
   1.119 +    }
   1.120 +
   1.121 +    private val timer = new Timer("receiver", true)
   1.122 +    timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   1.123 +
   1.124 +    def cancel() { timer.cancel() }
   1.125 +  }
   1.126  
   1.127  
   1.128 -    /* buffered prover messages */
   1.129 -
   1.130 -    object receiver
   1.131 -    {
   1.132 -      private var buffer = new mutable.ListBuffer[Prover.Message]
   1.133 +  /* postponed changes */
   1.134  
   1.135 -      private def flush(): Unit = synchronized {
   1.136 -        if (!buffer.isEmpty) {
   1.137 -          val msgs = buffer.toList
   1.138 -          this_actor ! Messages(msgs)
   1.139 -          buffer = new mutable.ListBuffer[Prover.Message]
   1.140 -        }
   1.141 -      }
   1.142 +  private object postponed_changes
   1.143 +  {
   1.144 +    private var postponed: List[Session.Change] = Nil
   1.145 +
   1.146 +    def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   1.147  
   1.148 -      def invoke(msg: Prover.Message): Unit = synchronized {
   1.149 -        msg match {
   1.150 -          case _: Prover.Input =>
   1.151 -            buffer += msg
   1.152 -          case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
   1.153 -            flush()
   1.154 -          case output: Prover.Output =>
   1.155 -            buffer += msg
   1.156 -            if (output.is_syslog)
   1.157 -              syslog.change(queue =>
   1.158 -                {
   1.159 -                  val queue1 = queue.enqueue(output.message)
   1.160 -                  if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   1.161 -                })
   1.162 -        }
   1.163 -      }
   1.164 +    def flush(): Unit = synchronized {
   1.165 +      val state = global_state.value
   1.166 +      val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   1.167 +      postponed = unassigned
   1.168 +      assigned.reverseIterator.foreach(change => manager.send(change))
   1.169 +    }
   1.170 +  }
   1.171  
   1.172 -      private val timer = new Timer("session_actor.receiver", true)
   1.173 -      timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   1.174 +
   1.175 +  /* manager thread */
   1.176  
   1.177 -      def cancel() { timer.cancel() }
   1.178 -    }
   1.179 +  private val manager: Consumer_Thread[Any] =
   1.180 +  {
   1.181 +    var prune_next = Time.now() + prune_delay
   1.182  
   1.183      var prover: Option[Prover] = None
   1.184  
   1.185 @@ -387,7 +405,7 @@
   1.186        {
   1.187          try {
   1.188            val st = global_state.change_result(_.accumulate(state_id, message))
   1.189 -          commands_changed_buffer.send((false, List(st.command)))
   1.190 +          change_buffer.send(Received_Change(false, List(st.command)))
   1.191          }
   1.192          catch {
   1.193            case _: Document.State.Fail => bad_output()
   1.194 @@ -411,9 +429,10 @@
   1.195                    case Protocol.Assign_Update(id, update) =>
   1.196                      try {
   1.197                        val cmds = global_state.change_result(_.assign(id, update))
   1.198 -                      commands_changed_buffer.send((true, cmds))
   1.199 +                      change_buffer.send(Received_Change(true, cmds))
   1.200                      }
   1.201                      catch { case _: Document.State.Fail => bad_output() }
   1.202 +                    postponed_changes.flush()
   1.203                    case _ => bad_output()
   1.204                  }
   1.205                  // FIXME separate timeout event/message!?
   1.206 @@ -461,19 +480,63 @@
   1.207      //}}}
   1.208  
   1.209  
   1.210 -    /* main loop */
   1.211 +    /* main thread */
   1.212 +
   1.213 +    Consumer_Thread.fork[Any]("manager", daemon = true)(
   1.214 +      consume = (arg: Any) =>
   1.215 +        {
   1.216 +          //{{{
   1.217 +          arg match {
   1.218 +            case Start(name, args) if prover.isEmpty =>
   1.219 +              if (phase == Session.Inactive || phase == Session.Failed) {
   1.220 +                phase = Session.Startup
   1.221 +                prover = Some(resources.start_prover(receiver.invoke _, name, args))
   1.222 +              }
   1.223 +
   1.224 +            case Update_Options(options) =>
   1.225 +              if (prover.isDefined && is_ready) {
   1.226 +                prover.get.options(options)
   1.227 +                handle_raw_edits(Document.Blobs.empty, Nil)
   1.228 +              }
   1.229 +              global_options.event(Session.Global_Options(options))
   1.230 +
   1.231 +            case Cancel_Exec(exec_id) if prover.isDefined =>
   1.232 +              prover.get.cancel_exec(exec_id)
   1.233 +
   1.234 +            case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   1.235 +              handle_raw_edits(doc_blobs, edits)
   1.236  
   1.237 -    //{{{
   1.238 -    var finished = false
   1.239 -    while (!finished) {
   1.240 -      receive {
   1.241 -        case Start(name, args) if prover.isEmpty =>
   1.242 -          if (phase == Session.Inactive || phase == Session.Failed) {
   1.243 -            phase = Session.Startup
   1.244 -            prover = Some(resources.start_prover(receiver.invoke _, name, args))
   1.245 +            case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   1.246 +              prover.get.dialog_result(serial, result)
   1.247 +              handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   1.248 +
   1.249 +            case Protocol_Command(name, args) if prover.isDefined =>
   1.250 +              prover.get.protocol_command(name, args:_*)
   1.251 +
   1.252 +            case Messages(msgs) =>
   1.253 +              msgs foreach {
   1.254 +                case input: Prover.Input =>
   1.255 +                  all_messages.event(input)
   1.256 +
   1.257 +                case output: Prover.Output =>
   1.258 +                  if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   1.259 +                  else handle_output(output)
   1.260 +                  if (output.is_syslog) syslog_messages.event(output)
   1.261 +                  all_messages.event(output)
   1.262 +              }
   1.263 +
   1.264 +            case change: Session.Change if prover.isDefined =>
   1.265 +              if (global_state.value.is_assigned(change.previous))
   1.266 +                handle_change(change)
   1.267 +              else postponed_changes.store(change)
   1.268 +
   1.269 +            case bad => System.err.println("Session.manager: ignoring bad message " + bad)
   1.270            }
   1.271 -
   1.272 -        case Stop =>
   1.273 +          true
   1.274 +          //}}}
   1.275 +        },
   1.276 +      finish = () =>
   1.277 +        {
   1.278            if (phase == Session.Ready) {
   1.279              _protocol_handlers = _protocol_handlers.stop(prover.get)
   1.280              global_state.change(_ => Document.State.init)  // FIXME event bus!?
   1.281 @@ -482,81 +545,36 @@
   1.282              prover = None
   1.283              phase = Session.Inactive
   1.284            }
   1.285 -          finished = true
   1.286            receiver.cancel()
   1.287 -          reply(())
   1.288 -
   1.289 -        case Update_Options(options) =>
   1.290 -          if (prover.isDefined && is_ready) {
   1.291 -            prover.get.options(options)
   1.292 -            handle_raw_edits(Document.Blobs.empty, Nil)
   1.293 -          }
   1.294 -          global_options.event(Session.Global_Options(options))
   1.295 -          reply(())
   1.296 -
   1.297 -        case Cancel_Exec(exec_id) if prover.isDefined =>
   1.298 -          prover.get.cancel_exec(exec_id)
   1.299 -
   1.300 -        case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   1.301 -          handle_raw_edits(doc_blobs, edits)
   1.302 -          reply(())
   1.303 -
   1.304 -        case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   1.305 -          prover.get.dialog_result(serial, result)
   1.306 -          handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   1.307 -
   1.308 -        case Protocol_Command(name, args) if prover.isDefined =>
   1.309 -          prover.get.protocol_command(name, args:_*)
   1.310 -
   1.311 -        case Messages(msgs) =>
   1.312 -          msgs foreach {
   1.313 -            case input: Prover.Input =>
   1.314 -              all_messages.event(input)
   1.315 -
   1.316 -            case output: Prover.Output =>
   1.317 -              if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   1.318 -              else handle_output(output)
   1.319 -              if (output.is_syslog) syslog_messages.event(output)
   1.320 -              all_messages.event(output)
   1.321 -          }
   1.322 -
   1.323 -        case change: Session.Change
   1.324 -        if prover.isDefined && global_state.value.is_assigned(change.previous) =>
   1.325 -          handle_change(change)
   1.326 -
   1.327 -        case bad if !bad.isInstanceOf[Session.Change] =>
   1.328 -          System.err.println("session_actor: ignoring bad message " + bad)
   1.329 -      }
   1.330 -    }
   1.331 -    //}}}
   1.332 +        }
   1.333 +    )
   1.334    }
   1.335  
   1.336  
   1.337    /* actions */
   1.338  
   1.339    def start(name: String, args: List[String])
   1.340 -  {
   1.341 -    session_actor ! Start(name, args)
   1.342 -  }
   1.343 +  { manager.send(Start(name, args)) }
   1.344  
   1.345    def stop()
   1.346    {
   1.347 -    commands_changed_buffer.shutdown()
   1.348      change_parser.shutdown()
   1.349 -    session_actor !? Stop
   1.350 +    change_buffer.shutdown()
   1.351 +    manager.shutdown()
   1.352    }
   1.353  
   1.354    def protocol_command(name: String, args: String*)
   1.355 -  { session_actor ! Protocol_Command(name, args.toList) }
   1.356 +  { manager.send(Protocol_Command(name, args.toList)) }
   1.357  
   1.358 -  def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
   1.359 +  def cancel_exec(exec_id: Document_ID.Exec)
   1.360 +  { manager.send(Cancel_Exec(exec_id)) }
   1.361  
   1.362    def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   1.363 -  { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
   1.364 +  { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   1.365  
   1.366    def update_options(options: Options)
   1.367 -  { session_actor !? Update_Options(options) }
   1.368 +  { manager.send_wait(Update_Options(options)) }
   1.369  
   1.370    def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   1.371 -  { session_actor ! Session.Dialog_Result(id, serial, result) }
   1.372 +  { manager.send(Session.Dialog_Result(id, serial, result)) }
   1.373  }