src/Pure/PIDE/session.scala
author wenzelm
Fri Apr 25 13:55:50 2014 +0200 (2014-04-25)
changeset 56719 80eb2192516a
parent 56715 52125652e82a
child 56733 f7700146678d
permissions -rw-r--r--
simplified change_buffer (again, see 937826d702d5): no thread, just timer, rely on asynchronous commands_changed.post;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 
    16 
    17 object Session
    18 {
    19   /* outlets */
    20 
    21   object Consumer
    22   {
    23     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    24       new Consumer[A](name, consume)
    25   }
    26   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    27 
    28   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    29   {
    30     private val consumers = Synchronized(List.empty[Consumer[A]])
    31 
    32     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    33     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    34 
    35     def post(a: A)
    36     {
    37       for (c <- consumers.value.iterator) {
    38         dispatcher.send(() =>
    39           try { c.consume(a) }
    40           catch {
    41             case exn: Throwable =>
    42               System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    43           })
    44       }
    45     }
    46   }
    47 
    48 
    49   /* change */
    50 
    51   sealed case class Change(
    52     previous: Document.Version,
    53     doc_blobs: Document.Blobs,
    54     syntax_changed: Boolean,
    55     deps_changed: Boolean,
    56     doc_edits: List[Document.Edit_Command],
    57     version: Document.Version)
    58 
    59 
    60   /* events */
    61 
    62   //{{{
    63   case class Statistics(props: Properties.T)
    64   case class Global_Options(options: Options)
    65   case object Caret_Focus
    66   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    67   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    68   case class Commands_Changed(
    69     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    70 
    71   sealed abstract class Phase
    72   case object Inactive extends Phase
    73   case object Startup extends Phase  // transient
    74   case object Failed extends Phase
    75   case object Ready extends Phase
    76   case object Shutdown extends Phase  // transient
    77   //}}}
    78 
    79 
    80   /* protocol handlers */
    81 
    82   abstract class Protocol_Handler
    83   {
    84     def stop(prover: Prover): Unit = {}
    85     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
    86   }
    87 
    88   class Protocol_Handlers(
    89     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    90     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
    91   {
    92     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    93 
    94     def add(prover: Prover, name: String): Protocol_Handlers =
    95     {
    96       val (handlers1, functions1) =
    97         handlers.get(name) match {
    98           case Some(old_handler) =>
    99             System.err.println("Redefining protocol handler: " + name)
   100             old_handler.stop(prover)
   101             (handlers - name, functions -- old_handler.functions.keys)
   102           case None => (handlers, functions)
   103         }
   104 
   105       val (handlers2, functions2) =
   106         try {
   107           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
   108           val new_functions =
   109             for ((a, f) <- new_handler.functions.toList) yield
   110               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   111 
   112           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   113           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   114 
   115           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
   116         }
   117         catch {
   118           case exn: Throwable =>
   119             System.err.println(Exn.error_message(
   120               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn)))
   121             (handlers1, functions1)
   122         }
   123 
   124       new Protocol_Handlers(handlers2, functions2)
   125     }
   126 
   127     def invoke(msg: Prover.Protocol_Output): Boolean =
   128       msg.properties match {
   129         case Markup.Function(a) if functions.isDefinedAt(a) =>
   130           try { functions(a)(msg) }
   131           catch {
   132             case exn: Throwable =>
   133               System.err.println(Exn.error_message(
   134                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn)))
   135             false
   136           }
   137         case _ => false
   138       }
   139 
   140     def stop(prover: Prover): Protocol_Handlers =
   141     {
   142       for ((_, handler) <- handlers) handler.stop(prover)
   143       new Protocol_Handlers()
   144     }
   145   }
   146 }
   147 
   148 
   149 class Session(val resources: Resources)
   150 {
   151   /* global flags */
   152 
   153   @volatile var timing: Boolean = false
   154   @volatile var verbose: Boolean = false
   155 
   156 
   157   /* tuning parameters */
   158 
   159   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   160   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   161   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   162   def prune_size: Int = 0  // size of retained history
   163   def syslog_limit: Int = 100
   164   def reparse_limit: Int = 0
   165 
   166 
   167   /* outlets */
   168 
   169   private val dispatcher =
   170     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   171 
   172   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   173   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   174   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   175   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   176   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   177   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   178   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   179   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   182 
   183 
   184 
   185   /** pipelined change parsing **/
   186 
   187   private case class Text_Edits(
   188     previous: Future[Document.Version],
   189     doc_blobs: Document.Blobs,
   190     text_edits: List[Document.Edit_Text],
   191     version_result: Promise[Document.Version])
   192 
   193   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   194   {
   195     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   196       val prev = previous.get_finished
   197       val change =
   198         Timing.timeit("parse_change", timing) {
   199           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   200         }
   201       version_result.fulfill(change.version)
   202       manager.send(change)
   203       true
   204   }
   205 
   206 
   207 
   208   /** main protocol manager **/
   209 
   210   /* global state */
   211 
   212   private val syslog = Synchronized(Queue.empty[XML.Elem])
   213   def current_syslog(): String = cat_lines(syslog.value.iterator.map(XML.content))
   214 
   215   @volatile private var _phase: Session.Phase = Session.Inactive
   216   private def phase_=(new_phase: Session.Phase)
   217   {
   218     _phase = new_phase
   219     phase_changed.post(new_phase)
   220   }
   221   def phase = _phase
   222   def is_ready: Boolean = phase == Session.Ready
   223 
   224   private val global_state = Synchronized(Document.State.init)
   225   def current_state(): Document.State = global_state.value
   226 
   227   def recent_syntax(): Prover.Syntax =
   228   {
   229     val version = current_state().recent_finished.version.get_finished
   230     version.syntax getOrElse resources.base_syntax
   231   }
   232 
   233   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   234       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   235     global_state.value.snapshot(name, pending_edits)
   236 
   237 
   238   /* protocol handlers */
   239 
   240   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   241 
   242   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   243     _protocol_handlers.get(name)
   244 
   245 
   246   /* theory files */
   247 
   248   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   249   {
   250     val header1 =
   251       if (resources.loaded_theories(name.theory))
   252         header.error("Cannot update finished theory " + quote(name.theory))
   253       else header
   254     (name, Document.Node.Deps(header1))
   255   }
   256 
   257 
   258   /* internal messages */
   259 
   260   private case class Start(name: String, args: List[String])
   261   private case object Stop
   262   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   263   private case class Protocol_Command(name: String, args: List[String])
   264   private case class Messages(msgs: List[Prover.Message])
   265   private case class Update_Options(options: Options)
   266 
   267 
   268   /* buffered changes */
   269 
   270   private object change_buffer
   271   {
   272     private var assignment: Boolean = false
   273     private var nodes: Set[Document.Node.Name] = Set.empty
   274     private var commands: Set[Command] = Set.empty
   275 
   276     def flush(): Unit = synchronized {
   277       if (assignment || !nodes.isEmpty || !commands.isEmpty)
   278         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   279       assignment = false
   280       nodes = Set.empty
   281       commands = Set.empty
   282     }
   283 
   284     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   285       assignment |= assign
   286       for (command <- cmds) {
   287         nodes += command.node_name
   288         commands += command
   289       }
   290     }
   291 
   292     private val timer = new Timer("change_buffer", true)
   293     timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms)
   294 
   295     def shutdown()
   296     {
   297       timer.cancel()
   298       flush()
   299     }
   300   }
   301 
   302 
   303   /* buffered prover messages */
   304 
   305   private object receiver
   306   {
   307     private var buffer = new mutable.ListBuffer[Prover.Message]
   308 
   309     private def flush(): Unit = synchronized {
   310       if (!buffer.isEmpty) {
   311         val msgs = buffer.toList
   312         manager.send(Messages(msgs))
   313         buffer = new mutable.ListBuffer[Prover.Message]
   314       }
   315     }
   316 
   317     def invoke(msg: Prover.Message): Unit = synchronized {
   318       msg match {
   319         case _: Prover.Input =>
   320           buffer += msg
   321         case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
   322           flush()
   323         case output: Prover.Output =>
   324           buffer += msg
   325           if (output.is_syslog)
   326             syslog.change(queue =>
   327               {
   328                 val queue1 = queue.enqueue(output.message)
   329                 if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   330               })
   331       }
   332     }
   333 
   334     private val timer = new Timer("receiver", true)
   335     timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   336 
   337     def shutdown() { timer.cancel(); flush() }
   338   }
   339 
   340 
   341   /* postponed changes */
   342 
   343   private object postponed_changes
   344   {
   345     private var postponed: List[Session.Change] = Nil
   346 
   347     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   348 
   349     def flush(): Unit = synchronized {
   350       val state = global_state.value
   351       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   352       postponed = unassigned
   353       assigned.reverseIterator.foreach(change => manager.send(change))
   354     }
   355   }
   356 
   357 
   358   /* manager thread */
   359 
   360   private val manager: Consumer_Thread[Any] =
   361   {
   362     var prune_next = Time.now() + prune_delay
   363 
   364     var prover: Option[Prover] = None
   365 
   366 
   367     /* raw edits */
   368 
   369     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   370     //{{{
   371     {
   372       require(prover.isDefined)
   373 
   374       prover.get.discontinue_execution()
   375 
   376       val previous = global_state.value.history.tip.version
   377       val version = Future.promise[Document.Version]
   378       global_state.change(_.continue_history(previous, edits, version))
   379 
   380       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   381       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   382     }
   383     //}}}
   384 
   385 
   386     /* resulting changes */
   387 
   388     def handle_change(change: Session.Change)
   389     //{{{
   390     {
   391       require(prover.isDefined)
   392 
   393       def id_command(command: Command)
   394       {
   395         for {
   396           digest <- command.blobs_digests
   397           if !global_state.value.defined_blob(digest)
   398         } {
   399           change.doc_blobs.get(digest) match {
   400             case Some(blob) =>
   401               global_state.change(_.define_blob(digest))
   402               prover.get.define_blob(digest, blob.bytes)
   403             case None =>
   404               System.err.println("Missing blob for SHA1 digest " + digest)
   405           }
   406         }
   407 
   408         if (!global_state.value.defined_command(command.id)) {
   409           global_state.change(_.define_command(command))
   410           prover.get.define_command(command)
   411         }
   412       }
   413       change.doc_edits foreach {
   414         case (_, edit) =>
   415           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   416       }
   417 
   418       val assignment = global_state.value.the_assignment(change.previous).check_finished
   419       global_state.change(_.define_version(change.version, assignment))
   420       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   421       resources.commit(change)
   422     }
   423     //}}}
   424 
   425 
   426     /* prover output */
   427 
   428     def handle_output(output: Prover.Output)
   429     //{{{
   430     {
   431       def bad_output()
   432       {
   433         if (verbose)
   434           System.err.println("Ignoring bad prover output: " + output.message.toString)
   435       }
   436 
   437       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   438       {
   439         try {
   440           val st = global_state.change_result(_.accumulate(state_id, message))
   441           change_buffer.invoke(false, List(st.command))
   442         }
   443         catch {
   444           case _: Document.State.Fail => bad_output()
   445         }
   446       }
   447 
   448       output match {
   449         case msg: Prover.Protocol_Output =>
   450           val handled = _protocol_handlers.invoke(msg)
   451           if (!handled) {
   452             msg.properties match {
   453               case Markup.Protocol_Handler(name) if prover.isDefined =>
   454                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   455 
   456               case Protocol.Command_Timing(state_id, timing) if prover.isDefined =>
   457                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   458                 accumulate(state_id, prover.get.xml_cache.elem(message))
   459 
   460               case Markup.Assign_Update =>
   461                 msg.text match {
   462                   case Protocol.Assign_Update(id, update) =>
   463                     try {
   464                       val cmds = global_state.change_result(_.assign(id, update))
   465                       change_buffer.invoke(true, cmds)
   466                     }
   467                     catch { case _: Document.State.Fail => bad_output() }
   468                     postponed_changes.flush()
   469                   case _ => bad_output()
   470                 }
   471                 // FIXME separate timeout event/message!?
   472                 if (prover.isDefined && Time.now() > prune_next) {
   473                   val old_versions = global_state.change_result(_.prune_history(prune_size))
   474                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   475                   prune_next = Time.now() + prune_delay
   476                 }
   477 
   478               case Markup.Removed_Versions =>
   479                 msg.text match {
   480                   case Protocol.Removed(removed) =>
   481                     try {
   482                       global_state.change(_.removed_versions(removed))
   483                     }
   484                     catch { case _: Document.State.Fail => bad_output() }
   485                   case _ => bad_output()
   486                 }
   487 
   488               case Markup.ML_Statistics(props) =>
   489                 statistics.post(Session.Statistics(props))
   490 
   491               case Markup.Task_Statistics(props) =>
   492                 // FIXME
   493 
   494               case _ => bad_output()
   495             }
   496           }
   497         case _ =>
   498           output.properties match {
   499             case Position.Id(state_id) =>
   500               accumulate(state_id, output.message)
   501 
   502             case _ if output.is_init =>
   503               phase = Session.Ready
   504 
   505             case Markup.Return_Code(rc) if output.is_exit =>
   506               prover = None
   507               if (rc == 0) phase = Session.Inactive
   508               else phase = Session.Failed
   509 
   510             case _ => raw_output_messages.post(output)
   511           }
   512         }
   513     }
   514     //}}}
   515 
   516 
   517     /* main thread */
   518 
   519     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   520     {
   521       case arg: Any =>
   522         //{{{
   523         arg match {
   524           case Start(name, args) if prover.isEmpty =>
   525             if (phase == Session.Inactive || phase == Session.Failed) {
   526               phase = Session.Startup
   527               prover = Some(resources.start_prover(receiver.invoke _, name, args))
   528             }
   529 
   530           case Stop =>
   531             if (prover.isDefined && is_ready) {
   532               _protocol_handlers = _protocol_handlers.stop(prover.get)
   533               global_state.change(_ => Document.State.init)  // FIXME event bus!?
   534               phase = Session.Shutdown
   535               prover.get.terminate
   536             }
   537 
   538           case Update_Options(options) =>
   539             if (prover.isDefined && is_ready) {
   540               prover.get.options(options)
   541               handle_raw_edits(Document.Blobs.empty, Nil)
   542             }
   543             global_options.post(Session.Global_Options(options))
   544 
   545           case Cancel_Exec(exec_id) if prover.isDefined =>
   546             prover.get.cancel_exec(exec_id)
   547 
   548           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   549             handle_raw_edits(doc_blobs, edits)
   550 
   551           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   552             prover.get.dialog_result(serial, result)
   553             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   554 
   555           case Protocol_Command(name, args) if prover.isDefined =>
   556             prover.get.protocol_command(name, args:_*)
   557 
   558           case Messages(msgs) =>
   559             msgs foreach {
   560               case input: Prover.Input =>
   561                 all_messages.post(input)
   562 
   563               case output: Prover.Output =>
   564                 if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
   565                 else handle_output(output)
   566                 if (output.is_syslog) syslog_messages.post(output)
   567                 all_messages.post(output)
   568             }
   569 
   570           case change: Session.Change if prover.isDefined =>
   571             if (global_state.value.is_assigned(change.previous))
   572               handle_change(change)
   573             else postponed_changes.store(change)
   574         }
   575         true
   576         //}}}
   577     }
   578   }
   579 
   580 
   581   /* actions */
   582 
   583   def start(name: String, args: List[String])
   584   { manager.send(Start(name, args)) }
   585 
   586   def stop()
   587   {
   588     manager.send_wait(Stop)
   589     receiver.shutdown()
   590     change_parser.shutdown()
   591     change_buffer.shutdown()
   592     manager.shutdown()
   593     dispatcher.shutdown()
   594   }
   595 
   596   def protocol_command(name: String, args: String*)
   597   { manager.send(Protocol_Command(name, args.toList)) }
   598 
   599   def cancel_exec(exec_id: Document_ID.Exec)
   600   { manager.send(Cancel_Exec(exec_id)) }
   601 
   602   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   603   { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   604 
   605   def update_options(options: Options)
   606   { manager.send_wait(Update_Options(options)) }
   607 
   608   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   609   { manager.send(Session.Dialog_Result(id, serial, result)) }
   610 }