--- a/src/Pure/PIDE/session.scala Thu Apr 24 18:04:18 2014 +0200
+++ b/src/Pure/PIDE/session.scala Thu Apr 24 22:10:00 2014 +0200
@@ -12,8 +12,6 @@
import scala.collection.mutable
import scala.collection.immutable.Queue
-import scala.actors.{Actor, TIMEOUT}
-import scala.actors.Actor._
object Session
@@ -150,9 +148,12 @@
val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
- /** buffered command changes (delay_first discipline) **/
+
+ /** buffered changes: to be dispatched to clients **/
- private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] =
+ private case class Received_Change(assignment: Boolean, commands: List[Command])
+
+ private val change_buffer: Consumer_Thread[Received_Change] =
{
object changed
{
@@ -168,20 +169,20 @@
commands = Set.empty
}
- def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
- assignment |= assign
- for (cmd <- cmds) {
- nodes += cmd.node_name
- commands += cmd
+ def invoke(change: Received_Change): Unit = synchronized {
+ assignment |= change.assignment
+ for (command <- change.commands) {
+ nodes += command.node_name
+ commands += command
}
}
}
- val timer = new Timer("commands_changed_buffer", true)
+ val timer = new Timer("change_buffer", true)
timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
- Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)(
- consume = { case (assign, cmds) => changed.invoke(assign, cmds); true },
+ Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
+ consume = { case change => changed.invoke(change); true },
finish = () => { timer.cancel(); changed.flush() }
)
}
@@ -205,13 +206,13 @@
resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
}
version_result.fulfill(change.version)
- session_actor ! change
+ manager.send(change)
true
}
- /** main protocol actor **/
+ /** main protocol manager **/
/* global state */
@@ -261,58 +262,75 @@
}
- /* actor messages */
+ /* internal messages */
- private case object Stop
private case class Start(name: String, args: List[String])
private case class Cancel_Exec(exec_id: Document_ID.Exec)
private case class Protocol_Command(name: String, args: List[String])
private case class Messages(msgs: List[Prover.Message])
private case class Update_Options(options: Options)
- private val session_actor: Actor = Simple_Thread.actor("session_actor", daemon = true)
+
+ /* buffered prover messages */
+
+ private object receiver
{
- val this_actor = self
+ private var buffer = new mutable.ListBuffer[Prover.Message]
+
+ private def flush(): Unit = synchronized {
+ if (!buffer.isEmpty) {
+ val msgs = buffer.toList
+ manager.send(Messages(msgs))
+ buffer = new mutable.ListBuffer[Prover.Message]
+ }
+ }
- var prune_next = Time.now() + prune_delay
+ def invoke(msg: Prover.Message): Unit = synchronized {
+ msg match {
+ case _: Prover.Input =>
+ buffer += msg
+ case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
+ flush()
+ case output: Prover.Output =>
+ buffer += msg
+ if (output.is_syslog)
+ syslog.change(queue =>
+ {
+ val queue1 = queue.enqueue(output.message)
+ if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
+ })
+ }
+ }
+
+ private val timer = new Timer("receiver", true)
+ timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+ def cancel() { timer.cancel() }
+ }
- /* buffered prover messages */
-
- object receiver
- {
- private var buffer = new mutable.ListBuffer[Prover.Message]
+ /* postponed changes */
- private def flush(): Unit = synchronized {
- if (!buffer.isEmpty) {
- val msgs = buffer.toList
- this_actor ! Messages(msgs)
- buffer = new mutable.ListBuffer[Prover.Message]
- }
- }
+ private object postponed_changes
+ {
+ private var postponed: List[Session.Change] = Nil
+
+ def store(change: Session.Change): Unit = synchronized { postponed ::= change }
- def invoke(msg: Prover.Message): Unit = synchronized {
- msg match {
- case _: Prover.Input =>
- buffer += msg
- case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
- flush()
- case output: Prover.Output =>
- buffer += msg
- if (output.is_syslog)
- syslog.change(queue =>
- {
- val queue1 = queue.enqueue(output.message)
- if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
- })
- }
- }
+ def flush(): Unit = synchronized {
+ val state = global_state.value
+ val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
+ postponed = unassigned
+ assigned.reverseIterator.foreach(change => manager.send(change))
+ }
+ }
- private val timer = new Timer("session_actor.receiver", true)
- timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+ /* manager thread */
- def cancel() { timer.cancel() }
- }
+ private val manager: Consumer_Thread[Any] =
+ {
+ var prune_next = Time.now() + prune_delay
var prover: Option[Prover] = None
@@ -387,7 +405,7 @@
{
try {
val st = global_state.change_result(_.accumulate(state_id, message))
- commands_changed_buffer.send((false, List(st.command)))
+ change_buffer.send(Received_Change(false, List(st.command)))
}
catch {
case _: Document.State.Fail => bad_output()
@@ -411,9 +429,10 @@
case Protocol.Assign_Update(id, update) =>
try {
val cmds = global_state.change_result(_.assign(id, update))
- commands_changed_buffer.send((true, cmds))
+ change_buffer.send(Received_Change(true, cmds))
}
catch { case _: Document.State.Fail => bad_output() }
+ postponed_changes.flush()
case _ => bad_output()
}
// FIXME separate timeout event/message!?
@@ -461,19 +480,63 @@
//}}}
- /* main loop */
+ /* main thread */
+
+ Consumer_Thread.fork[Any]("manager", daemon = true)(
+ consume = (arg: Any) =>
+ {
+ //{{{
+ arg match {
+ case Start(name, args) if prover.isEmpty =>
+ if (phase == Session.Inactive || phase == Session.Failed) {
+ phase = Session.Startup
+ prover = Some(resources.start_prover(receiver.invoke _, name, args))
+ }
+
+ case Update_Options(options) =>
+ if (prover.isDefined && is_ready) {
+ prover.get.options(options)
+ handle_raw_edits(Document.Blobs.empty, Nil)
+ }
+ global_options.event(Session.Global_Options(options))
+
+ case Cancel_Exec(exec_id) if prover.isDefined =>
+ prover.get.cancel_exec(exec_id)
+
+ case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
+ handle_raw_edits(doc_blobs, edits)
- //{{{
- var finished = false
- while (!finished) {
- receive {
- case Start(name, args) if prover.isEmpty =>
- if (phase == Session.Inactive || phase == Session.Failed) {
- phase = Session.Startup
- prover = Some(resources.start_prover(receiver.invoke _, name, args))
+ case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
+ prover.get.dialog_result(serial, result)
+ handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
+
+ case Protocol_Command(name, args) if prover.isDefined =>
+ prover.get.protocol_command(name, args:_*)
+
+ case Messages(msgs) =>
+ msgs foreach {
+ case input: Prover.Input =>
+ all_messages.event(input)
+
+ case output: Prover.Output =>
+ if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
+ else handle_output(output)
+ if (output.is_syslog) syslog_messages.event(output)
+ all_messages.event(output)
+ }
+
+ case change: Session.Change if prover.isDefined =>
+ if (global_state.value.is_assigned(change.previous))
+ handle_change(change)
+ else postponed_changes.store(change)
+
+ case bad => System.err.println("Session.manager: ignoring bad message " + bad)
}
-
- case Stop =>
+ true
+ //}}}
+ },
+ finish = () =>
+ {
if (phase == Session.Ready) {
_protocol_handlers = _protocol_handlers.stop(prover.get)
global_state.change(_ => Document.State.init) // FIXME event bus!?
@@ -482,81 +545,36 @@
prover = None
phase = Session.Inactive
}
- finished = true
receiver.cancel()
- reply(())
-
- case Update_Options(options) =>
- if (prover.isDefined && is_ready) {
- prover.get.options(options)
- handle_raw_edits(Document.Blobs.empty, Nil)
- }
- global_options.event(Session.Global_Options(options))
- reply(())
-
- case Cancel_Exec(exec_id) if prover.isDefined =>
- prover.get.cancel_exec(exec_id)
-
- case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
- handle_raw_edits(doc_blobs, edits)
- reply(())
-
- case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
- prover.get.dialog_result(serial, result)
- handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
-
- case Protocol_Command(name, args) if prover.isDefined =>
- prover.get.protocol_command(name, args:_*)
-
- case Messages(msgs) =>
- msgs foreach {
- case input: Prover.Input =>
- all_messages.event(input)
-
- case output: Prover.Output =>
- if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
- else handle_output(output)
- if (output.is_syslog) syslog_messages.event(output)
- all_messages.event(output)
- }
-
- case change: Session.Change
- if prover.isDefined && global_state.value.is_assigned(change.previous) =>
- handle_change(change)
-
- case bad if !bad.isInstanceOf[Session.Change] =>
- System.err.println("session_actor: ignoring bad message " + bad)
- }
- }
- //}}}
+ }
+ )
}
/* actions */
def start(name: String, args: List[String])
- {
- session_actor ! Start(name, args)
- }
+ { manager.send(Start(name, args)) }
def stop()
{
- commands_changed_buffer.shutdown()
change_parser.shutdown()
- session_actor !? Stop
+ change_buffer.shutdown()
+ manager.shutdown()
}
def protocol_command(name: String, args: String*)
- { session_actor ! Protocol_Command(name, args.toList) }
+ { manager.send(Protocol_Command(name, args.toList)) }
- def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
+ def cancel_exec(exec_id: Document_ID.Exec)
+ { manager.send(Cancel_Exec(exec_id)) }
def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
- { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
+ { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
def update_options(options: Options)
- { session_actor !? Update_Options(options) }
+ { manager.send_wait(Update_Options(options)) }
def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
- { session_actor ! Session.Dialog_Result(id, serial, result) }
+ { manager.send(Session.Dialog_Result(id, serial, result)) }
}