converted main session manager to Consumer_Thread: messages need to be consumed immediately, postponed_changes replaces implicit actor mailbox scanning;
authorwenzelm
Thu, 24 Apr 2014 22:10:00 +0200
changeset 56706 f2f53f7046f4
parent 56705 937826d702d5
child 56707 aa4631879df8
converted main session manager to Consumer_Thread: messages need to be consumed immediately, postponed_changes replaces implicit actor mailbox scanning;
src/Pure/PIDE/session.scala
--- a/src/Pure/PIDE/session.scala	Thu Apr 24 18:04:18 2014 +0200
+++ b/src/Pure/PIDE/session.scala	Thu Apr 24 22:10:00 2014 +0200
@@ -12,8 +12,6 @@
 
 import scala.collection.mutable
 import scala.collection.immutable.Queue
-import scala.actors.{Actor, TIMEOUT}
-import scala.actors.Actor._
 
 
 object Session
@@ -150,9 +148,12 @@
   val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
 
 
-  /** buffered command changes (delay_first discipline) **/
+
+  /** buffered changes: to be dispatched to clients **/
 
-  private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
+  private case class Received_Change(assignment: Boolean, commands: List[Command])
+
+  private val change_buffer: Consumer_Thread[Received_Change] =
   {
     object changed
     {
@@ -168,20 +169,20 @@
         commands = Set.empty
       }
 
-      def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
-        assignment |= assign
-        for (cmd <- cmds) {
-          nodes += cmd.node_name
-          commands += cmd
+      def invoke(change: Received_Change): Unit = synchronized {
+        assignment |= change.assignment
+        for (command <- change.commands) {
+          nodes += command.node_name
+          commands += command
         }
       }
     }
 
-    val timer = new Timer("commands_changed_buffer", true)
+    val timer = new Timer("change_buffer", true)
     timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
 
-    Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
-      consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
+    Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
+      consume = { case change => changed.invoke(change); true },
       finish = () => { timer.cancel(); changed.flush() }
     )
   }
@@ -205,13 +206,13 @@
           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
         }
       version_result.fulfill(change.version)
-      session_actor ! change
+      manager.send(change)
       true
   }
 
 
 
-  /** main protocol actor **/
+  /** main protocol manager **/
 
   /* global state */
 
@@ -261,58 +262,75 @@
   }
 
 
-  /* actor messages */
+  /* internal messages */
 
-  private case object Stop
   private case class Start(name: String, args: List[String])
   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   private case class Protocol_Command(name: String, args: List[String])
   private case class Messages(msgs: List[Prover.Message])
   private case class Update_Options(options: Options)
 
