src/Pure/PIDE/session.scala
author wenzelm
Fri Apr 25 12:51:08 2014 +0200 (2014-04-25 ago)
changeset 56715 52125652e82a
parent 56713 3438dfba58fe
child 56719 80eb2192516a
permissions -rw-r--r--
clarified Session.Consumer, with Session.Outlet managed by dispatcher thread;
eliminated old actors;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 
    16 
    17 object Session
    18 {
    19   /* outlets */
    20 
    21   object Consumer
    22   {
    23     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    24       new Consumer[A](name, consume)
    25   }
    26   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    27 
    28   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    29   {
    30     private val consumers = Synchronized(List.empty[Consumer[A]])
    31 
    32     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    33     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    34 
    35     def post(a: A)
    36     {
    37       for (c <- consumers.value.iterator) {
    38         dispatcher.send(() =>
    39           try { c.consume(a) }
    40           catch {
    41             case exn: Throwable =>
    42               System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    43           })
    44       }
    45     }
    46   }
    47 
    48 
    49   /* change */
    50 
    51   sealed case class Change(
    52     previous: Document.Version,
    53     doc_blobs: Document.Blobs,
    54     syntax_changed: Boolean,
    55     deps_changed: Boolean,
    56     doc_edits: List[Document.Edit_Command],
    57     version: Document.Version)
    58 
    59 
    60   /* events */
    61 
    62   //{{{
    63   case class Statistics(props: Properties.T)
    64   case class Global_Options(options: Options)
    65   case object Caret_Focus
    66   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    67   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    68   case class Commands_Changed(
    69     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    70 
    71   sealed abstract class Phase
    72   case object Inactive extends Phase
    73   case object Startup extends Phase  // transient
    74   case object Failed extends Phase
    75   case object Ready extends Phase
    76   case object Shutdown extends Phase  // transient
    77   //}}}
    78 
    79 
    80   /* protocol handlers */
    81 
    82   abstract class Protocol_Handler
    83   {
    84     def stop(prover: Prover): Unit = {}
    85     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
    86   }
    87 
    88   class Protocol_Handlers(
    89     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    90     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
    91   {
    92     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    93 
    94     def add(prover: Prover, name: String): Protocol_Handlers =
    95     {
    96       val (handlers1, functions1) =
    97         handlers.get(name) match {
    98           case Some(old_handler) =>
    99             System.err.println("Redefining protocol handler: " + name)
   100             old_handler.stop(prover)
   101             (handlers - name, functions -- old_handler.functions.keys)
   102           case None => (handlers, functions)
   103         }
   104 
   105       val (handlers2, functions2) =
   106         try {
   107           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
   108           val new_functions =
   109             for ((a, f) <- new_handler.functions.toList) yield
   110               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   111 
   112           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   113           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   114 
   115           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
   116         }
   117         catch {
   118           case exn: Throwable =>
   119             System.err.println(Exn.error_message(
   120               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn)))
   121             (handlers1, functions1)
   122         }
   123 
   124       new Protocol_Handlers(handlers2, functions2)
   125     }
   126 
   127     def invoke(msg: Prover.Protocol_Output): Boolean =
   128       msg.properties match {
   129         case Markup.Function(a) if functions.isDefinedAt(a) =>
   130           try { functions(a)(msg) }
   131           catch {
   132             case exn: Throwable =>
   133               System.err.println(Exn.error_message(
   134                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn)))
   135             false
   136           }
   137         case _ => false
   138       }
   139 
   140     def stop(prover: Prover): Protocol_Handlers =
   141     {
   142       for ((_, handler) <- handlers) handler.stop(prover)
   143       new Protocol_Handlers()
   144     }
   145   }
   146 }
   147 
   148 
   149 class Session(val resources: Resources)
   150 {
   151   /* global flags */
   152 
   153   @volatile var timing: Boolean = false
   154   @volatile var verbose: Boolean = false
   155 
   156 
   157   /* tuning parameters */
   158 
   159   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   160   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   161   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   162   def prune_size: Int = 0  // size of retained history
   163   def syslog_limit: Int = 100
   164   def reparse_limit: Int = 0
   165 
   166 
   167   /* outlets */
   168 
   169   private val dispatcher =
   170     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   171 
   172   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   173   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   174   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   175   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   176   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   177   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   178   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   179   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   182 
   183 
   184 
   185   /** buffered changes: to be dispatched to clients **/
   186 
   187   private case class Received_Change(assignment: Boolean, commands: List[Command])
   188 
   189   private val change_buffer: Consumer_Thread[Received_Change] =
   190   {
   191     object changed
   192     {
   193       private var assignment: Boolean = false
   194       private var nodes: Set[Document.Node.Name] = Set.empty
   195       private var commands: Set[Command] = Set.empty
   196 
   197       def flush(): Unit = synchronized {
   198         if (assignment || !nodes.isEmpty || !commands.isEmpty)
   199           commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   200         assignment = false
   201         nodes = Set.empty
   202         commands = Set.empty
   203       }
   204 
   205       def invoke(change: Received_Change): Unit = synchronized {
   206         assignment |= change.assignment
   207         for (command <- change.commands) {
   208           nodes += command.node_name
   209           commands += command
   210         }
   211       }
   212     }
   213 
   214     val timer = new Timer("change_buffer", true)
   215     timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
   216 
   217     Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
   218       consume = { case change => changed.invoke(change); true },
   219       finish = () => { timer.cancel(); changed.flush() }
   220     )
   221   }
   222 
   223 
   224 
   225   /** pipelined change parsing **/
   226 
   227   private case class Text_Edits(
   228     previous: Future[Document.Version],
   229     doc_blobs: Document.Blobs,
   230     text_edits: List[Document.Edit_Text],
   231     version_result: Promise[Document.Version])
   232 
   233   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   234   {
   235     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   236       val prev = previous.get_finished
   237       val change =
   238         Timing.timeit("parse_change", timing) {
   239           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   240         }
   241       version_result.fulfill(change.version)
   242       manager.send(change)
   243       true
   244   }
   245 
   246 
   247 
   248   /** main protocol manager **/
   249 
   250   /* global state */
   251 
   252   private val syslog = Synchronized(Queue.empty[XML.Elem])
   253   def current_syslog(): String = cat_lines(syslog.value.iterator.map(XML.content))
   254 
   255   @volatile private var _phase: Session.Phase = Session.Inactive
   256   private def phase_=(new_phase: Session.Phase)
   257   {
   258     _phase = new_phase
   259     phase_changed.post(new_phase)
   260   }
   261   def phase = _phase
   262   def is_ready: Boolean = phase == Session.Ready
   263 
   264   private val global_state = Synchronized(Document.State.init)
   265   def current_state(): Document.State = global_state.value
   266 
   267   def recent_syntax(): Prover.Syntax =
   268   {
   269     val version = current_state().recent_finished.version.get_finished
   270     version.syntax getOrElse resources.base_syntax
   271   }
   272 
   273   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   274       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   275     global_state.value.snapshot(name, pending_edits)
   276 
   277 
   278   /* protocol handlers */
   279 
   280   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   281 
   282   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   283     _protocol_handlers.get(name)
   284 
   285 
   286   /* theory files */
   287 
   288   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   289   {
   290     val header1 =
   291       if (resources.loaded_theories(name.theory))
   292         header.error("Cannot update finished theory " + quote(name.theory))
   293       else header
   294     (name, Document.Node.Deps(header1))
   295   }
   296 
   297 
   298   /* internal messages */
   299 
   300   private case class Start(name: String, args: List[String])
   301   private case object Stop
   302   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   303   private case class Protocol_Command(name: String, args: List[String])
   304   private case class Messages(msgs: List[Prover.Message])
   305   private case class Update_Options(options: Options)
   306 
   307 
   308   /* buffered prover messages */
   309 
   310   private object receiver
   311   {
   312     private var buffer = new mutable.ListBuffer[Prover.Message]
   313 
   314     private def flush(): Unit = synchronized {
   315       if (!buffer.isEmpty) {
   316         val msgs = buffer.toList
   317         manager.send(Messages(msgs))
   318         buffer = new mutable.ListBuffer[Prover.Message]
   319       }
   320     }
   321 
   322     def invoke(msg: Prover.Message): Unit = synchronized {
   323       msg match {
   324         case _: Prover.Input =>
   325           buffer += msg
   326         case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
   327           flush()
   328         case output: Prover.Output =>
   329           buffer += msg
   330           if (output.is_syslog)
   331             syslog.change(queue =>
   332               {
   333                 val queue1 = queue.enqueue(output.message)
   334                 if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   335               })
   336       }
   337     }
   338 
   339     private val timer = new Timer("receiver", true)
   340     timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   341 
   342     def shutdown() { timer.cancel(); flush() }
   343   }
   344 
   345 
   346   /* postponed changes */
   347 
   348   private object postponed_changes
   349   {
   350     private var postponed: List[Session.Change] = Nil
   351 
   352     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   353 
   354     def flush(): Unit = synchronized {
   355       val state = global_state.value
   356       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   357       postponed = unassigned
   358       assigned.reverseIterator.foreach(change => manager.send(change))
   359     }
   360   }
   361 
   362 
   363   /* manager thread */
   364 
   365   private val manager: Consumer_Thread[Any] =
   366   {
   367     var prune_next = Time.now() + prune_delay
   368 
   369     var prover: Option[Prover] = None
   370 
   371 
   372     /* raw edits */
   373 
   374     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   375     //{{{
   376     {
   377       require(prover.isDefined)
   378 
   379       prover.get.discontinue_execution()
   380 
   381       val previous = global_state.value.history.tip.version
   382       val version = Future.promise[Document.Version]
   383       global_state.change(_.continue_history(previous, edits, version))
   384 
   385       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   386       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   387     }
   388     //}}}
   389 
   390 
   391     /* resulting changes */
   392 
   393     def handle_change(change: Session.Change)
   394     //{{{
   395     {
   396       require(prover.isDefined)
   397 
   398       def id_command(command: Command)
   399       {
   400         for {
   401           digest <- command.blobs_digests
   402           if !global_state.value.defined_blob(digest)
   403         } {
   404           change.doc_blobs.get(digest) match {
   405             case Some(blob) =>
   406               global_state.change(_.define_blob(digest))
   407               prover.get.define_blob(digest, blob.bytes)
   408             case None =>
   409               System.err.println("Missing blob for SHA1 digest " + digest)
   410           }
   411         }
   412 
   413         if (!global_state.value.defined_command(command.id)) {
   414           global_state.change(_.define_command(command))
   415           prover.get.define_command(command)
   416         }
   417       }
   418       change.doc_edits foreach {
   419         case (_, edit) =>
   420           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   421       }
   422 
   423       val assignment = global_state.value.the_assignment(change.previous).check_finished
   424       global_state.change(_.define_version(change.version, assignment))
   425       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   426       resources.commit(change)
   427     }
   428     //}}}
   429 
   430 
   431     /* prover output */
   432 
   433     def handle_output(output: Prover.Output)
   434     //{{{
   435     {
   436       def bad_output()
   437       {
   438         if (verbose)
   439           System.err.println("Ignoring bad prover output: " + output.message.toString)
   440       }
   441 
   442       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   443       {
   444         try {
   445           val st = global_state.change_result(_.accumulate(state_id, message))
   446           change_buffer.send(Received_Change(false, List(st.command)))
   447         }
   448         catch {
   449           case _: Document.State.Fail => bad_output()
   450         }
   451       }
   452 
   453       output match {
   454         case msg: Prover.Protocol_Output =>
   455           val handled = _protocol_handlers.invoke(msg)
   456           if (!handled) {
   457             msg.properties match {
   458               case Markup.Protocol_Handler(name) if prover.isDefined =>
   459                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   460 
   461               case Protocol.Command_Timing(state_id, timing) if prover.isDefined =>
   462                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   463                 accumulate(state_id, prover.get.xml_cache.elem(message))
   464 
   465               case Markup.Assign_Update =>
   466                 msg.text match {
   467                   case Protocol.Assign_Update(id, update) =>
   468                     try {
   469                       val cmds = global_state.change_result(_.assign(id, update))
   470                       change_buffer.send(Received_Change(true, cmds))
   471                     }
   472                     catch { case _: Document.State.Fail => bad_output() }
   473                     postponed_changes.flush()
   474                   case _ => bad_output()
   475                 }
   476                 // FIXME separate timeout event/message!?
   477                 if (prover.isDefined && Time.now() > prune_next) {
   478                   val old_versions = global_state.change_result(_.prune_history(prune_size))
   479                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   480                   prune_next = Time.now() + prune_delay
   481                 }
   482 
   483               case Markup.Removed_Versions =>
   484                 msg.text match {
   485                   case Protocol.Removed(removed) =>
   486                     try {
   487                       global_state.change(_.removed_versions(removed))
   488                     }
   489                     catch { case _: Document.State.Fail => bad_output() }
   490                   case _ => bad_output()
   491                 }
   492 
   493               case Markup.ML_Statistics(props) =>
   494                 statistics.post(Session.Statistics(props))
   495 
   496               case Markup.Task_Statistics(props) =>
   497                 // FIXME
   498 
   499               case _ => bad_output()
   500             }
   501           }
   502         case _ =>
   503           output.properties match {
   504             case Position.Id(state_id) =>
   505               accumulate(state_id, output.message)
   506 
   507             case _ if output.is_init =>
   508               phase = Session.Ready
   509 
   510             case Markup.Return_Code(rc) if output.is_exit =>
   511               prover = None
   512               if (rc == 0) phase = Session.Inactive
   513               else phase = Session.Failed
   514 
   515             case _ => raw_output_messages.post(output)
   516           }
   517         }
   518     }
   519     //}}}
   520 
   521 
   522     /* main thread */
   523 
   524     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   525     {
   526       case arg: Any =>
   527         //{{{
   528         arg match {
   529           case Start(name, args) if prover.isEmpty =>
   530             if (phase == Session.Inactive || phase == Session.Failed) {
   531               phase = Session.Startup
   532               prover = Some(resources.start_prover(receiver.invoke _, name, args))
   533             }
   534 
   535           case Stop =>
   536             if (prover.isDefined && is_ready) {
   537               _protocol_handlers = _protocol_handlers.stop(prover.get)
   538               global_state.change(_ => Document.State.init)  // FIXME event bus!?
   539               phase = Session.Shutdown
   540               prover.get.terminate
   541             }
   542 
   543           case Update_Options(options) =>
   544             if (prover.isDefined && is_ready) {
   545               prover.get.options(options)
   546               handle_raw_edits(Document.Blobs.empty, Nil)
   547             }
   548             global_options.post(Session.Global_Options(options))
   549 
   550           case Cancel_Exec(exec_id) if prover.isDefined =>
   551             prover.get.cancel_exec(exec_id)
   552 
   553           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   554             handle_raw_edits(doc_blobs, edits)
   555 
   556           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   557             prover.get.dialog_result(serial, result)
   558             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   559 
   560           case Protocol_Command(name, args) if prover.isDefined =>
   561             prover.get.protocol_command(name, args:_*)
   562 
   563           case Messages(msgs) =>
   564             msgs foreach {
   565               case input: Prover.Input =>
   566                 all_messages.post(input)
   567 
   568               case output: Prover.Output =>
   569                 if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
   570                 else handle_output(output)
   571                 if (output.is_syslog) syslog_messages.post(output)
   572                 all_messages.post(output)
   573             }
   574 
   575           case change: Session.Change if prover.isDefined =>
   576             if (global_state.value.is_assigned(change.previous))
   577               handle_change(change)
   578             else postponed_changes.store(change)
   579         }
   580         true
   581         //}}}
   582     }
   583   }
   584 
   585 
   586   /* actions */
   587 
   588   def start(name: String, args: List[String])
   589   { manager.send(Start(name, args)) }
   590 
   591   def stop()
   592   {
   593     manager.send_wait(Stop)
   594     receiver.shutdown()
   595     change_parser.shutdown()
   596     change_buffer.shutdown()
   597     manager.shutdown()
   598     dispatcher.shutdown()
   599   }
   600 
   601   def protocol_command(name: String, args: String*)
   602   { manager.send(Protocol_Command(name, args.toList)) }
   603 
   604   def cancel_exec(exec_id: Document_ID.Exec)
   605   { manager.send(Cancel_Exec(exec_id)) }
   606 
   607   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   608   { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   609 
   610   def update_options(options: Options)
   611   { manager.send_wait(Update_Options(options)) }
   612 
   613   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   614   { manager.send(Session.Dialog_Result(id, serial, result)) }
   615 }