src/Pure/PIDE/session.scala
changeset 56715 52125652e82a
parent 56713 3438dfba58fe
child 56719 80eb2192516a
     1.1 --- a/src/Pure/PIDE/session.scala	Fri Apr 25 12:27:18 2014 +0200
     1.2 +++ b/src/Pure/PIDE/session.scala	Fri Apr 25 12:51:08 2014 +0200
     1.3 @@ -16,6 +16,36 @@
     1.4  
     1.5  object Session
     1.6  {
     1.7 +  /* outlets */
     1.8 +
     1.9 +  object Consumer
    1.10 +  {
    1.11 +    def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    1.12 +      new Consumer[A](name, consume)
    1.13 +  }
    1.14 +  final class Consumer[-A] private(val name: String, val consume: A => Unit)
    1.15 +
    1.16 +  class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    1.17 +  {
    1.18 +    private val consumers = Synchronized(List.empty[Consumer[A]])
    1.19 +
    1.20 +    def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    1.21 +    def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    1.22 +
    1.23 +    def post(a: A)
    1.24 +    {
    1.25 +      for (c <- consumers.value.iterator) {
    1.26 +        dispatcher.send(() =>
    1.27 +          try { c.consume(a) }
    1.28 +          catch {
    1.29 +            case exn: Throwable =>
    1.30 +              System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    1.31 +          })
    1.32 +      }
    1.33 +    }
    1.34 +  }
    1.35 +
    1.36 +
    1.37    /* change */
    1.38  
    1.39    sealed case class Change(
    1.40 @@ -134,18 +164,21 @@
    1.41    def reparse_limit: Int = 0
    1.42  
    1.43  
    1.44 -  /* pervasive event buses */
    1.45 +  /* outlets */
    1.46 +
    1.47 +  private val dispatcher =
    1.48 +    Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
    1.49  
    1.50 -  val statistics = new Event_Bus[Session.Statistics]
    1.51 -  val global_options = new Event_Bus[Session.Global_Options]
    1.52 -  val caret_focus = new Event_Bus[Session.Caret_Focus.type]
    1.53 -  val raw_edits = new Event_Bus[Session.Raw_Edits]
    1.54 -  val commands_changed = new Event_Bus[Session.Commands_Changed]
    1.55 -  val phase_changed = new Event_Bus[Session.Phase]
    1.56 -  val syslog_messages = new Event_Bus[Prover.Output]
    1.57 -  val raw_output_messages = new Event_Bus[Prover.Output]
    1.58 -  val all_messages = new Event_Bus[Prover.Message]  // potential bottle-neck
    1.59 -  val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
    1.60 +  val statistics = new Session.Outlet[Session.Statistics](dispatcher)
    1.61 +  val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
    1.62 +  val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
    1.63 +  val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
    1.64 +  val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
    1.65 +  val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
    1.66 +  val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
    1.67 +  val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
    1.68 +  val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
    1.69 +  val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
    1.70  
    1.71  
    1.72  
    1.73 @@ -163,7 +196,7 @@
    1.74  
    1.75        def flush(): Unit = synchronized {
    1.76          if (assignment || !nodes.isEmpty || !commands.isEmpty)
    1.77 -          commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
    1.78 +          commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
    1.79          assignment = false
    1.80          nodes = Set.empty
    1.81          commands = Set.empty
    1.82 @@ -223,7 +256,7 @@
    1.83    private def phase_=(new_phase: Session.Phase)
    1.84    {
    1.85      _phase = new_phase
    1.86 -    phase_changed.event(new_phase)
    1.87 +    phase_changed.post(new_phase)
    1.88    }
    1.89    def phase = _phase
    1.90    def is_ready: Boolean = phase == Session.Ready
    1.91 @@ -349,7 +382,7 @@
    1.92        val version = Future.promise[Document.Version]
    1.93        global_state.change(_.continue_history(previous, edits, version))
    1.94  
    1.95 -      raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
    1.96 +      raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
    1.97        change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
    1.98      }
    1.99      //}}}
   1.100 @@ -458,7 +491,7 @@
   1.101                  }
   1.102  
   1.103                case Markup.ML_Statistics(props) =>
   1.104 -                statistics.event(Session.Statistics(props))
   1.105 +                statistics.post(Session.Statistics(props))
   1.106  
   1.107                case Markup.Task_Statistics(props) =>
   1.108                  // FIXME
   1.109 @@ -479,7 +512,7 @@
   1.110                if (rc == 0) phase = Session.Inactive
   1.111                else phase = Session.Failed
   1.112  
   1.113 -            case _ => raw_output_messages.event(output)
   1.114 +            case _ => raw_output_messages.post(output)
   1.115            }
   1.116          }
   1.117      }
   1.118 @@ -512,7 +545,7 @@
   1.119                prover.get.options(options)
   1.120                handle_raw_edits(Document.Blobs.empty, Nil)
   1.121              }
   1.122 -            global_options.event(Session.Global_Options(options))
   1.123 +            global_options.post(Session.Global_Options(options))
   1.124  
   1.125            case Cancel_Exec(exec_id) if prover.isDefined =>
   1.126              prover.get.cancel_exec(exec_id)
   1.127 @@ -530,13 +563,13 @@
   1.128            case Messages(msgs) =>
   1.129              msgs foreach {
   1.130                case input: Prover.Input =>
   1.131 -                all_messages.event(input)
   1.132 +                all_messages.post(input)
   1.133  
   1.134                case output: Prover.Output =>
   1.135 -                if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   1.136 +                if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
   1.137                  else handle_output(output)
   1.138 -                if (output.is_syslog) syslog_messages.event(output)
   1.139 -                all_messages.event(output)
   1.140 +                if (output.is_syslog) syslog_messages.post(output)
   1.141 +                all_messages.post(output)
   1.142              }
   1.143  
   1.144            case change: Session.Change if prover.isDefined =>
   1.145 @@ -562,6 +595,7 @@
   1.146      change_parser.shutdown()
   1.147      change_buffer.shutdown()
   1.148      manager.shutdown()
   1.149 +    dispatcher.shutdown()
   1.150    }
   1.151  
   1.152    def protocol_command(name: String, args: String*)