# HG changeset patch # User wenzelm # Date 1398370200 -7200 # Node ID f2f53f7046f4df62d0d2654ad66735d655352843 # Parent 937826d702d5d788871d7e3d661b23d8f1a88587 converted main session manager to Consumer_Thread: messages need to be consumed immediately, postponed_changes replaces implicit actor mailbox scanning; diff -r 937826d702d5 -r f2f53f7046f4 src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala Thu Apr 24 18:04:18 2014 +0200 +++ b/src/Pure/PIDE/session.scala Thu Apr 24 22:10:00 2014 +0200 @@ -12,8 +12,6 @@ import scala.collection.mutable import scala.collection.immutable.Queue -import scala.actors.{Actor, TIMEOUT} -import scala.actors.Actor._ object Session @@ -150,9 +148,12 @@ val trace_events = new Event_Bus[Simplifier_Trace.Event.type] - /** buffered command changes (delay_first discipline) **/ + + /** buffered changes: to be dispatched to clients **/ - private val commands_changed_buffer: Consumer_Thread[(Boolean, List[Command])] = + private case class Received_Change(assignment: Boolean, commands: List[Command]) + + private val change_buffer: Consumer_Thread[Received_Change] = { object changed { @@ -168,20 +169,20 @@ commands = Set.empty } - def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized { - assignment |= assign - for (cmd <- cmds) { - nodes += cmd.node_name - commands += cmd + def invoke(change: Received_Change): Unit = synchronized { + assignment |= change.assignment + for (command <- change.commands) { + nodes += command.node_name + commands += command } } } - val timer = new Timer("commands_changed_buffer", true) + val timer = new Timer("change_buffer", true) timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms) - Consumer_Thread.fork[(Boolean, List[Command])]("commands_changed_buffer", daemon = true)( - consume = { case (assign, cmds) => changed.invoke(assign, cmds); true }, + Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)( + consume = { case change => changed.invoke(change); true }, finish = () => { timer.cancel(); changed.flush() } ) } @@ -205,13 +206,13 @@ resources.parse_change(reparse_limit, prev, doc_blobs, text_edits) } version_result.fulfill(change.version) - session_actor ! change + manager.send(change) true } - /** main protocol actor **/ + /** main protocol manager **/ /* global state */ @@ -261,58 +262,75 @@ } - /* actor messages */ + /* internal messages */ - private case object Stop private case class Start(name: String, args: List[String]) private case class Cancel_Exec(exec_id: Document_ID.Exec) private case class Protocol_Command(name: String, args: List[String]) private case class Messages(msgs: List[Prover.Message]) private case class Update_Options(options: Options) - private val session_actor: Actor = Simple_Thread.actor("session_actor", daemon = true) + + /* buffered prover messages */ + + private object receiver { - val this_actor = self + private var buffer = new mutable.ListBuffer[Prover.Message] + + private def flush(): Unit = synchronized { + if (!buffer.isEmpty) { + val msgs = buffer.toList + manager.send(Messages(msgs)) + buffer = new mutable.ListBuffer[Prover.Message] + } + } - var prune_next = Time.now() + prune_delay + def invoke(msg: Prover.Message): Unit = synchronized { + msg match { + case _: Prover.Input => + buffer += msg + case output: Prover.Protocol_Output if output.properties == Markup.Flush => + flush() + case output: Prover.Output => + buffer += msg + if (output.is_syslog) + syslog.change(queue => + { + val queue1 = queue.enqueue(output.message) + if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1 + }) + } + } + + private val timer = new Timer("receiver", true) + timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms) + + def cancel() { timer.cancel() } + } - /* buffered prover messages */ - - object receiver - { - private var buffer = new mutable.ListBuffer[Prover.Message] + /* postponed changes */ - private def flush(): Unit = synchronized { - if (!buffer.isEmpty) { - val msgs = buffer.toList - this_actor ! Messages(msgs) - buffer = new mutable.ListBuffer[Prover.Message] - } - } + private object postponed_changes + { + private var postponed: List[Session.Change] = Nil + + def store(change: Session.Change): Unit = synchronized { postponed ::= change } - def invoke(msg: Prover.Message): Unit = synchronized { - msg match { - case _: Prover.Input => - buffer += msg - case output: Prover.Protocol_Output if output.properties == Markup.Flush => - flush() - case output: Prover.Output => - buffer += msg - if (output.is_syslog) - syslog.change(queue => - { - val queue1 = queue.enqueue(output.message) - if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1 - }) - } - } + def flush(): Unit = synchronized { + val state = global_state.value + val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous)) + postponed = unassigned + assigned.reverseIterator.foreach(change => manager.send(change)) + } + } - private val timer = new Timer("session_actor.receiver", true) - timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms) + + /* manager thread */ - def cancel() { timer.cancel() } - } + private val manager: Consumer_Thread[Any] = + { + var prune_next = Time.now() + prune_delay var prover: Option[Prover] = None @@ -387,7 +405,7 @@ { try { val st = global_state.change_result(_.accumulate(state_id, message)) - commands_changed_buffer.send((false, List(st.command))) + change_buffer.send(Received_Change(false, List(st.command))) } catch { case _: Document.State.Fail => bad_output() @@ -411,9 +429,10 @@ case Protocol.Assign_Update(id, update) => try { val cmds = global_state.change_result(_.assign(id, update)) - commands_changed_buffer.send((true, cmds)) + change_buffer.send(Received_Change(true, cmds)) } catch { case _: Document.State.Fail => bad_output() } + postponed_changes.flush() case _ => bad_output() } // FIXME separate timeout event/message!? @@ -461,19 +480,63 @@ //}}} - /* main loop */ + /* main thread */ + + Consumer_Thread.fork[Any]("manager", daemon = true)( + consume = (arg: Any) => + { + //{{{ + arg match { + case Start(name, args) if prover.isEmpty => + if (phase == Session.Inactive || phase == Session.Failed) { + phase = Session.Startup + prover = Some(resources.start_prover(receiver.invoke _, name, args)) + } + + case Update_Options(options) => + if (prover.isDefined && is_ready) { + prover.get.options(options) + handle_raw_edits(Document.Blobs.empty, Nil) + } + global_options.event(Session.Global_Options(options)) + + case Cancel_Exec(exec_id) if prover.isDefined => + prover.get.cancel_exec(exec_id) + + case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined => + handle_raw_edits(doc_blobs, edits) - //{{{ - var finished = false - while (!finished) { - receive { - case Start(name, args) if prover.isEmpty => - if (phase == Session.Inactive || phase == Session.Failed) { - phase = Session.Startup - prover = Some(resources.start_prover(receiver.invoke _, name, args)) + case Session.Dialog_Result(id, serial, result) if prover.isDefined => + prover.get.dialog_result(serial, result) + handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) + + case Protocol_Command(name, args) if prover.isDefined => + prover.get.protocol_command(name, args:_*) + + case Messages(msgs) => + msgs foreach { + case input: Prover.Input => + all_messages.event(input) + + case output: Prover.Output => + if (output.is_stdout || output.is_stderr) raw_output_messages.event(output) + else handle_output(output) + if (output.is_syslog) syslog_messages.event(output) + all_messages.event(output) + } + + case change: Session.Change if prover.isDefined => + if (global_state.value.is_assigned(change.previous)) + handle_change(change) + else postponed_changes.store(change) + + case bad => System.err.println("Session.manager: ignoring bad message " + bad) } - - case Stop => + true + //}}} + }, + finish = () => + { if (phase == Session.Ready) { _protocol_handlers = _protocol_handlers.stop(prover.get) global_state.change(_ => Document.State.init) // FIXME event bus!? @@ -482,81 +545,36 @@ prover = None phase = Session.Inactive } - finished = true receiver.cancel() - reply(()) - - case Update_Options(options) => - if (prover.isDefined && is_ready) { - prover.get.options(options) - handle_raw_edits(Document.Blobs.empty, Nil) - } - global_options.event(Session.Global_Options(options)) - reply(()) - - case Cancel_Exec(exec_id) if prover.isDefined => - prover.get.cancel_exec(exec_id) - - case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined => - handle_raw_edits(doc_blobs, edits) - reply(()) - - case Session.Dialog_Result(id, serial, result) if prover.isDefined => - prover.get.dialog_result(serial, result) - handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) - - case Protocol_Command(name, args) if prover.isDefined => - prover.get.protocol_command(name, args:_*) - - case Messages(msgs) => - msgs foreach { - case input: Prover.Input => - all_messages.event(input) - - case output: Prover.Output => - if (output.is_stdout || output.is_stderr) raw_output_messages.event(output) - else handle_output(output) - if (output.is_syslog) syslog_messages.event(output) - all_messages.event(output) - } - - case change: Session.Change - if prover.isDefined && global_state.value.is_assigned(change.previous) => - handle_change(change) - - case bad if !bad.isInstanceOf[Session.Change] => - System.err.println("session_actor: ignoring bad message " + bad) - } - } - //}}} + } + ) } /* actions */ def start(name: String, args: List[String]) - { - session_actor ! Start(name, args) - } + { manager.send(Start(name, args)) } def stop() { - commands_changed_buffer.shutdown() change_parser.shutdown() - session_actor !? Stop + change_buffer.shutdown() + manager.shutdown() } def protocol_command(name: String, args: String*) - { session_actor ! Protocol_Command(name, args.toList) } + { manager.send(Protocol_Command(name, args.toList)) } - def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) } + def cancel_exec(exec_id: Document_ID.Exec) + { manager.send(Cancel_Exec(exec_id)) } def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) - { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) } + { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) } def update_options(options: Options) - { session_actor !? Update_Options(options) } + { manager.send_wait(Update_Options(options)) } def dialog_result(id: Document_ID.Generic, serial: Long, result: String) - { session_actor ! Session.Dialog_Result(id, serial, result) } + { manager.send(Session.Dialog_Result(id, serial, result)) } }