src/Pure/PIDE/session.scala
author wenzelm
Mon May 05 15:17:07 2014 +0200 (2014-05-05 ago)
changeset 56864 0446c7ac2e32
parent 56799 00b13fb87b3a
child 57651 10df45dd14da
permissions -rw-r--r--
support print operations as asynchronous query;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    18   object Consumer
    19   {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    26   {
    27     private val consumers = Synchronized(List.empty[Consumer[A]])
    28 
    29     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    30     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    31 
    32     def post(a: A)
    33     {
    34       for (c <- consumers.value.iterator) {
    35         dispatcher.send(() =>
    36           try { c.consume(a) }
    37           catch {
    38             case exn: Throwable =>
    39               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    40           })
    41       }
    42     }
    43   }
    44 
    45 
    46   /* change */
    47 
    48   sealed case class Change(
    49     previous: Document.Version,
    50     doc_blobs: Document.Blobs,
    51     syntax_changed: Boolean,
    52     deps_changed: Boolean,
    53     doc_edits: List[Document.Edit_Command],
    54     version: Document.Version)
    55 
    56 
    57   /* events */
    58 
    59   //{{{
    60   case class Statistics(props: Properties.T)
    61   case class Global_Options(options: Options)
    62   case object Caret_Focus
    63   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    64   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    65   case class Commands_Changed(
    66     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    67 
    68   sealed abstract class Phase
    69   case object Inactive extends Phase
    70   case object Startup extends Phase  // transient
    71   case object Failed extends Phase
    72   case object Ready extends Phase
    73   case object Shutdown extends Phase  // transient
    74   //}}}
    75 
    76 
    77   /* syslog */
    78 
    79   private[Session] class Syslog(limit: Int)
    80   {
    81     private var queue = Queue.empty[XML.Elem]
    82     private var length = 0
    83 
    84     def += (msg: XML.Elem): Unit = synchronized {
    85       queue = queue.enqueue(msg)
    86       length += 1
    87       if (length > limit) queue = queue.dequeue._2
    88     }
    89 
    90     def content: String = synchronized {
    91       cat_lines(queue.iterator.map(XML.content)) +
    92       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
    93     }
    94   }
    95 
    96 
    97   /* protocol handlers */
    98 
    99   abstract class Protocol_Handler
   100   {
   101     def start(prover: Prover): Unit = {}
   102     def stop(prover: Prover): Unit = {}
   103     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
   104   }
   105 
   106   class Protocol_Handlers(
   107     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
   108     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
   109   {
   110     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
   111 
   112     def add(prover: Prover, name: String): Protocol_Handlers =
   113     {
   114       val (handlers1, functions1) =
   115         handlers.get(name) match {
   116           case Some(old_handler) =>
   117             Output.warning("Redefining protocol handler: " + name)
   118             old_handler.stop(prover)
   119             (handlers - name, functions -- old_handler.functions.keys)
   120           case None => (handlers, functions)
   121         }
   122 
   123       val (handlers2, functions2) =
   124         try {
   125           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
   126           new_handler.start(prover)
   127 
   128           val new_functions =
   129             for ((a, f) <- new_handler.functions.toList) yield
   130               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   131 
   132           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   133           if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   134 
   135           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
   136         }
   137         catch {
   138           case exn: Throwable =>
   139             Output.error_message(
   140               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn))
   141             (handlers1, functions1)
   142         }
   143 
   144       new Protocol_Handlers(handlers2, functions2)
   145     }
   146 
   147     def invoke(msg: Prover.Protocol_Output): Boolean =
   148       msg.properties match {
   149         case Markup.Function(a) if functions.isDefinedAt(a) =>
   150           try { functions(a)(msg) }
   151           catch {
   152             case exn: Throwable =>
   153               Output.error_message(
   154                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn))
   155             false
   156           }
   157         case _ => false
   158       }
   159 
   160     def stop(prover: Prover): Protocol_Handlers =
   161     {
   162       for ((_, handler) <- handlers) handler.stop(prover)
   163       new Protocol_Handlers()
   164     }
   165   }
   166 }
   167 
   168 
   169 class Session(val resources: Resources)
   170 {
   171   /* global flags */
   172 
   173   @volatile var timing: Boolean = false
   174   @volatile var verbose: Boolean = false
   175 
   176 
   177   /* tuning parameters */
   178 
   179   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   180   def message_delay: Time = Time.seconds(0.1)  // prover input/output messages
   181   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
   182   def prune_size: Int = 0  // size of retained history
   183   def syslog_limit: Int = 100
   184   def reparse_limit: Int = 0
   185 
   186 
   187   /* outlets */
   188 
   189   private val dispatcher =
   190     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   191 
   192   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   193   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   194   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   195   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   196   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   197   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   198   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   199   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   200   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   201   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   202 
   203 
   204 
   205   /** main protocol manager **/
   206 
   207   /* internal messages */
   208 
   209   private case class Start(name: String, args: List[String])
   210   private case object Stop
   211   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   212   private case class Protocol_Command(name: String, args: List[String])
   213   private case class Update_Options(options: Options)
   214   private case object Prune_History
   215 
   216 
   217   /* global state */
   218 
   219   private val syslog = new Session.Syslog(syslog_limit)
   220   def syslog_content(): String = syslog.content
   221 
   222   @volatile private var _phase: Session.Phase = Session.Inactive
   223   private def phase_=(new_phase: Session.Phase)
   224   {
   225     _phase = new_phase
   226     phase_changed.post(new_phase)
   227   }
   228   def phase = _phase
   229   def is_ready: Boolean = phase == Session.Ready
   230 
   231   private val global_state = Synchronized(Document.State.init)
   232   def current_state(): Document.State = global_state.value
   233 
   234   def recent_syntax(): Prover.Syntax =
   235   {
   236     val version = current_state().recent_finished.version.get_finished
   237     version.syntax getOrElse resources.base_syntax
   238   }
   239 
   240 
   241   /* protocol handlers */
   242 
   243   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()
   244 
   245   def protocol_handler(name: String): Option[Session.Protocol_Handler] =
   246     _protocol_handlers.get(name)
   247 
   248 
   249   /* theory files */
   250 
   251   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   252   {
   253     val header1 =
   254       if (resources.loaded_theories(name.theory))
   255         header.error("Cannot update finished theory " + quote(name.theory))
   256       else header
   257     (name, Document.Node.Deps(header1))
   258   }
   259 
   260 
   261   /* pipelined change parsing */
   262 
   263   private case class Text_Edits(
   264     previous: Future[Document.Version],
   265     doc_blobs: Document.Blobs,
   266     text_edits: List[Document.Edit_Text],
   267     version_result: Promise[Document.Version])
   268 
   269   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   270   {
   271     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   272       val prev = previous.get_finished
   273       val change =
   274         Timing.timeit("parse_change", timing) {
   275           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   276         }
   277       version_result.fulfill(change.version)
   278       manager.send(change)
   279       true
   280   }
   281 
   282 
   283   /* buffered changes */
   284 
   285   private object change_buffer
   286   {
   287     private var assignment: Boolean = false
   288     private var nodes: Set[Document.Node.Name] = Set.empty
   289     private var commands: Set[Command] = Set.empty
   290 
   291     def flush(): Unit = synchronized {
   292       if (assignment || !nodes.isEmpty || !commands.isEmpty)
   293         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   294       assignment = false
   295       nodes = Set.empty
   296       commands = Set.empty
   297     }
   298     private val delay_flush = Simple_Thread.delay_first(output_delay) { flush() }
   299 
   300     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   301       assignment |= assign
   302       for (command <- cmds) {
   303         nodes += command.node_name
   304         commands += command
   305       }
   306       delay_flush.invoke()
   307     }
   308 
   309     def shutdown()
   310     {
   311       delay_flush.revoke()
   312       flush()
   313     }
   314   }
   315 
   316 
   317   /* postponed changes */
   318 
   319   private object postponed_changes
   320   {
   321     private var postponed: List[Session.Change] = Nil
   322 
   323     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   324 
   325     def flush(): Unit = synchronized {
   326       val state = global_state.value
   327       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   328       postponed = unassigned
   329       assigned.reverseIterator.foreach(change => manager.send(change))
   330     }
   331   }
   332 
   333 
   334   /* prover process */
   335 
   336   private object prover
   337   {
   338     private val variable = Synchronized(None: Option[Prover])
   339 
   340     def defined: Boolean = variable.value.isDefined
   341     def get: Prover = variable.value.get
   342     def set(p: Prover) { variable.change(_ => Some(p)) }
   343     def reset { variable.change(_ => None) }
   344     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   345   }
   346 
   347 
   348   /* manager thread */
   349 
   350   private val delay_prune = Simple_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   351 
   352   private val manager: Consumer_Thread[Any] =
   353   {
   354     /* raw edits */
   355 
   356     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   357     //{{{
   358     {
   359       require(prover.defined)
   360 
   361       prover.get.discontinue_execution()
   362 
   363       val previous = global_state.value.history.tip.version
   364       val version = Future.promise[Document.Version]
   365       global_state.change(_.continue_history(previous, edits, version))
   366 
   367       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   368       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   369     }
   370     //}}}
   371 
   372 
   373     /* resulting changes */
   374 
   375     def handle_change(change: Session.Change)
   376     //{{{
   377     {
   378       require(prover.defined)
   379 
   380       def id_command(command: Command)
   381       {
   382         for {
   383           digest <- command.blobs_digests
   384           if !global_state.value.defined_blob(digest)
   385         } {
   386           change.doc_blobs.get(digest) match {
   387             case Some(blob) =>
   388               global_state.change(_.define_blob(digest))
   389               prover.get.define_blob(digest, blob.bytes)
   390             case None =>
   391               Output.error_message("Missing blob for SHA1 digest " + digest)
   392           }
   393         }
   394 
   395         if (!global_state.value.defined_command(command.id)) {
   396           global_state.change(_.define_command(command))
   397           prover.get.define_command(command)
   398         }
   399       }
   400       change.doc_edits foreach {
   401         case (_, edit) =>
   402           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   403       }
   404 
   405       val assignment = global_state.value.the_assignment(change.previous).check_finished
   406       global_state.change(_.define_version(change.version, assignment))
   407       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   408       resources.commit(change)
   409     }
   410     //}}}
   411 
   412 
   413     /* prover output */
   414 
   415     def handle_output(output: Prover.Output)
   416     //{{{
   417     {
   418       def bad_output()
   419       {
   420         if (verbose)
   421           Output.warning("Ignoring bad prover output: " + output.message.toString)
   422       }
   423 
   424       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   425       {
   426         try {
   427           val st = global_state.change_result(_.accumulate(state_id, message))
   428           change_buffer.invoke(false, List(st.command))
   429         }
   430         catch {
   431           case _: Document.State.Fail => bad_output()
   432         }
   433       }
   434 
   435       output match {
   436         case msg: Prover.Protocol_Output =>
   437           val handled = _protocol_handlers.invoke(msg)
   438           if (!handled) {
   439             msg.properties match {
   440               case Markup.Protocol_Handler(name) if prover.defined =>
   441                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   442 
   443               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   444                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   445                 accumulate(state_id, prover.get.xml_cache.elem(message))
   446 
   447               case Markup.Assign_Update =>
   448                 msg.text match {
   449                   case Protocol.Assign_Update(id, update) =>
   450                     try {
   451                       val cmds = global_state.change_result(_.assign(id, update))
   452                       change_buffer.invoke(true, cmds)
   453                     }
   454                     catch { case _: Document.State.Fail => bad_output() }
   455                     postponed_changes.flush()
   456                   case _ => bad_output()
   457                 }
   458                 delay_prune.invoke()
   459 
   460               case Markup.Removed_Versions =>
   461                 msg.text match {
   462                   case Protocol.Removed(removed) =>
   463                     try {
   464                       global_state.change(_.removed_versions(removed))
   465                     }
   466                     catch { case _: Document.State.Fail => bad_output() }
   467                   case _ => bad_output()
   468                 }
   469 
   470               case Markup.ML_Statistics(props) =>
   471                 statistics.post(Session.Statistics(props))
   472 
   473               case Markup.Task_Statistics(props) =>
   474                 // FIXME
   475 
   476               case _ => bad_output()
   477             }
   478           }
   479         case _ =>
   480           output.properties match {
   481             case Position.Id(state_id) =>
   482               accumulate(state_id, output.message)
   483 
   484             case _ if output.is_init =>
   485               phase = Session.Ready
   486 
   487             case Markup.Return_Code(rc) if output.is_exit =>
   488               if (rc == 0) phase = Session.Inactive
   489               else phase = Session.Failed
   490               prover.reset
   491 
   492             case _ => raw_output_messages.post(output)
   493           }
   494         }
   495     }
   496     //}}}
   497 
   498 
   499     /* main thread */
   500 
   501     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   502     {
   503       case arg: Any =>
   504         //{{{
   505         arg match {
   506           case output: Prover.Output =>
   507             if (output.is_stdout || output.is_stderr)
   508               raw_output_messages.post(output)
   509             else handle_output(output)
   510 
   511             if (output.is_syslog) {
   512               syslog += output.message
   513               syslog_messages.post(output)
   514             }
   515 
   516             all_messages.post(output)
   517 
   518           case input: Prover.Input =>
   519             all_messages.post(input)
   520 
   521           case Start(name, args) if !prover.defined =>
   522             if (phase == Session.Inactive || phase == Session.Failed) {
   523               phase = Session.Startup
   524               prover.set(resources.start_prover(manager.send(_), name, args))
   525             }
   526 
   527           case Stop =>
   528             if (prover.defined && is_ready) {
   529               _protocol_handlers = _protocol_handlers.stop(prover.get)
   530               global_state.change(_ => Document.State.init)
   531               phase = Session.Shutdown
   532               prover.get.terminate
   533             }
   534 
   535           case Prune_History =>
   536             if (prover.defined) {
   537               val old_versions = global_state.change_result(_.prune_history(prune_size))
   538               if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   539             }
   540 
   541           case Update_Options(options) =>
   542             if (prover.defined && is_ready) {
   543               prover.get.options(options)
   544               handle_raw_edits(Document.Blobs.empty, Nil)
   545             }
   546             global_options.post(Session.Global_Options(options))
   547 
   548           case Cancel_Exec(exec_id) if prover.defined =>
   549             prover.get.cancel_exec(exec_id)
   550 
   551           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   552             handle_raw_edits(doc_blobs, edits)
   553 
   554           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   555             prover.get.dialog_result(serial, result)
   556             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   557 
   558           case Protocol_Command(name, args) if prover.defined =>
   559             prover.get.protocol_command(name, args:_*)
   560 
   561           case change: Session.Change if prover.defined =>
   562             if (global_state.value.is_assigned(change.previous))
   563               handle_change(change)
   564             else postponed_changes.store(change)
   565         }
   566         true
   567         //}}}
   568     }
   569   }
   570 
   571 
   572   /* main operations */
   573 
   574   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   575       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   576     global_state.value.snapshot(name, pending_edits)
   577 
   578   def start(name: String, args: List[String])
   579   { manager.send(Start(name, args)) }
   580 
   581   def stop()
   582   {
   583     manager.send_wait(Stop)
   584     prover.await_reset()
   585     change_parser.shutdown()
   586     change_buffer.shutdown()
   587     delay_prune.revoke()
   588     manager.shutdown()
   589     dispatcher.shutdown()
   590   }
   591 
   592   def protocol_command(name: String, args: String*)
   593   { manager.send(Protocol_Command(name, args.toList)) }
   594 
   595   def cancel_exec(exec_id: Document_ID.Exec)
   596   { manager.send(Cancel_Exec(exec_id)) }
   597 
   598   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   599   { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   600 
   601   def update_options(options: Options)
   602   { manager.send_wait(Update_Options(options)) }
   603 
   604   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   605   { manager.send(Session.Dialog_Result(id, serial, result)) }
   606 }