src/Pure/PIDE/session.scala
author wenzelm
Thu Apr 24 23:13:17 2014 +0200 (2014-04-24 ago)
changeset 56710 8f102c18eab7
parent 56709 e83356e94380
child 56711 ef3d00153e3b
permissions -rw-r--r--
more uniform warning/error handling, potentially with propagation to send_wait caller;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 
    16 
    17 object Session
    18 {
    19   /* change */
    20 
    21   sealed case class Change(
    22     previous: Document.Version,
    23     doc_blobs: Document.Blobs,
    24     syntax_changed: Boolean,
    25     deps_changed: Boolean,
    26     doc_edits: List[Document.Edit_Command],
    27     version: Document.Version)
    28 
    29 
    30   /* events */
    31 
    32   //{{{
    33   case class Statistics(props: Properties.T)
    34   case class Global_Options(options: Options)
    35   case object Caret_Focus
    36   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    37   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    38   case class Commands_Changed(
    39     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    40 
    41   sealed abstract class Phase
    42   case object Inactive extends Phase
    43   case object Startup extends Phase  // transient
    44   case object Failed extends Phase
    45   case object Ready extends Phase
    46   case object Shutdown extends Phase  // transient
    47   //}}}
    48 
    49 
    50   /* protocol handlers */
    51 
    52   abstract class Protocol_Handler
    53   {
    54     def stop(prover: Prover): Unit = {}
    55     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
    56   }
    57 
    58   class Protocol_Handlers(
    59     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    60     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
    61   {
    62     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    63 
    64     def add(prover: Prover, name: String): Protocol_Handlers =
    65     {
    66       val (handlers1, functions1) =
    67         handlers.get(name) match {
    68           case Some(old_handler) =>
    69             System.err.println("Redefining protocol handler: " + name)
    70             old_handler.stop(prover)
    71             (handlers - name, functions -- old_handler.functions.keys)
    72           case None => (handlers, functions)
    73         }
    74 
    75       val (handlers2, functions2) =
    76         try {
    77           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
    78           val new_functions =
    79             for ((a, f) <- new_handler.functions.toList) yield
    80               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
    81 
    82           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
    83           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
    84 
    85           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
    86         }
    87         catch {
    88           case exn: Throwable =>
    89             System.err.println(Exn.error_message(
    90               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn)))
    91             (handlers1, functions1)
    92         }
    93 
    94       new Protocol_Handlers(handlers2, functions2)
    95     }
    96 
    97     def invoke(msg: Prover.Protocol_Output): Boolean =
    98       msg.properties match {
    99         case Markup.Function(a) if functions.isDefinedAt(a) =>
   100           try { functions(a)(msg) }
   101           catch {
   102             case exn: Throwable =>
   103               System.err.println(Exn.error_message(
   104                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn)))
   105             false
   106           }
   107         case _ => false
   108       }
   109 
   110     def stop(prover: Prover): Protocol_Handlers =
   111     {
   112       for ((_, handler) <- handlers) handler.stop(prover)
   113       new Protocol_Handlers()
   114     }
   115   }
   116 }
   117 
   118 
   119 class Session(val resources: Resources)
   120 {
   121   /* global flags */
   122 
   123   @volatile var timing: Boolean = false
   124   @volatile var verbose: Boolean = false
   125 
   126 
   127   /* tuning parameters */
   128 
   129   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   130   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   131   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   132   def prune_size: Int = 0  // size of retained history
   133   def syslog_limit: Int = 100
   134   def reparse_limit: Int = 0
   135 
   136 
   137   /* pervasive event buses */
   138 
   139   val statistics = new Event_Bus[Session.Statistics]
   140   val global_options = new Event_Bus[Session.Global_Options]
   141   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
   142   val raw_edits = new Event_Bus[Session.Raw_Edits]
   143   val commands_changed = new Event_Bus[Session.Commands_Changed]
   144   val phase_changed = new Event_Bus[Session.Phase]
   145   val syslog_messages = new Event_Bus[Prover.Output]
   146   val raw_output_messages = new Event_Bus[Prover.Output]
   147   val all_messages = new Event_Bus[Prover.Message]  // potential bottle-neck
   148   val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
   149 
   150 
   151 
   152   /** buffered changes: to be dispatched to clients **/
   153 
   154   private case class Received_Change(assignment: Boolean, commands: List[Command])
   155 
   156   private val change_buffer: Consumer_Thread[Received_Change] =
   157   {
   158     object changed
   159     {
   160       private var assignment: Boolean = false
   161       private var nodes: Set[Document.Node.Name] = Set.empty
   162       private var commands: Set[Command] = Set.empty
   163 
   164       def flush(): Unit = synchronized {
   165         if (assignment || !nodes.isEmpty || !commands.isEmpty)
   166           commands_changed.event(Session.Commands_Changed(assignment, nodes, commands))
   167         assignment = false
   168         nodes = Set.empty
   169         commands = Set.empty
   170       }
   171 
   172       def invoke(change: Received_Change): Unit = synchronized {
   173         assignment |= change.assignment
   174         for (command <- change.commands) {
   175           nodes += command.node_name
   176           commands += command
   177         }
   178       }
   179     }
   180 
   181     val timer = new Timer("change_buffer", true)
   182     timer.schedule(new TimerTask { def run = changed.flush() }, output_delay.ms, output_delay.ms)
   183 
   184     Consumer_Thread.fork[Received_Change]("change_buffer", daemon = true)(
   185       consume = { case change => changed.invoke(change); true },
   186       finish = () => { timer.cancel(); changed.flush() }
   187     )
   188   }
   189 
   190 
   191 
   192   /** pipelined change parsing **/
   193 
   194   private case class Text_Edits(
   195     previous: Future[Document.Version],
   196     doc_blobs: Document.Blobs,
   197     text_edits: List[Document.Edit_Text],
   198     version_result: Promise[Document.Version])
   199 
   200   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   201   {
   202     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   203       val prev = previous.get_finished
   204       val change =
   205         Timing.timeit("parse_change", timing) {
   206           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   207         }
   208       version_result.fulfill(change.version)
   209       manager.send(change)
   210       true
   211   }
   212 
   213 
   214 
   215   /** main protocol manager **/
   216 
   217   /* global state */
   218 
   219   private val syslog = Synchronized(Queue.empty[XML.Elem])
   220   def current_syslog(): String = cat_lines(syslog.value.iterator.map(XML.content))
   221 
   222   @volatile private var _phase: Session.Phase = Session.Inactive
   223   private def phase_=(new_phase: Session.Phase)
   224   {
   225     _phase = new_phase
   226     phase_changed.event(new_phase)
   227   }
   228   def phase = _phase
   229   def is_ready: Boolean = phase == Session.Ready
   230 
   231   private val global_state = Synchronized(Document.State.init)
   232   def current_state(): Document.State = global_state.value
   233 
   234   def recent_syntax(): Prover.Syntax =
   235   {
   236     val version = current_state().recent_finished.version.get_finished
   237     version.syntax getOrElse resources.base_syntax
   238   }
   239 
   240   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   241       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   242     global_state.value.snapshot(name, pending_edits)
   243 
   244 
   245   /* protocol handlers */
   246 
   247   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   248 
   249   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   250     _protocol_handlers.get(name)
   251 
   252 
   253   /* theory files */
   254 
   255   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   256   {
   257     val header1 =
   258       if (resources.loaded_theories(name.theory))
   259         header.error("Cannot update finished theory " + quote(name.theory))
   260       else header
   261     (name, Document.Node.Deps(header1))
   262   }
   263 
   264 
   265   /* internal messages */
   266 
   267   private case class Start(name: String, args: List[String])
   268   private case object Stop
   269   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   270   private case class Protocol_Command(name: String, args: List[String])
   271   private case class Messages(msgs: List[Prover.Message])
   272   private case class Update_Options(options: Options)
   273 
   274 
   275   /* buffered prover messages */
   276 
   277   private object receiver
   278   {
   279     private var buffer = new mutable.ListBuffer[Prover.Message]
   280 
   281     private def flush(): Unit = synchronized {
   282       if (!buffer.isEmpty) {
   283         val msgs = buffer.toList
   284         manager.send(Messages(msgs))
   285         buffer = new mutable.ListBuffer[Prover.Message]
   286       }
   287     }
   288 
   289     def invoke(msg: Prover.Message): Unit = synchronized {
   290       msg match {
   291         case _: Prover.Input =>
   292           buffer += msg
   293         case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
   294           flush()
   295         case output: Prover.Output =>
   296           buffer += msg
   297           if (output.is_syslog)
   298             syslog.change(queue =>
   299               {
   300                 val queue1 = queue.enqueue(output.message)
   301                 if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   302               })
   303       }
   304     }
   305 
   306     private val timer = new Timer("receiver", true)
   307     timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   308 
   309     def shutdown() { timer.cancel(); flush() }
   310   }
   311 
   312 
   313   /* postponed changes */
   314 
   315   private object postponed_changes
   316   {
   317     private var postponed: List[Session.Change] = Nil
   318 
   319     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   320 
   321     def flush(): Unit = synchronized {
   322       val state = global_state.value
   323       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   324       postponed = unassigned
   325       assigned.reverseIterator.foreach(change => manager.send(change))
   326     }
   327   }
   328 
   329 
   330   /* manager thread */
   331 
   332   private val manager: Consumer_Thread[Any] =
   333   {
   334     var prune_next = Time.now() + prune_delay
   335 
   336     var prover: Option[Prover] = None
   337 
   338 
   339     /* raw edits */
   340 
   341     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   342     //{{{
   343     {
   344       prover.get.discontinue_execution()
   345 
   346       val previous = global_state.value.history.tip.version
   347       val version = Future.promise[Document.Version]
   348       val change = global_state.change_result(_.continue_history(previous, edits, version))
   349 
   350       raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
   351       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   352     }
   353     //}}}
   354 
   355 
   356     /* resulting changes */
   357 
   358     def handle_change(change: Session.Change)
   359     //{{{
   360     {
   361       def id_command(command: Command)
   362       {
   363         for {
   364           digest <- command.blobs_digests
   365           if !global_state.value.defined_blob(digest)
   366         } {
   367           change.doc_blobs.get(digest) match {
   368             case Some(blob) =>
   369               global_state.change(_.define_blob(digest))
   370               prover.get.define_blob(digest, blob.bytes)
   371             case None =>
   372               System.err.println("Missing blob for SHA1 digest " + digest)
   373           }
   374         }
   375 
   376         if (!global_state.value.defined_command(command.id)) {
   377           global_state.change(_.define_command(command))
   378           prover.get.define_command(command)
   379         }
   380       }
   381       change.doc_edits foreach {
   382         case (_, edit) =>
   383           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   384       }
   385 
   386       val assignment = global_state.value.the_assignment(change.previous).check_finished
   387       global_state.change(_.define_version(change.version, assignment))
   388       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   389       resources.commit(change)
   390     }
   391     //}}}
   392 
   393 
   394     /* prover output */
   395 
   396     def handle_output(output: Prover.Output)
   397     //{{{
   398     {
   399       def bad_output()
   400       {
   401         if (verbose)
   402           System.err.println("Ignoring prover output: " + output.message.toString)
   403       }
   404 
   405       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   406       {
   407         try {
   408           val st = global_state.change_result(_.accumulate(state_id, message))
   409           change_buffer.send(Received_Change(false, List(st.command)))
   410         }
   411         catch {
   412           case _: Document.State.Fail => bad_output()
   413         }
   414       }
   415 
   416       output match {
   417         case msg: Prover.Protocol_Output =>
   418           val handled = _protocol_handlers.invoke(msg)
   419           if (!handled) {
   420             msg.properties match {
   421               case Markup.Protocol_Handler(name) =>
   422                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   423 
   424               case Protocol.Command_Timing(state_id, timing) =>
   425                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   426                 accumulate(state_id, prover.get.xml_cache.elem(message))
   427 
   428               case Markup.Assign_Update =>
   429                 msg.text match {
   430                   case Protocol.Assign_Update(id, update) =>
   431                     try {
   432                       val cmds = global_state.change_result(_.assign(id, update))
   433                       change_buffer.send(Received_Change(true, cmds))
   434                     }
   435                     catch { case _: Document.State.Fail => bad_output() }
   436                     postponed_changes.flush()
   437                   case _ => bad_output()
   438                 }
   439                 // FIXME separate timeout event/message!?
   440                 if (prover.isDefined && Time.now() > prune_next) {
   441                   val old_versions = global_state.change_result(_.prune_history(prune_size))
   442                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   443                   prune_next = Time.now() + prune_delay
   444                 }
   445 
   446               case Markup.Removed_Versions =>
   447                 msg.text match {
   448                   case Protocol.Removed(removed) =>
   449                     try {
   450                       global_state.change(_.removed_versions(removed))
   451                     }
   452                     catch { case _: Document.State.Fail => bad_output() }
   453                   case _ => bad_output()
   454                 }
   455 
   456               case Markup.ML_Statistics(props) =>
   457                 statistics.event(Session.Statistics(props))
   458 
   459               case Markup.Task_Statistics(props) =>
   460                 // FIXME
   461 
   462               case _ => bad_output()
   463             }
   464           }
   465         case _ =>
   466           output.properties match {
   467             case Position.Id(state_id) =>
   468               accumulate(state_id, output.message)
   469 
   470             case _ if output.is_init =>
   471               phase = Session.Ready
   472 
   473             case Markup.Return_Code(rc) if output.is_exit =>
   474               if (rc == 0) phase = Session.Inactive
   475               else phase = Session.Failed
   476 
   477             case _ => raw_output_messages.event(output)
   478           }
   479         }
   480     }
   481     //}}}
   482 
   483 
   484     /* main thread */
   485 
   486     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   487     {
   488       case arg: Any =>
   489         //{{{
   490         arg match {
   491           case Start(name, args) if prover.isEmpty =>
   492             if (phase == Session.Inactive || phase == Session.Failed) {
   493               phase = Session.Startup
   494               prover = Some(resources.start_prover(receiver.invoke _, name, args))
   495             }
   496 
   497           case Stop =>
   498             if (phase == Session.Ready) {
   499               _protocol_handlers = _protocol_handlers.stop(prover.get)
   500               global_state.change(_ => Document.State.init)  // FIXME event bus!?
   501               phase = Session.Shutdown
   502               prover.get.terminate
   503               prover = None
   504               phase = Session.Inactive
   505             }
   506 
   507           case Update_Options(options) =>
   508             if (prover.isDefined && is_ready) {
   509               prover.get.options(options)
   510               handle_raw_edits(Document.Blobs.empty, Nil)
   511             }
   512             global_options.event(Session.Global_Options(options))
   513 
   514           case Cancel_Exec(exec_id) if prover.isDefined =>
   515             prover.get.cancel_exec(exec_id)
   516 
   517           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   518             handle_raw_edits(doc_blobs, edits)
   519 
   520           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   521             prover.get.dialog_result(serial, result)
   522             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   523 
   524           case Protocol_Command(name, args) if prover.isDefined =>
   525             prover.get.protocol_command(name, args:_*)
   526 
   527           case Messages(msgs) =>
   528             msgs foreach {
   529               case input: Prover.Input =>
   530                 all_messages.event(input)
   531 
   532               case output: Prover.Output =>
   533                 if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   534                 else handle_output(output)
   535                 if (output.is_syslog) syslog_messages.event(output)
   536                 all_messages.event(output)
   537             }
   538 
   539           case change: Session.Change if prover.isDefined =>
   540             if (global_state.value.is_assigned(change.previous))
   541               handle_change(change)
   542             else postponed_changes.store(change)
   543         }
   544         true
   545         //}}}
   546     }
   547   }
   548 
   549 
   550   /* actions */
   551 
   552   def start(name: String, args: List[String])
   553   { manager.send(Start(name, args)) }
   554 
   555   def stop()
   556   {
   557     manager.send_wait(Stop)
   558     receiver.shutdown()
   559     change_parser.shutdown()
   560     change_buffer.shutdown()
   561     manager.shutdown()
   562   }
   563 
   564   def protocol_command(name: String, args: String*)
   565   { manager.send(Protocol_Command(name, args.toList)) }
   566 
   567   def cancel_exec(exec_id: Document_ID.Exec)
   568   { manager.send(Cancel_Exec(exec_id)) }
   569 
   570   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   571   { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   572 
   573   def update_options(options: Options)
   574   { manager.send_wait(Update_Options(options)) }
   575 
   576   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   577   { manager.send(Session.Dialog_Result(id, serial, result)) }
   578 }