src/Pure/PIDE/session.scala
author wenzelm
Sat Apr 26 00:20:26 2014 +0200 (2014-04-26 ago)
changeset 56735 9923e362789c
parent 56733 f7700146678d
child 56771 28d62a5b07e8
permissions -rw-r--r--
tuned -- potentially more robust;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.util.{Timer, TimerTask}
    12 
    13 import scala.collection.mutable
    14 import scala.collection.immutable.Queue
    15 
    16 
    17 object Session
    18 {
    19   /* outlets */
    20 
    21   object Consumer
    22   {
    23     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    24       new Consumer[A](name, consume)
    25   }
    26   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    27 
    28   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    29   {
    30     private val consumers = Synchronized(List.empty[Consumer[A]])
    31 
    32     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    33     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    34 
    35     def post(a: A)
    36     {
    37       for (c <- consumers.value.iterator) {
    38         dispatcher.send(() =>
    39           try { c.consume(a) }
    40           catch {
    41             case exn: Throwable =>
    42               System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    43           })
    44       }
    45     }
    46   }
    47 
    48 
    49   /* change */
    50 
    51   sealed case class Change(
    52     previous: Document.Version,
    53     doc_blobs: Document.Blobs,
    54     syntax_changed: Boolean,
    55     deps_changed: Boolean,
    56     doc_edits: List[Document.Edit_Command],
    57     version: Document.Version)
    58 
    59 
    60   /* events */
    61 
    62   //{{{
    63   case class Statistics(props: Properties.T)
    64   case class Global_Options(options: Options)
    65   case object Caret_Focus
    66   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    67   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    68   case class Commands_Changed(
    69     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    70 
    71   sealed abstract class Phase
    72   case object Inactive extends Phase
    73   case object Startup extends Phase  // transient
    74   case object Failed extends Phase
    75   case object Ready extends Phase
    76   case object Shutdown extends Phase  // transient
    77   //}}}
    78 
    79 
    80   /* protocol handlers */
    81 
    82   abstract class Protocol_Handler
    83   {
    84     def stop(prover: Prover): Unit = {}
    85     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
    86   }
    87 
    88   class Protocol_Handlers(
    89     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    90     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
    91   {
    92     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
    93 
    94     def add(prover: Prover, name: String): Protocol_Handlers =
    95     {
    96       val (handlers1, functions1) =
    97         handlers.get(name) match {
    98           case Some(old_handler) =>
    99             System.err.println("Redefining protocol handler: " + name)
   100             old_handler.stop(prover)
   101             (handlers - name, functions -- old_handler.functions.keys)
   102           case None => (handlers, functions)
   103         }
   104 
   105       val (handlers2, functions2) =
   106         try {
   107           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
   108           val new_functions =
   109             for ((a, f) <- new_handler.functions.toList) yield
   110               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   111 
   112           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   113           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   114 
   115           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
   116         }
   117         catch {
   118           case exn: Throwable =>
   119             System.err.println(Exn.error_message(
   120               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn)))
   121             (handlers1, functions1)
   122         }
   123 
   124       new Protocol_Handlers(handlers2, functions2)
   125     }
   126 
   127     def invoke(msg: Prover.Protocol_Output): Boolean =
   128       msg.properties match {
   129         case Markup.Function(a) if functions.isDefinedAt(a) =>
   130           try { functions(a)(msg) }
   131           catch {
   132             case exn: Throwable =>
   133               System.err.println(Exn.error_message(
   134                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn)))
   135             false
   136           }
   137         case _ => false
   138       }
   139 
   140     def stop(prover: Prover): Protocol_Handlers =
   141     {
   142       for ((_, handler) <- handlers) handler.stop(prover)
   143       new Protocol_Handlers()
   144     }
   145   }
   146 }
   147 
   148 
   149 class Session(val resources: Resources)
   150 {
   151   /* global flags */
   152 
   153   @volatile var timing: Boolean = false
   154   @volatile var verbose: Boolean = false
   155 
   156 
   157   /* tuning parameters */
   158 
   159   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   160   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   161   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   162   def prune_size: Int = 0  // size of retained history
   163   def syslog_limit: Int = 100
   164   def reparse_limit: Int = 0
   165 
   166 
   167   /* outlets */
   168 
   169   private val dispatcher =
   170     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   171 
   172   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   173   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   174   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   175   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   176   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   177   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   178   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   179   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   180   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   181   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   182 
   183 
   184 
   185   /** pipelined change parsing **/
   186 
   187   private case class Text_Edits(
   188     previous: Future[Document.Version],
   189     doc_blobs: Document.Blobs,
   190     text_edits: List[Document.Edit_Text],
   191     version_result: Promise[Document.Version])
   192 
   193   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   194   {
   195     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   196       val prev = previous.get_finished
   197       val change =
   198         Timing.timeit("parse_change", timing) {
   199           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   200         }
   201       version_result.fulfill(change.version)
   202       manager.send(change)
   203       true
   204   }
   205 
   206 
   207 
   208   /** main protocol manager **/
   209 
   210   /* global state */
   211 
   212   private val syslog = Synchronized(Queue.empty[XML.Elem])
   213   def current_syslog(): String = cat_lines(syslog.value.iterator.map(XML.content))
   214 
   215   @volatile private var _phase: Session.Phase = Session.Inactive
   216   private def phase_=(new_phase: Session.Phase)
   217   {
   218     _phase = new_phase
   219     phase_changed.post(new_phase)
   220   }
   221   def phase = _phase
   222   def is_ready: Boolean = phase == Session.Ready
   223 
   224   private val global_state = Synchronized(Document.State.init)
   225   def current_state(): Document.State = global_state.value
   226 
   227   def recent_syntax(): Prover.Syntax =
   228   {
   229     val version = current_state().recent_finished.version.get_finished
   230     version.syntax getOrElse resources.base_syntax
   231   }
   232 
   233   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   234       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   235     global_state.value.snapshot(name, pending_edits)
   236 
   237 
   238   /* protocol handlers */
   239 
   240   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   241 
   242   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   243     _protocol_handlers.get(name)
   244 
   245 
   246   /* theory files */
   247 
   248   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   249   {
   250     val header1 =
   251       if (resources.loaded_theories(name.theory))
   252         header.error("Cannot update finished theory " + quote(name.theory))
   253       else header
   254     (name, Document.Node.Deps(header1))
   255   }
   256 
   257 
   258   /* internal messages */
   259 
   260   private case class Start(name: String, args: List[String])
   261   private case object Stop
   262   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   263   private case class Protocol_Command(name: String, args: List[String])
   264   private case class Update_Options(options: Options)
   265 
   266 
   267   /* buffered changes */
   268 
   269   private object change_buffer
   270   {
   271     private var assignment: Boolean = false
   272     private var nodes: Set[Document.Node.Name] = Set.empty
   273     private var commands: Set[Command] = Set.empty
   274 
   275     def flush(): Unit = synchronized {
   276       if (assignment || !nodes.isEmpty || !commands.isEmpty)
   277         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   278       assignment = false
   279       nodes = Set.empty
   280       commands = Set.empty
   281     }
   282 
   283     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   284       assignment |= assign
   285       for (command <- cmds) {
   286         nodes += command.node_name
   287         commands += command
   288       }
   289     }
   290 
   291     private val timer = new Timer("change_buffer", true)
   292     timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms)
   293 
   294     def shutdown()
   295     {
   296       timer.cancel()
   297       flush()
   298     }
   299   }
   300 
   301 
   302   /* postponed changes */
   303 
   304   private object postponed_changes
   305   {
   306     private var postponed: List[Session.Change] = Nil
   307 
   308     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   309 
   310     def flush(): Unit = synchronized {
   311       val state = global_state.value
   312       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   313       postponed = unassigned
   314       assigned.reverseIterator.foreach(change => manager.send(change))
   315     }
   316   }
   317 
   318 
   319   /* manager thread */
   320 
   321   private val manager: Consumer_Thread[Any] =
   322   {
   323     var prune_next = Time.now() + prune_delay
   324 
   325     var prover: Option[Prover] = None
   326 
   327 
   328     /* raw edits */
   329 
   330     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   331     //{{{
   332     {
   333       require(prover.isDefined)
   334 
   335       prover.get.discontinue_execution()
   336 
   337       val previous = global_state.value.history.tip.version
   338       val version = Future.promise[Document.Version]
   339       global_state.change(_.continue_history(previous, edits, version))
   340 
   341       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   342       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   343     }
   344     //}}}
   345 
   346 
   347     /* resulting changes */
   348 
   349     def handle_change(change: Session.Change)
   350     //{{{
   351     {
   352       require(prover.isDefined)
   353 
   354       def id_command(command: Command)
   355       {
   356         for {
   357           digest <- command.blobs_digests
   358           if !global_state.value.defined_blob(digest)
   359         } {
   360           change.doc_blobs.get(digest) match {
   361             case Some(blob) =>
   362               global_state.change(_.define_blob(digest))
   363               prover.get.define_blob(digest, blob.bytes)
   364             case None =>
   365               System.err.println("Missing blob for SHA1 digest " + digest)
   366           }
   367         }
   368 
   369         if (!global_state.value.defined_command(command.id)) {
   370           global_state.change(_.define_command(command))
   371           prover.get.define_command(command)
   372         }
   373       }
   374       change.doc_edits foreach {
   375         case (_, edit) =>
   376           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   377       }
   378 
   379       val assignment = global_state.value.the_assignment(change.previous).check_finished
   380       global_state.change(_.define_version(change.version, assignment))
   381       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   382       resources.commit(change)
   383     }
   384     //}}}
   385 
   386 
   387     /* prover output */
   388 
   389     def handle_output(output: Prover.Output)
   390     //{{{
   391     {
   392       def bad_output()
   393       {
   394         if (verbose)
   395           System.err.println("Ignoring bad prover output: " + output.message.toString)
   396       }
   397 
   398       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   399       {
   400         try {
   401           val st = global_state.change_result(_.accumulate(state_id, message))
   402           change_buffer.invoke(false, List(st.command))
   403         }
   404         catch {
   405           case _: Document.State.Fail => bad_output()
   406         }
   407       }
   408 
   409       output match {
   410         case msg: Prover.Protocol_Output =>
   411           val handled = _protocol_handlers.invoke(msg)
   412           if (!handled) {
   413             msg.properties match {
   414               case Markup.Protocol_Handler(name) if prover.isDefined =>
   415                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   416 
   417               case Protocol.Command_Timing(state_id, timing) if prover.isDefined =>
   418                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   419                 accumulate(state_id, prover.get.xml_cache.elem(message))
   420 
   421               case Markup.Assign_Update =>
   422                 msg.text match {
   423                   case Protocol.Assign_Update(id, update) =>
   424                     try {
   425                       val cmds = global_state.change_result(_.assign(id, update))
   426                       change_buffer.invoke(true, cmds)
   427                     }
   428                     catch { case _: Document.State.Fail => bad_output() }
   429                     postponed_changes.flush()
   430                   case _ => bad_output()
   431                 }
   432                 // FIXME separate timeout event/message!?
   433                 if (prover.isDefined && Time.now() > prune_next) {
   434                   val old_versions = global_state.change_result(_.prune_history(prune_size))
   435                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   436                   prune_next = Time.now() + prune_delay
   437                 }
   438 
   439               case Markup.Removed_Versions =>
   440                 msg.text match {
   441                   case Protocol.Removed(removed) =>
   442                     try {
   443                       global_state.change(_.removed_versions(removed))
   444                     }
   445                     catch { case _: Document.State.Fail => bad_output() }
   446                   case _ => bad_output()
   447                 }
   448 
   449               case Markup.ML_Statistics(props) =>
   450                 statistics.post(Session.Statistics(props))
   451 
   452               case Markup.Task_Statistics(props) =>
   453                 // FIXME
   454 
   455               case _ => bad_output()
   456             }
   457           }
   458         case _ =>
   459           output.properties match {
   460             case Position.Id(state_id) =>
   461               accumulate(state_id, output.message)
   462 
   463             case _ if output.is_init =>
   464               phase = Session.Ready
   465 
   466             case Markup.Return_Code(rc) if output.is_exit =>
   467               if (rc == 0) phase = Session.Inactive
   468               else phase = Session.Failed
   469               prover = None
   470 
   471             case _ => raw_output_messages.post(output)
   472           }
   473         }
   474     }
   475     //}}}
   476 
   477 
   478     /* main thread */
   479 
   480     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   481     {
   482       case arg: Any =>
   483         //{{{
   484         arg match {
   485           case output: Prover.Output =>
   486             if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
   487             else handle_output(output)
   488             if (output.is_syslog) {
   489               syslog.change(queue =>
   490                 {
   491                   val queue1 = queue.enqueue(output.message)
   492                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   493                 })
   494               syslog_messages.post(output)
   495             }
   496             all_messages.post(output)
   497 
   498           case input: Prover.Input =>
   499             all_messages.post(input)
   500 
   501           case Start(name, args) if prover.isEmpty =>
   502             if (phase == Session.Inactive || phase == Session.Failed) {
   503               phase = Session.Startup
   504               prover = Some(resources.start_prover(manager.send(_), name, args))
   505             }
   506 
   507           case Stop =>
   508             if (prover.isDefined && is_ready) {
   509               _protocol_handlers = _protocol_handlers.stop(prover.get)
   510               global_state.change(_ => Document.State.init)  // FIXME event bus!?
   511               phase = Session.Shutdown
   512               prover.get.terminate
   513             }
   514 
   515           case Update_Options(options) =>
   516             if (prover.isDefined && is_ready) {
   517               prover.get.options(options)
   518               handle_raw_edits(Document.Blobs.empty, Nil)
   519             }
   520             global_options.post(Session.Global_Options(options))
   521 
   522           case Cancel_Exec(exec_id) if prover.isDefined =>
   523             prover.get.cancel_exec(exec_id)
   524 
   525           case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
   526             handle_raw_edits(doc_blobs, edits)
   527 
   528           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   529             prover.get.dialog_result(serial, result)
   530             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   531 
   532           case Protocol_Command(name, args) if prover.isDefined =>
   533             prover.get.protocol_command(name, args:_*)
   534 
   535           case change: Session.Change if prover.isDefined =>
   536             if (global_state.value.is_assigned(change.previous))
   537               handle_change(change)
   538             else postponed_changes.store(change)
   539         }
   540         true
   541         //}}}
   542     }
   543   }
   544 
   545 
   546   /* actions */
   547 
   548   def start(name: String, args: List[String])
   549   { manager.send(Start(name, args)) }
   550 
   551   def stop()
   552   {
   553     manager.send_wait(Stop)
   554     change_parser.shutdown()
   555     change_buffer.shutdown()
   556     manager.shutdown()
   557     dispatcher.shutdown()
   558   }
   559 
   560   def protocol_command(name: String, args: String*)
   561   { manager.send(Protocol_Command(name, args.toList)) }
   562 
   563   def cancel_exec(exec_id: Document_ID.Exec)
   564   { manager.send(Cancel_Exec(exec_id)) }
   565 
   566   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   567   { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   568 
   569   def update_options(options: Options)
   570   { manager.send_wait(Update_Options(options)) }
   571 
   572   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   573   { manager.send(Session.Dialog_Result(id, serial, result)) }
   574 }