src/Pure/PIDE/session.scala
author wenzelm
Mon Mar 31 15:05:24 2014 +0200 (2014-03-31 ago)
changeset 56335 8953d4cc060a
parent 56316 b1cf8ddc2e04
child 56385 76acce58aeab
permissions -rw-r--r--
store blob content within document node: aux. files that were once open are made persistent;
proper structural equality for Command.File and Symbol.Index;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 import scala.actors.TIMEOUT
    16 import scala.actors.Actor._
    17 
    18 
    19 object Session
    20 {
    21   /* change */
    22 
    23   sealed case class Change(
    24     previous: Document.Version,
    25     doc_blobs: Document.Blobs,
    26     syntax_changed: Boolean,
    27     deps_changed: Boolean,
    28     doc_edits: List[Document.Edit_Command],
    29     version: Document.Version)
    30 
    31 
    32   /* events */
    33 
    34   //{{{
    35   case class Statistics(props: Properties.T)
    36   case class Global_Options(options: Options)
    37   case object Caret_Focus
    38   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    39   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    40   case class Commands_Changed(
    41     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    42 
    43   sealed abstract class Phase
    44   case object Inactive extends Phase
    45   case object Startup extends Phase  // transient
    46   case object Failed extends Phase
    47   case object Ready extends Phase
    48   case object Shutdown extends Phase  // transient
    49   //}}}
    50 
    51 
    52   /* protocol handlers */
    53 
    54   type Prover = Isabelle_Process with Protocol
    55 
    56   abstract class Protocol_Handler
    57   {
    58     def stop(prover: Prover): Unit = {}
    59     val functions: Map[String, (Prover, Isabelle_Process.Protocol_Output) => Boolean]
    60   }
    61 
    62   class Protocol_Handlers(
    63     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    64     functions: Map[String, Isabelle_Process.Protocol_Output => Boolean] = Map.empty)
    65   {
    66     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    67 
    68     def add(prover: Prover, name: String): Protocol_Handlers =
    69     {
    70       val (handlers1, functions1) =
    71         handlers.get(name) match {
    72           case Some(old_handler) =>
    73             System.err.println("Redefining protocol handler: " + name)
    74             old_handler.stop(prover)
    75             (handlers - name, functions -- old_handler.functions.keys)
    76           case None => (handlers, functions)
    77         }
    78 
    79       val (handlers2, functions2) =
    80         try {
    81           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
    82           val new_functions =
    83             for ((a, f) <- new_handler.functions.toList) yield
    84               (a, (msg: Isabelle_Process.Protocol_Output) => f(prover, msg))
    85 
    86           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
    87           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
    88 
    89           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
    90         }
    91         catch {
    92           case exn: Throwable =>
    93             System.err.println("Failed to initialize protocol handler: " +
    94               name + "\n" + Exn.message(exn))
    95             (handlers1, functions1)
    96         }
    97 
    98       new Protocol_Handlers(handlers2, functions2)
    99     }
   100 
   101     def invoke(msg: Isabelle_Process.Protocol_Output): Boolean =
   102       msg.properties match {
   103         case Markup.Function(a) if functions.isDefinedAt(a) =>
   104           try { functions(a)(msg) }
   105           catch {
   106             case exn: Throwable =>
   107               System.err.println("Failed invocation of protocol function: " +
   108                 quote(a) + "\n" + Exn.message(exn))
   109             false
   110           }
   111         case _ => false
   112       }
   113 
   114     def stop(prover: Prover): Protocol_Handlers =
   115     {
   116       for ((_, handler) <- handlers) handler.stop(prover)
   117       new Protocol_Handlers()
   118     }
   119   }
   120 }
   121 
   122 
   123 class Session(val resources: Resources)
   124 {
   125   /* global flags */
   126 
   127   @volatile var timing: Boolean = false
   128   @volatile var verbose: Boolean = false
   129 
   130 
   131   /* tuning parameters */
   132 
   133   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   134   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   135   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   136   def prune_size: Int = 0  // size of retained history
   137   def syslog_limit: Int = 100
   138   def reparse_limit: Int = 0
   139 
   140 
   141   /* pervasive event buses */
   142 
   143   val statistics = new Event_Bus[Session.Statistics]
   144   val global_options = new Event_Bus[Session.Global_Options]
   145   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
   146   val raw_edits = new Event_Bus[Session.Raw_Edits]
   147   val commands_changed = new Event_Bus[Session.Commands_Changed]
   148   val phase_changed = new Event_Bus[Session.Phase]
   149   val syslog_messages = new Event_Bus[Isabelle_Process.Output]
   150   val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
   151   val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
   152   val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
   153 
   154 
   155   /** buffered command changes (delay_first discipline) **/
   156 
   157   //{{{
   158   private case object Stop
   159 
   160   private val (_, commands_changed_buffer) =
   161     Simple_Thread.actor("commands_changed_buffer", daemon = true)
   162   {
   163     var finished = false
   164     while (!finished) {
   165       receive {
   166         case Stop => finished = true; reply(())
   167         case changed: Session.Commands_Changed => commands_changed.event(changed)
   168         case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
   169       }
   170     }
   171   }
   172   //}}}
   173 
   174 
   175   /** pipelined change parsing **/
   176 
   177   //{{{
   178   private case class Text_Edits(
   179     previous: Future[Document.Version],
   180     doc_blobs: Document.Blobs,
   181     text_edits: List[Document.Edit_Text],
   182     version_result: Promise[Document.Version])
   183 
   184   private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
   185   {
   186     var finished = false
   187     while (!finished) {
   188       receive {
   189         case Stop => finished = true; reply(())
   190 
   191         case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   192           val prev = previous.get_finished
   193           val change =
   194             Timing.timeit("parse_change", timing) {
   195               resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   196             }
   197           version_result.fulfill(change.version)
   198           sender ! change
   199 
   200         case bad => System.err.println("change_parser: ignoring bad message " + bad)
   201       }
   202     }
   203   }
   204   //}}}
   205 
   206 
   207 
   208   /** main protocol actor **/
   209 
   210   /* global state */
   211 
   212   private val syslog = Volatile(Queue.empty[XML.Elem])
   213   def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))
   214 
   215   @volatile private var _phase: Session.Phase = Session.Inactive
   216   private def phase_=(new_phase: Session.Phase)
   217   {
   218     _phase = new_phase
   219     phase_changed.event(new_phase)
   220   }
   221   def phase = _phase
   222   def is_ready: Boolean = phase == Session.Ready
   223 
   224   private val global_state = Volatile(Document.State.init)
   225   def current_state(): Document.State = global_state()
   226 
   227   def recent_syntax(): Outer_Syntax =
   228   {
   229     val version = current_state().recent_finished.version.get_finished
   230     if (version.is_init) resources.base_syntax
   231     else version.syntax
   232   }
   233 
   234   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   235       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   236     global_state().snapshot(name, pending_edits)
   237 
   238 
   239   /* protocol handlers */
   240 
   241   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   242 
   243   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   244     _protocol_handlers.get(name)
   245 
   246 
   247   /* theory files */
   248 
   249   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   250   {
   251     val header1 =
   252       if (resources.loaded_theories(name.theory))
   253         header.error("Cannot update finished theory " + quote(name.theory))
   254       else header
   255     (name, Document.Node.Deps(header1))
   256   }
   257 
   258 
   259   /* actor messages */
   260 
   261   private case class Start(args: List[String])
   262   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   263   private case class Protocol_Command(name: String, args: List[String])
   264   private case class Messages(msgs: List[Isabelle_Process.Message])
   265   private case class Update_Options(options: Options)
   266 
   267   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   268   {
   269     val this_actor = self
   270 
   271     var prune_next = System.currentTimeMillis() + prune_delay.ms
   272 
   273 
   274     /* buffered prover messages */
   275 
   276     object receiver
   277     {
   278       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   279 
   280       private def flush(): Unit = synchronized {
   281         if (!buffer.isEmpty) {
   282           val msgs = buffer.toList
   283           this_actor ! Messages(msgs)
   284           buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   285         }
   286       }
   287       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
   288         msg match {
   289           case _: Isabelle_Process.Input =>
   290             buffer += msg
   291           case output: Isabelle_Process.Protocol_Output if output.properties == Markup.Flush =>
   292             flush()
   293           case output: Isabelle_Process.Output =>
   294             buffer += msg
   295             if (output.is_syslog)
   296               syslog >> (queue =>
   297                 {
   298                   val queue1 = queue.enqueue(output.message)
   299                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   300                 })
   301         }
   302       }
   303 
   304       private val timer = new Timer("session_actor.receiver", true)
   305       timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   306 
   307       def cancel() { timer.cancel() }
   308     }
   309 
   310     var prover: Option[Session.Prover] = None
   311 
   312 
   313     /* delayed command changes */
   314 
   315     object delay_commands_changed
   316     {
   317       private var changed_assignment: Boolean = false
   318       private var changed_nodes: Set[Document.Node.Name] = Set.empty
   319       private var changed_commands: Set[Command] = Set.empty
   320 
   321       private var flush_time: Option[Long] = None
   322 
   323       def flush_timeout: Long =
   324         flush_time match {
   325           case None => 5000L
   326           case Some(time) => (time - System.currentTimeMillis()) max 0
   327         }
   328 
   329       def flush()
   330       {
   331         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
   332           commands_changed_buffer !
   333             Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
   334         changed_assignment = false
   335         changed_nodes = Set.empty
   336         changed_commands = Set.empty
   337         flush_time = None
   338       }
   339 
   340       def invoke(assign: Boolean, commands: List[Command])
   341       {
   342         changed_assignment |= assign
   343         for (command <- commands) {
   344           changed_nodes += command.node_name
   345           changed_commands += command
   346         }
   347         val now = System.currentTimeMillis()
   348         flush_time match {
   349           case None => flush_time = Some(now + output_delay.ms)
   350           case Some(time) => if (now >= time) flush()
   351         }
   352       }
   353     }
   354 
   355 
   356     /* raw edits */
   357 
   358     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   359     //{{{
   360     {
   361       prover.get.discontinue_execution()
   362 
   363       val previous = global_state().history.tip.version
   364       val version = Future.promise[Document.Version]
   365       val change = global_state >>> (_.continue_history(previous, edits, version))
   366 
   367       raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
   368       change_parser ! Text_Edits(previous, doc_blobs, edits, version)
   369     }
   370     //}}}
   371 
   372 
   373     /* resulting changes */
   374 
   375     def handle_change(change: Session.Change)
   376     //{{{
   377     {
   378       def id_command(command: Command)
   379       {
   380         for {
   381           digest <- command.blobs_digests
   382           if !global_state().defined_blob(digest)
   383         } {
   384           change.doc_blobs.get(digest) match {
   385             case Some(blob) =>
   386               global_state >> (_.define_blob(digest))
   387               prover.get.define_blob(digest, blob.bytes)
   388             case None =>
   389               System.err.println("Missing blob for SHA1 digest " + digest)
   390           }
   391         }
   392 
   393         if (!global_state().defined_command(command.id)) {
   394           global_state >> (_.define_command(command))
   395           prover.get.define_command(command)
   396         }
   397       }
   398       change.doc_edits foreach {
   399         case (_, edit) =>
   400           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   401       }
   402 
   403       val assignment = global_state().the_assignment(change.previous).check_finished
   404       global_state >> (_.define_version(change.version, assignment))
   405       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   406       resources.commit(change)
   407     }
   408     //}}}
   409 
   410 
   411     /* prover output */
   412 
   413     def handle_output(output: Isabelle_Process.Output)
   414     //{{{
   415     {
   416       def bad_output()
   417       {
   418         if (verbose)
   419           System.err.println("Ignoring prover output: " + output.message.toString)
   420       }
   421 
   422       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   423       {
   424         try {
   425           val st = global_state >>> (_.accumulate(state_id, message))
   426           delay_commands_changed.invoke(false, List(st.command))
   427         }
   428         catch {
   429           case _: Document.State.Fail => bad_output()
   430         }
   431       }
   432 
   433       output match {
   434         case msg: Isabelle_Process.Protocol_Output =>
   435           val handled = _protocol_handlers.invoke(msg)
   436           if (!handled) {
   437             msg.properties match {
   438               case Markup.Protocol_Handler(name) =>
   439                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   440 
   441               case Protocol.Command_Timing(state_id, timing) =>
   442                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   443                 accumulate(state_id, prover.get.xml_cache.elem(message))
   444 
   445               case Markup.Assign_Update =>
   446                 msg.text match {
   447                   case Protocol.Assign_Update(id, update) =>
   448                     try {
   449                       val cmds = global_state >>> (_.assign(id, update))
   450                       delay_commands_changed.invoke(true, cmds)
   451                     }
   452                     catch { case _: Document.State.Fail => bad_output() }
   453                   case _ => bad_output()
   454                 }
   455                 // FIXME separate timeout event/message!?
   456                 if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   457                   val old_versions = global_state >>> (_.prune_history(prune_size))
   458                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   459                   prune_next = System.currentTimeMillis() + prune_delay.ms
   460                 }
   461 
   462               case Markup.Removed_Versions =>
   463                 msg.text match {
   464                   case Protocol.Removed(removed) =>
   465                     try {
   466                       global_state >> (_.removed_versions(removed))
   467                     }
   468                     catch { case _: Document.State.Fail => bad_output() }
   469                   case _ => bad_output()
   470                 }
   471 
   472               case Markup.ML_Statistics(props) =>
   473                 statistics.event(Session.Statistics(props))
   474 
   475               case Markup.Task_Statistics(props) =>
   476                 // FIXME
   477 
   478               case _ => bad_output()
   479             }
   480           }
   481         case _ =>
   482           output.properties match {
   483             case Position.Id(state_id) =>
   484               accumulate(state_id, output.message)
   485 
   486             case _ if output.is_init =>
   487               phase = Session.Ready
   488 
   489             case Markup.Return_Code(rc) if output.is_exit =>
   490               if (rc == 0) phase = Session.Inactive
   491               else phase = Session.Failed
   492 
   493             case _ => raw_output_messages.event(output)
   494           }
   495         }
   496     }
   497     //}}}
   498 
   499 
   500     /* main loop */
   501 
   502     //{{{
   503     var finished = false
   504     while (!finished) {
   505       receiveWithin(delay_commands_changed.flush_timeout) {
   506         case TIMEOUT => delay_commands_changed.flush()
   507 
   508         case Start(args) if prover.isEmpty =>
   509           if (phase == Session.Inactive || phase == Session.Failed) {
   510             phase = Session.Startup
   511             prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
   512           }
   513 
   514         case Stop =>
   515           if (phase == Session.Ready) {
   516             _protocol_handlers = _protocol_handlers.stop(prover.get)
   517             global_state >> (_ => Document.State.init)  // FIXME event bus!?
   518             phase = Session.Shutdown
   519             prover.get.terminate
   520             prover = None
   521             phase = Session.Inactive
   522           }
   523           finished = true
   524           receiver.cancel()
   525           reply(())
   526 
   527         case Update_Options(options) if prover.isDefined =>
   528           if (is_ready) {
   529             prover.get.options(options)
   530             handle_raw_edits(Document.Blobs.empty, Nil)
   531           }
   532           global_options.event(Session.Global_Options(options))
   533           reply(())
   534 
   535         case Cancel_Exec(exec_id) if prover.isDefined =>
   536           prover.get.cancel_exec(exec_id)
   537 
   538         case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   539           handle_raw_edits(doc_blobs, edits)
   540           reply(())
   541 
   542         case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   543           prover.get.dialog_result(serial, result)
   544           handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))
   545 
   546         case Protocol_Command(name, args) if prover.isDefined =>
   547           prover.get.protocol_command(name, args:_*)
   548 
   549         case Messages(msgs) =>
   550           msgs foreach {
   551             case input: Isabelle_Process.Input =>
   552               all_messages.event(input)
   553 
   554             case output: Isabelle_Process.Output =>
   555               if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   556               else handle_output(output)
   557               if (output.is_syslog) syslog_messages.event(output)
   558               all_messages.event(output)
   559           }
   560 
   561         case change: Session.Change
   562         if prover.isDefined && global_state().is_assigned(change.previous) =>
   563           handle_change(change)
   564 
   565         case bad if !bad.isInstanceOf[Session.Change] =>
   566           System.err.println("session_actor: ignoring bad message " + bad)
   567       }
   568     }
   569     //}}}
   570   }
   571 
   572 
   573   /* actions */
   574 
   575   def start(args: List[String])
   576   {
   577     session_actor ! Start(args)
   578   }
   579 
   580   def stop()
   581   {
   582     commands_changed_buffer !? Stop
   583     change_parser !? Stop
   584     session_actor !? Stop
   585   }
   586 
   587   def protocol_command(name: String, args: String*)
   588   { session_actor ! Protocol_Command(name, args.toList) }
   589 
   590   def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
   591 
   592   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   593   { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
   594 
   595   def update_options(options: Options)
   596   { session_actor !? Update_Options(options) }
   597 
   598   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   599   { session_actor ! Session.Dialog_Result(id, serial, result) }
   600 }