src/Pure/PIDE/session.scala
changeset 56715 52125652e82a
parent 56713 3438dfba58fe
child 56719 80eb2192516a
equal deleted inserted replaced
56714:061f83259922 56715:52125652e82a
    14 import scala.collection.immutable.Queue
    14 import scala.collection.immutable.Queue
    15 
    15 
    16 
    16 
    17 object Session
    17 object Session
    18 {
    18 {
       
    19   /* outlets */
       
    20 
       
         // Factory for Consumer values; needed because the Consumer constructor is private.
    21   object Consumer
       
    22   {
       
           // Curried form: the name comes first, the callback in a second parameter list.
    23     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
       
    24       new Consumer[A](name, consume)
       
    25   }
       
         // A named callback. `consume` is the function applied to each value posted to an
         // Outlet (see Outlet.post); `name` is what Outlet.post prints when `consume` throws.
         // Declared contravariant in A.
    26   final class Consumer[-A] private(val name: String, val consume: A => Unit)
       
    27 
       
    28   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
       
    29   {
       
           // Distribution point for values of type A: keeps a list of registered consumers
           // and, on post, hands one thunk per consumer to `dispatcher`.
           // The consumer list is kept in a Synchronized container holding an immutable List.
    30     private val consumers = Synchronized(List.empty[Consumer[A]])
       
    31 
       
           // Register / unregister a consumer by updating the list through
           // Library.update / Library.remove.
           // NOTE(review): presumably update adds `c` only if absent and remove drops it -- confirm in Library.
    32     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
       
    33     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
       
    34 
       
           // Send `a` to every consumer in the list as read at this point. Nothing is
           // consumed on the calling side: each consume call is wrapped in a thunk and
           // passed to dispatcher.send.
    35     def post(a: A)
       
    36     {
       
    37       for (c <- consumers.value.iterator) {
       
    38         dispatcher.send(() =>
       
    39           try { c.consume(a) }
       
    40           catch {
       
                   // Any failure of this consumer is reported on System.err with its name
                   // and is not rethrown, so the thunk itself never throws.
                   // NOTE(review): matching Throwable also intercepts fatal errors and
                   // interrupts -- confirm this is intended rather than a NonFatal match.
    41             case exn: Throwable =>
       
    42               System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
       
    43           })
       
    44       }
       
    45     }
       
    46   }
       
    47 
       
    48 
    19   /* change */
    49   /* change */
    20 
    50 
    21   sealed case class Change(
    51   sealed case class Change(
    22     previous: Document.Version,
    52     previous: Document.Version,
    23     doc_blobs: Document.Blobs,
    53     doc_blobs: Document.Blobs,
   132   def prune_size: Int = 0  // size of retained history
   162   def prune_size: Int = 0  // size of retained history
   133   def syslog_limit: Int = 100
   163   def syslog_limit: Int = 100
   134   def reparse_limit: Int = 0
   164   def reparse_limit: Int = 0
   135 
   165 
   136 
   166 
   137   /* pervasive event buses */
   167   /* outlets */
   138 
   168 
         // Old side: each channel is a separate Event_Bus.
         // New side: each channel is a Session.Outlet, and all of them are constructed with
         // the same `dispatcher`, a daemon Consumer_Thread named "Session.dispatcher" that
         // runs every received thunk via e().
         // NOTE(review): the trailing `true` presumably tells the thread to keep consuming -- confirm.
   139   val statistics = new Event_Bus[Session.Statistics]
   169   private val dispatcher =
   140   val global_options = new Event_Bus[Session.Global_Options]
   170     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   141   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
   171 
   142   val raw_edits = new Event_Bus[Session.Raw_Edits]
   172   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   143   val commands_changed = new Event_Bus[Session.Commands_Changed]
   173   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   144   val phase_changed = new Event_Bus[Session.Phase]
   174   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   145   val syslog_messages = new Event_Bus[Prover.Output]
   175   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   146   val raw_output_messages = new Event_Bus[Prover.Output]
   176   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   147   val all_messages = new Event_Bus[Prover.Message]  // potential bottle-neck
   177   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   148   val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
   178   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
       
   179   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
       
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
       
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   149 
   182 
   150 
   183 
   151 
   184 
   152   /** buffered changes: to be dispatched to clients **/
   185   /** buffered changes: to be dispatched to clients **/
   153 
   186 
   161       private var nodes: Set[Document.Node.Name] = Set.empty
   194       private var nodes: Set[Document.Node.Name] = Set.empty
   162       private var commands: Set[Command] = Set.empty
   195       private var commands: Set[Command] = Set.empty
   163 
   196 
   164       def flush(): Unit = synchronized {
   197       def flush(): Unit = synchronized {
               // Emit one Commands_Changed event if anything was recorded (assignment flag
               // set, or nodes / commands non-empty), then reset all three fields. The whole
               // body runs under this object's monitor.
               // Old side publishes via commands_changed.event, new side via commands_changed.post.
   165         if (assignment || !nodes.isEmpty || !commands.isEmpty)
   198         if (assignment || !nodes.isEmpty || !commands.isEmpty)
   166           commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
   199           commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   167         assignment = false
   200         assignment = false
   168         nodes = Set.empty
   201         nodes = Set.empty
   169         commands = Set.empty
   202         commands = Set.empty
   170       }
   203       }
   171 
   204 
   221 
   254 
   222   @volatile private var _phase: Session.Phase = Session.Inactive
   255   @volatile private var _phase: Session.Phase = Session.Inactive
   223   private def phase_=(new_phase: Session.Phase)
   256   private def phase_=(new_phase: Session.Phase)
   224   {
   257   {
           // Private setter for the session phase: store the value in the @volatile
           // field first, then publish it on phase_changed.
           // Old side publishes via phase_changed.event, new side via phase_changed.post.
   225     _phase = new_phase
   258     _phase = new_phase
   226     phase_changed.event(new_phase)
   259     phase_changed.post(new_phase)
   227   }
   260   }
   228   def phase = _phase
   261   def phase = _phase
   229   def is_ready: Boolean = phase == Session.Ready
   262   def is_ready: Boolean = phase == Session.Ready
   230 
   263 
   231   private val global_state = Synchronized(Document.State.init)
   264   private val global_state = Synchronized(Document.State.init)
   347 
   380 
   348       val previous = global_state.value.history.tip.version
   381       val previous = global_state.value.history.tip.version
   349       val version = Future.promise[Document.Version]
   382       val version = Future.promise[Document.Version]
   350       global_state.change(_.continue_history(previous, edits, version))
   383       global_state.change(_.continue_history(previous, edits, version))
   351 
   384 
   352       raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
   385       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   353       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   386       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   354     }
   387     }
   355     //}}}
   388     //}}}
   356 
   389 
   357 
   390 
   456                     catch { case _: Document.State.Fail => bad_output() }
   489                     catch { case _: Document.State.Fail => bad_output() }
   457                   case _ => bad_output()
   490                   case _ => bad_output()
   458                 }
   491                 }
   459 
   492 
   460               case Markup.ML_Statistics(props) =>
   493               case Markup.ML_Statistics(props) =>
   461                 statistics.event(Session.Statistics(props))
   494                 statistics.post(Session.Statistics(props))
   462 
   495 
   463               case Markup.Task_Statistics(props) =>
   496               case Markup.Task_Statistics(props) =>
   464                 // FIXME
   497                 // FIXME
   465 
   498 
   466               case _ => bad_output()
   499               case _ => bad_output()
   477             case Markup.Return_Code(rc) if output.is_exit =>
   510             case Markup.Return_Code(rc) if output.is_exit =>
   478               prover = None
   511               prover = None
   479               if (rc == 0) phase = Session.Inactive
   512               if (rc == 0) phase = Session.Inactive
   480               else phase = Session.Failed
   513               else phase = Session.Failed
   481 
   514 
   482             case _ => raw_output_messages.event(output)
   515             case _ => raw_output_messages.post(output)
   483           }
   516           }
   484         }
   517         }
   485     }
   518     }
   486     //}}}
   519     //}}}
   487 
   520 
   510           case Update_Options(options) =>
   543           case Update_Options(options) =>
   511             if (prover.isDefined && is_ready) {
   544             if (prover.isDefined && is_ready) {
   512               prover.get.options(options)
   545               prover.get.options(options)
   513               handle_raw_edits(Document.Blobs.empty, Nil)
   546               handle_raw_edits(Document.Blobs.empty, Nil)
   514             }
   547             }
   515             global_options.event(Session.Global_Options(options))
   548             global_options.post(Session.Global_Options(options))
   516 
   549 
   517           case Cancel_Exec(exec_id) if prover.isDefined =>
   550           case Cancel_Exec(exec_id) if prover.isDefined =>
   518             prover.get.cancel_exec(exec_id)
   551             prover.get.cancel_exec(exec_id)
   519 
   552 
   520           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   553           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   528             prover.get.protocol_command(name, args:_*)
   561             prover.get.protocol_command(name, args:_*)
   529 
   562 
   530           case Messages(msgs) =>
   563           case Messages(msgs) =>
   531             msgs foreach {
   564             msgs foreach {
   532               case input: Prover.Input =>
   565               case input: Prover.Input =>
   533                 all_messages.event(input)
   566                 all_messages.post(input)
   534 
   567 
   535               case output: Prover.Output =>
   568               case output: Prover.Output =>
   536                 if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   569                 if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
   537                 else handle_output(output)
   570                 else handle_output(output)
   538                 if (output.is_syslog) syslog_messages.event(output)
   571                 if (output.is_syslog) syslog_messages.post(output)
   539                 all_messages.event(output)
   572                 all_messages.post(output)
   540             }
   573             }
   541 
   574 
   542           case change: Session.Change if prover.isDefined =>
   575           case change: Session.Change if prover.isDefined =>
   543             if (global_state.value.is_assigned(change.previous))
   576             if (global_state.value.is_assigned(change.previous))
   544               handle_change(change)
   577               handle_change(change)
   560     manager.send_wait(Stop)
   593     manager.send_wait(Stop)
   561     receiver.shutdown()
   594     receiver.shutdown()
   562     change_parser.shutdown()
   595     change_parser.shutdown()
   563     change_buffer.shutdown()
   596     change_buffer.shutdown()
   564     manager.shutdown()
   597     manager.shutdown()
       
   598     dispatcher.shutdown()
   565   }
   599   }
   566 
   600 
   567   def protocol_command(name: String, args: String*)
   601   def protocol_command(name: String, args: String*)
         // Wrap the command name and its arguments (converted to a List) in a
         // Protocol_Command message and pass it to manager.send. Unchanged by this changeset.
   568   { manager.send(Protocol_Command(name, args.toList)) }
   602   { manager.send(Protocol_Command(name, args.toList)) }
   569 
   603