diff -r 061f83259922 -r 52125652e82a src/Pure/PIDE/session.scala --- a/src/Pure/PIDE/session.scala Fri Apr 25 12:27:18 2014 +0200 +++ b/src/Pure/PIDE/session.scala Fri Apr 25 12:51:08 2014 +0200 @@ -16,6 +16,36 @@ object Session { + /* outlets */ + + object Consumer + { + def apply[A](name: String)(consume: A => Unit): Consumer[A] = + new Consumer[A](name, consume) + } + final class Consumer[-A] private(val name: String, val consume: A => Unit) + + class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) + { + private val consumers = Synchronized(List.empty[Consumer[A]]) + + def += (c: Consumer[A]) { consumers.change(Library.update(c)) } + def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) } + + def post(a: A) + { + for (c <- consumers.value.iterator) { + dispatcher.send(() => + try { c.consume(a) } + catch { + case exn: Throwable => + System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) + }) + } + } + } + + /* change */ sealed case class Change( @@ -134,18 +164,21 @@ def reparse_limit: Int = 0 - /* pervasive event buses */ + /* outlets */ + + private val dispatcher = + Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } - val statistics = new Event_Bus[Session.Statistics] - val global_options = new Event_Bus[Session.Global_Options] - val caret_focus = new Event_Bus[Session.Caret_Focus.type] - val raw_edits = new Event_Bus[Session.Raw_Edits] - val commands_changed = new Event_Bus[Session.Commands_Changed] - val phase_changed = new Event_Bus[Session.Phase] - val syslog_messages = new Event_Bus[Prover.Output] - val raw_output_messages = new Event_Bus[Prover.Output] - val all_messages = new Event_Bus[Prover.Message] // potential bottle-neck - val trace_events = new Event_Bus[Simplifier_Trace.Event.type] + val statistics = new Session.Outlet[Session.Statistics](dispatcher) + val global_options = new Session.Outlet[Session.Global_Options](dispatcher) + val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) + val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) + val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) + val phase_changed = new Session.Outlet[Session.Phase](dispatcher) + val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) + val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) + val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck + val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) @@ -163,7 +196,7 @@ def flush(): Unit = synchronized { if (assignment || !nodes.isEmpty || !commands.isEmpty) - commands_changed.event(Session.Commands_Changed(assignment, nodes, commands)) + commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) assignment = false nodes = Set.empty commands = Set.empty @@ -223,7 +256,7 @@ private def phase_=(new_phase: Session.Phase) { _phase = new_phase - phase_changed.event(new_phase) + phase_changed.post(new_phase) } def phase = _phase def is_ready: Boolean = phase == Session.Ready @@ -349,7 +382,7 @@ val version = Future.promise[Document.Version] global_state.change(_.continue_history(previous, edits, version)) - raw_edits.event(Session.Raw_Edits(doc_blobs, edits)) + raw_edits.post(Session.Raw_Edits(doc_blobs, edits)) change_parser.send(Text_Edits(previous, doc_blobs, edits, version)) } //}}} @@ -458,7 +491,7 @@ } case Markup.ML_Statistics(props) => - statistics.event(Session.Statistics(props)) + statistics.post(Session.Statistics(props)) case Markup.Task_Statistics(props) => // FIXME @@ -479,7 +512,7 @@ if (rc == 0) phase = Session.Inactive else phase = Session.Failed - case _ => raw_output_messages.event(output) + case _ => raw_output_messages.post(output) } } } @@ -512,7 +545,7 @@ prover.get.options(options) handle_raw_edits(Document.Blobs.empty, Nil) } - global_options.event(Session.Global_Options(options)) + global_options.post(Session.Global_Options(options)) case Cancel_Exec(exec_id) if prover.isDefined => prover.get.cancel_exec(exec_id) @@ -530,13 +563,13 @@ case Messages(msgs) => msgs foreach { case input: Prover.Input => - all_messages.event(input) + all_messages.post(input) case output: Prover.Output => - if (output.is_stdout || output.is_stderr) raw_output_messages.event(output) + if (output.is_stdout || output.is_stderr) raw_output_messages.post(output) else handle_output(output) - if (output.is_syslog) syslog_messages.event(output) - all_messages.event(output) + if (output.is_syslog) syslog_messages.post(output) + all_messages.post(output) } case change: Session.Change if prover.isDefined => @@ -562,6 +595,7 @@ change_parser.shutdown() change_buffer.shutdown() manager.shutdown() + dispatcher.shutdown() } def protocol_command(name: String, args: String*)