src/Pure/PIDE/session.scala
author wenzelm
Sat Mar 29 10:17:09 2014 +0100 (2014-03-29 ago)
changeset 56315 c20053f67093
parent 56210 c7c85cdb725d
child 56316 b1cf8ddc2e04
permissions -rw-r--r--
tuned signature;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 import scala.actors.TIMEOUT
    16 import scala.actors.Actor._
    17 
    18 
    19 object Session
    20 {
    21   /* change */
    22 
    23   sealed case class Change(
    24     previous: Document.Version,
    25     doc_blobs: Document.Blobs,
    26     syntax_changed: Boolean,
    27     doc_edits: List[Document.Edit_Command],
    28     version: Document.Version)
    29 
    30 
    31   /* events */
    32 
    33   //{{{
    34   case class Statistics(props: Properties.T)
    35   case class Global_Options(options: Options)
    36   case object Caret_Focus
    37   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    38   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    39   case class Commands_Changed(
    40     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    41 
    42   sealed abstract class Phase
    43   case object Inactive extends Phase
    44   case object Startup extends Phase  // transient
    45   case object Failed extends Phase
    46   case object Ready extends Phase
    47   case object Shutdown extends Phase  // transient
    48   //}}}
    49 
    50 
    51   /* protocol handlers */
    52 
    53   type Prover = Isabelle_Process with Protocol
    54 
    55   abstract class Protocol_Handler
    56   {
    57     def stop(prover: Prover): Unit = {}
    58     val functions: Map[String, (Prover, Isabelle_Process.Protocol_Output) => Boolean]
    59   }
    60 
    61   class Protocol_Handlers(
    62     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    63     functions: Map[String, Isabelle_Process.Protocol_Output => Boolean] = Map.empty)
    64   {
    65     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    66 
    67     def add(prover: Prover, name: String): Protocol_Handlers =
    68     {
    69       val (handlers1, functions1) =
    70         handlers.get(name) match {
    71           case Some(old_handler) =>
    72             System.err.println("Redefining protocol handler: " + name)
    73             old_handler.stop(prover)
    74             (handlers - name, functions -- old_handler.functions.keys)
    75           case None => (handlers, functions)
    76         }
    77 
    78       val (handlers2, functions2) =
    79         try {
    80           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
    81           val new_functions =
    82             for ((a, f) <- new_handler.functions.toList) yield
    83               (a, (msg: Isabelle_Process.Protocol_Output) => f(prover, msg))
    84 
    85           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
    86           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
    87 
    88           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
    89         }
    90         catch {
    91           case exn: Throwable =>
    92             System.err.println("Failed to initialize protocol handler: " +
    93               name + "\n" + Exn.message(exn))
    94             (handlers1, functions1)
    95         }
    96 
    97       new Protocol_Handlers(handlers2, functions2)
    98     }
    99 
   100     def invoke(msg: Isabelle_Process.Protocol_Output): Boolean =
   101       msg.properties match {
   102         case Markup.Function(a) if functions.isDefinedAt(a) =>
   103           try { functions(a)(msg) }
   104           catch {
   105             case exn: Throwable =>
   106               System.err.println("Failed invocation of protocol function: " +
   107                 quote(a) + "\n" + Exn.message(exn))
   108             false
   109           }
   110         case _ => false
   111       }
   112 
   113     def stop(prover: Prover): Protocol_Handlers =
   114     {
   115       for ((_, handler) <- handlers) handler.stop(prover)
   116       new Protocol_Handlers()
   117     }
   118   }
   119 }
   120 
   121 
   122 class Session(val resources: Resources)
   123 {
   124   /* global flags */
   125 
   126   @volatile var timing: Boolean = false
   127   @volatile var verbose: Boolean = false
   128 
   129 
   130   /* tuning parameters */
   131 
   132   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   133   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   134   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   135   def prune_size: Int = 0  // size of retained history
   136   def syslog_limit: Int = 100
   137   def reparse_limit: Int = 0
   138 
   139 
   140   /* pervasive event buses */
   141 
   142   val statistics = new Event_Bus[Session.Statistics]
   143   val global_options = new Event_Bus[Session.Global_Options]
   144   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
   145   val raw_edits = new Event_Bus[Session.Raw_Edits]
   146   val commands_changed = new Event_Bus[Session.Commands_Changed]
   147   val phase_changed = new Event_Bus[Session.Phase]
   148   val syslog_messages = new Event_Bus[Isabelle_Process.Output]
   149   val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
   150   val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
   151   val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
   152 
   153 
   154   /** buffered command changes (delay_first discipline) **/
   155 
   156   //{{{
   157   private case object Stop
   158 
   159   private val (_, commands_changed_buffer) =
   160     Simple_Thread.actor("commands_changed_buffer", daemon = true)
   161   {
   162     var finished = false
   163     while (!finished) {
   164       receive {
   165         case Stop => finished = true; reply(())
   166         case changed: Session.Commands_Changed => commands_changed.event(changed)
   167         case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
   168       }
   169     }
   170   }
   171   //}}}
   172 
   173 
   174   /** pipelined change parsing **/
   175 
   176   //{{{
   177   private case class Text_Edits(
   178     previous: Future[Document.Version],
   179     doc_blobs: Document.Blobs,
   180     text_edits: List[Document.Edit_Text],
   181     version_result: Promise[Document.Version])
   182 
   183   private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
   184   {
   185     var finished = false
   186     while (!finished) {
   187       receive {
   188         case Stop => finished = true; reply(())
   189 
   190         case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   191           val prev = previous.get_finished
   192           val change =
   193             Timing.timeit("parse_edits", timing) {
   194               resources.parse_edits(reparse_limit, prev, doc_blobs, text_edits)
   195             }
   196           version_result.fulfill(change.version)
   197           sender ! change
   198 
   199         case bad => System.err.println("change_parser: ignoring bad message " + bad)
   200       }
   201     }
   202   }
   203   //}}}
   204 
   205 
   206 
   207   /** main protocol actor **/
   208 
   209   /* global state */
   210 
   211   private val syslog = Volatile(Queue.empty[XML.Elem])
   212   def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))
   213 
   214   @volatile private var _phase: Session.Phase = Session.Inactive
   215   private def phase_=(new_phase: Session.Phase)
   216   {
   217     _phase = new_phase
   218     phase_changed.event(new_phase)
   219   }
   220   def phase = _phase
   221   def is_ready: Boolean = phase == Session.Ready
   222 
   223   private val global_state = Volatile(Document.State.init)
   224   def current_state(): Document.State = global_state()
   225 
   226   def recent_syntax(): Outer_Syntax =
   227   {
   228     val version = current_state().recent_finished.version.get_finished
   229     if (version.is_init) resources.base_syntax
   230     else version.syntax
   231   }
   232 
   233   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   234       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   235     global_state().snapshot(name, pending_edits)
   236 
   237 
   238   /* protocol handlers */
   239 
   240   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   241 
   242   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   243     _protocol_handlers.get(name)
   244 
   245 
   246   /* theory files */
   247 
   248   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   249   {
   250     val header1 =
   251       if (resources.loaded_theories(name.theory))
   252         header.error("Cannot update finished theory " + quote(name.theory))
   253       else header
   254     (name, Document.Node.Deps(header1))
   255   }
   256 
   257 
   258   /* actor messages */
   259 
   260   private case class Start(args: List[String])
   261   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   262   private case class Protocol_Command(name: String, args: List[String])
   263   private case class Messages(msgs: List[Isabelle_Process.Message])
   264   private case class Update_Options(options: Options)
   265 
   266   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   267   {
   268     val this_actor = self
   269 
   270     var prune_next = System.currentTimeMillis() + prune_delay.ms
   271 
   272 
   273     /* buffered prover messages */
   274 
   275     object receiver
   276     {
   277       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   278 
   279       private def flush(): Unit = synchronized {
   280         if (!buffer.isEmpty) {
   281           val msgs = buffer.toList
   282           this_actor ! Messages(msgs)
   283           buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   284         }
   285       }
   286       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
   287         msg match {
   288           case _: Isabelle_Process.Input =>
   289             buffer += msg
   290           case output: Isabelle_Process.Protocol_Output if output.properties == Markup.Flush =>
   291             flush()
   292           case output: Isabelle_Process.Output =>
   293             buffer += msg
   294             if (output.is_syslog)
   295               syslog >> (queue =>
   296                 {
   297                   val queue1 = queue.enqueue(output.message)
   298                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   299                 })
   300         }
   301       }
   302 
   303       private val timer = new Timer("session_actor.receiver", true)
   304       timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   305 
   306       def cancel() { timer.cancel() }
   307     }
   308 
   309     var prover: Option[Session.Prover] = None
   310 
   311 
   312     /* delayed command changes */
   313 
   314     object delay_commands_changed
   315     {
   316       private var changed_assignment: Boolean = false
   317       private var changed_nodes: Set[Document.Node.Name] = Set.empty
   318       private var changed_commands: Set[Command] = Set.empty
   319 
   320       private var flush_time: Option[Long] = None
   321 
   322       def flush_timeout: Long =
   323         flush_time match {
   324           case None => 5000L
   325           case Some(time) => (time - System.currentTimeMillis()) max 0
   326         }
   327 
   328       def flush()
   329       {
   330         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
   331           commands_changed_buffer !
   332             Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
   333         changed_assignment = false
   334         changed_nodes = Set.empty
   335         changed_commands = Set.empty
   336         flush_time = None
   337       }
   338 
   339       def invoke(assign: Boolean, commands: List[Command])
   340       {
   341         changed_assignment |= assign
   342         for (command <- commands) {
   343           changed_nodes += command.node_name
   344           changed_commands += command
   345         }
   346         val now = System.currentTimeMillis()
   347         flush_time match {
   348           case None => flush_time = Some(now + output_delay.ms)
   349           case Some(time) => if (now >= time) flush()
   350         }
   351       }
   352     }
   353 
   354 
   355     /* raw edits */
   356 
   357     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   358     //{{{
   359     {
   360       prover.get.discontinue_execution()
   361 
   362       val previous = global_state().history.tip.version
   363       val version = Future.promise[Document.Version]
   364       val change = global_state >>> (_.continue_history(previous, edits, version))
   365 
   366       raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
   367       change_parser ! Text_Edits(previous, doc_blobs, edits, version)
   368     }
   369     //}}}
   370 
   371 
   372     /* resulting changes */
   373 
   374     def handle_change(change: Session.Change)
   375     //{{{
   376     {
   377       def id_command(command: Command)
   378       {
   379         for {
   380           digest <- command.blobs_digests
   381           if !global_state().defined_blob(digest)
   382         } {
   383           change.doc_blobs.get(digest) match {
   384             case Some(blob) =>
   385               global_state >> (_.define_blob(digest))
   386               prover.get.define_blob(blob)
   387             case None =>
   388               System.err.println("Missing blob for SHA1 digest " + digest)
   389           }
   390         }
   391 
   392         if (!global_state().defined_command(command.id)) {
   393           global_state >> (_.define_command(command))
   394           prover.get.define_command(command)
   395         }
   396       }
   397       change.doc_edits foreach {
   398         case (_, edit) =>
   399           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   400       }
   401 
   402       val assignment = global_state().the_assignment(change.previous).check_finished
   403       global_state >> (_.define_version(change.version, assignment))
   404       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   405 
   406       if (change.syntax_changed) resources.syntax_changed()
   407     }
   408     //}}}
   409 
   410 
   411     /* prover output */
   412 
   413     def handle_output(output: Isabelle_Process.Output)
   414     //{{{
   415     {
   416       def bad_output()
   417       {
   418         if (verbose)
   419           System.err.println("Ignoring prover output: " + output.message.toString)
   420       }
   421 
   422       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   423       {
   424         try {
   425           val st = global_state >>> (_.accumulate(state_id, message))
   426           delay_commands_changed.invoke(false, List(st.command))
   427         }
   428         catch {
   429           case _: Document.State.Fail => bad_output()
   430         }
   431       }
   432 
   433       output match {
   434         case msg: Isabelle_Process.Protocol_Output =>
   435           val handled = _protocol_handlers.invoke(msg)
   436           if (!handled) {
   437             msg.properties match {
   438               case Markup.Protocol_Handler(name) =>
   439                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   440 
   441               case Protocol.Command_Timing(state_id, timing) =>
   442                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   443                 accumulate(state_id, prover.get.xml_cache.elem(message))
   444 
   445               case Markup.Assign_Update =>
   446                 msg.text match {
   447                   case Protocol.Assign_Update(id, update) =>
   448                     try {
   449                       val cmds = global_state >>> (_.assign(id, update))
   450                       delay_commands_changed.invoke(true, cmds)
   451                     }
   452                     catch { case _: Document.State.Fail => bad_output() }
   453                   case _ => bad_output()
   454                 }
   455                 // FIXME separate timeout event/message!?
   456                 if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   457                   val old_versions = global_state >>> (_.prune_history(prune_size))
   458                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   459                   prune_next = System.currentTimeMillis() + prune_delay.ms
   460                 }
   461 
   462               case Markup.Removed_Versions =>
   463                 msg.text match {
   464                   case Protocol.Removed(removed) =>
   465                     try {
   466                       global_state >> (_.removed_versions(removed))
   467                     }
   468                     catch { case _: Document.State.Fail => bad_output() }
   469                   case _ => bad_output()
   470                 }
   471 
   472               case Markup.ML_Statistics(props) =>
   473                 statistics.event(Session.Statistics(props))
   474 
   475               case Markup.Task_Statistics(props) =>
   476                 // FIXME
   477 
   478               case _ => bad_output()
   479             }
   480           }
   481         case _ =>
   482           output.properties match {
   483             case Position.Id(state_id) =>
   484               accumulate(state_id, output.message)
   485 
   486             case _ if output.is_init =>
   487               phase = Session.Ready
   488 
   489             case Markup.Return_Code(rc) if output.is_exit =>
   490               if (rc == 0) phase = Session.Inactive
   491               else phase = Session.Failed
   492 
   493             case _ => raw_output_messages.event(output)
   494           }
   495         }
   496     }
   497     //}}}
   498 
   499 
   500     /* main loop */
   501 
   502     //{{{
   503     var finished = false
   504     while (!finished) {
   505       receiveWithin(delay_commands_changed.flush_timeout) {
   506         case TIMEOUT => delay_commands_changed.flush()
   507 
   508         case Start(args) if prover.isEmpty =>
   509           if (phase == Session.Inactive || phase == Session.Failed) {
   510             phase = Session.Startup
   511             prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
   512           }
   513 
   514         case Stop =>
   515           if (phase == Session.Ready) {
   516             _protocol_handlers = _protocol_handlers.stop(prover.get)
   517             global_state >> (_ => Document.State.init)  // FIXME event bus!?
   518             phase = Session.Shutdown
   519             prover.get.terminate
   520             prover = None
   521             phase = Session.Inactive
   522           }
   523           finished = true
   524           receiver.cancel()
   525           reply(())
   526 
   527         case Update_Options(options) if prover.isDefined =>
   528           if (is_ready) {
   529             prover.get.options(options)
   530             handle_raw_edits(Document.Blobs.empty, Nil)
   531           }
   532           global_options.event(Session.Global_Options(options))
   533           reply(())
   534 
   535         case Cancel_Exec(exec_id) if prover.isDefined =>
   536           prover.get.cancel_exec(exec_id)
   537 
   538         case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   539           handle_raw_edits(doc_blobs, edits)
   540           reply(())
   541 
   542         case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   543           prover.get.dialog_result(serial, result)
   544           handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))
   545 
   546         case Protocol_Command(name, args) if prover.isDefined =>
   547           prover.get.protocol_command(name, args:_*)
   548 
   549         case Messages(msgs) =>
   550           msgs foreach {
   551             case input: Isabelle_Process.Input =>
   552               all_messages.event(input)
   553 
   554             case output: Isabelle_Process.Output =>
   555               if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   556               else handle_output(output)
   557               if (output.is_syslog) syslog_messages.event(output)
   558               all_messages.event(output)
   559           }
   560 
   561         case change: Session.Change
   562         if prover.isDefined && global_state().is_assigned(change.previous) =>
   563           handle_change(change)
   564 
   565         case bad if !bad.isInstanceOf[Session.Change] =>
   566           System.err.println("session_actor: ignoring bad message " + bad)
   567       }
   568     }
   569     //}}}
   570   }
   571 
   572 
   573   /* actions */
   574 
   575   def start(args: List[String])
   576   {
   577     session_actor ! Start(args)
   578   }
   579 
   580   def stop()
   581   {
   582     commands_changed_buffer !? Stop
   583     change_parser !? Stop
   584     session_actor !? Stop
   585   }
   586 
   587   def protocol_command(name: String, args: String*)
   588   { session_actor ! Protocol_Command(name, args.toList) }
   589 
   590   def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
   591 
   592   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   593   { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
   594 
   595   def update_options(options: Options)
   596   { session_actor !? Update_Options(options) }
   597 
   598   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   599   { session_actor ! Session.Dialog_Result(id, serial, result) }
   600 }