-  private val session_actor: Actor = Simple_Thread.actor("session_actor", daemon = true)
+
+  /* buffered prover messages */
+
+  private object receiver
   {
-    val this_actor = self
+    private var buffer = new mutable.ListBuffer[Prover.Message]
+
+    private def flush(): Unit = synchronized {
+      if (!buffer.isEmpty) {
+        val msgs = buffer.toList
+        manager.send(Messages(msgs))
+        buffer = new mutable.ListBuffer[Prover.Message]
+      }
+    }
 
-    var prune_next = Time.now() + prune_delay
+    def invoke(msg: Prover.Message): Unit = synchronized {
+      msg match {
+        case _: Prover.Input =>
+          buffer += msg
+        case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
+          flush()
+        case output: Prover.Output =>
+          buffer += msg
+          if (output.is_syslog)
+            syslog.change(queue =>
+              {
+                val queue1 = queue.enqueue(output.message)
+                if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
+              })
+      }
+    }
+
+    private val timer = new Timer("receiver", true)
+    timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+    def cancel() { timer.cancel() }
+  }
 
 
-    /* buffered prover messages */
-
-    object receiver
-    {
-      private var buffer = new mutable.ListBuffer[Prover.Message]
+  /* postponed changes */
 
-      private def flush(): Unit = synchronized {
-        if (!buffer.isEmpty) {
-          val msgs = buffer.toList
-          this_actor ! Messages(msgs)
-          buffer = new mutable.ListBuffer[Prover.Message]
-        }
-      }
+  private object postponed_changes
+  {
+    private var postponed: List[Session.Change] = Nil
+
+    def store(change: Session.Change): Unit = synchronized { postponed ::= change }
 
-      def invoke(msg: Prover.Message): Unit = synchronized {
-        msg match {
-          case _: Prover.Input =>
-            buffer += msg
-          case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
-            flush()
-          case output: Prover.Output =>
-            buffer += msg
-            if (output.is_syslog)
-              syslog.change(queue =>
-                {
-                  val queue1 = queue.enqueue(output.message)
-                  if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
-                })
-        }
-      }
+    def flush(): Unit = synchronized {
+      val state = global_state.value
+      val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
+      postponed = unassigned
+      assigned.reverseIterator.foreach(change => manager.send(change))
+    }
+  }
 
-      private val timer = new Timer("session_actor.receiver", true)
-      timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+  /* manager thread */
 
-      def cancel() { timer.cancel() }
-    }
+  private val manager: Consumer_Thread[Any] =
+  {
+    var prune_next = Time.now() + prune_delay
 
     var prover: Option[Prover] = None
 
@@ -387,7 +405,7 @@
       {
         try {
           val st = global_state.change_result(_.accumulate(state_id, message))
-          commands_changed_buffer.send((false, List(st.command)))
+          change_buffer.send(Received_Change(false, List(st.command)))
         }
         catch {
           case _: Document.State.Fail => bad_output()
@@ -411,9 +429,10 @@
                   case Protocol.Assign_Update(id, update) =>
                     try {
                       val cmds = global_state.change_result(_.assign(id, update))
-                      commands_changed_buffer.send((true, cmds))
+                      change_buffer.send(Received_Change(true, cmds))
                     }
                     catch { case _: Document.State.Fail => bad_output() }
+                    postponed_changes.flush()
                   case _ => bad_output()
                 }
                 // FIXME separate timeout event/message!?
@@ -461,19 +480,63 @@
     //}}}
 
 
-    /* main loop */
+    /* main thread */
+
+    Consumer_Thread.fork[Any]("manager", daemon = true)(
+      consume = (arg: Any) =>
+        {
+          //{{{
+          arg match {
+            case Start(name, args) if prover.isEmpty =>
+              if (phase == Session.Inactive || phase == Session.Failed) {
+                phase = Session.Startup
+                prover = Some(resources.start_prover(receiver.invoke _, name, args))
+              }
+
+            case Update_Options(options) =>
+              if (prover.isDefined && is_ready) {
+                prover.get.options(options)
+                handle_raw_edits(Document.Blobs.empty, Nil)
+              }
+              global_options.event(Session.Global_Options(options))
+
+            case Cancel_Exec(exec_id) if prover.isDefined =>
+              prover.get.cancel_exec(exec_id)
+
+            case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
+              handle_raw_edits(doc_blobs, edits)
 
-    //{{{
-    var finished = false
-    while (!finished) {
-      receive {
-        case Start(name, args) if prover.isEmpty =>
-          if (phase == Session.Inactive || phase == Session.Failed) {
-            phase = Session.Startup
-            prover = Some(resources.start_prover(receiver.invoke _, name, args))
+            case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
+              prover.get.dialog_result(serial, result)
+              handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
+
+            case Protocol_Command(name, args) if prover.isDefined =>
+              prover.get.protocol_command(name, args:_*)
+
+            case Messages(msgs) =>
+              msgs foreach {
+                case input: Prover.Input =>
+                  all_messages.event(input)
+
+                case output: Prover.Output =>
+                  if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
+                  else handle_output(output)
+                  if (output.is_syslog) syslog_messages.event(output)
+                  all_messages.event(output)
+              }
+
+            case change: Session.Change if prover.isDefined =>
+              if (global_state.value.is_assigned(change.previous))
+                handle_change(change)
+              else postponed_changes.store(change)
+
+            case bad => System.err.println("Session.manager: ignoring bad message " + bad)
           }
-
-        case Stop =>
+          true
+          //}}}
+        },
+      finish = () =>
+        {
           if (phase == Session.Ready) {
             _protocol_handlers = _protocol_handlers.stop(prover.get)
             global_state.change(_ => Document.State.init)  // FIXME event bus!?
@@ -482,81 +545,36 @@
             prover = None
             phase = Session.Inactive
           }
-          finished = true
           receiver.cancel()
-          reply(())
-
-        case Update_Options(options) =>
-          if (prover.isDefined && is_ready) {
-            prover.get.options(options)
-            handle_raw_edits(Document.Blobs.empty, Nil)
-          }
-          global_options.event(Session.Global_Options(options))
-          reply(())
-
-        case Cancel_Exec(exec_id) if prover.isDefined =>
-          prover.get.cancel_exec(exec_id)
-
-        case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
-          handle_raw_edits(doc_blobs, edits)
-          reply(())
-
-        case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
-          prover.get.dialog_result(serial, result)
-          handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
-
-        case Protocol_Command(name, args) if prover.isDefined =>
-          prover.get.protocol_command(name, args:_*)
-
-        case Messages(msgs) =>
-          msgs foreach {
-            case input: Prover.Input =>
-              all_messages.event(input)
-
-            case output: Prover.Output =>
-              if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
-              else handle_output(output)
-              if (output.is_syslog) syslog_messages.event(output)
-              all_messages.event(output)
-          }
-
-        case change: Session.Change
-        if prover.isDefined && global_state.value.is_assigned(change.previous) =>
-          handle_change(change)
-
-        case bad if !bad.isInstanceOf[Session.Change] =>
-          System.err.println("session_actor: ignoring bad message " + bad)
-      }
-    }
-    //}}}
+        }
+    )
   }
 
 
   /* actions */
 
   def start(name: String, args: List[String])
-  {
-    session_actor ! Start(name, args)
-  }
+  { manager.send(Start(name, args)) }
 
   def stop()
   {
-    commands_changed_buffer.shutdown()
     change_parser.shutdown()
-    session_actor !? Stop
+    change_buffer.shutdown()
+    manager.shutdown()
   }
 
   def protocol_command(name: String, args: String*)
-  { session_actor ! Protocol_Command(name, args.toList) }
+  { manager.send(Protocol_Command(name, args.toList)) }
 
-  def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
+  def cancel_exec(exec_id: Document_ID.Exec)
+  { manager.send(Cancel_Exec(exec_id)) }
 
   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
-  { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
+  { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
 
   def update_options(options: Options)
-  { session_actor !? Update_Options(options) }
+  { manager.send_wait(Update_Options(options)) }
 
   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
-  { session_actor ! Session.Dialog_Result(id, serial, result) }
+  { manager.send(Session.Dialog_Result(id, serial, result)) }
 }