src/Pure/PIDE/session.scala
author wenzelm
Tue Mar 08 14:44:11 2016 +0100 (2016-03-08)
changeset 62556 c115e69f457f
parent 62545 8ebffdaf2ce2
child 63584 68751fe1c036
permissions -rw-r--r--
more abstract Session.start, without prover command-line;
Isabelle_Process.apply is directly based on ML_Process;
clarified Isabelle_Process.main command-line;
tuned signature;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    18   object Consumer
    19   {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    26   {
    27     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    28 
    29     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    30     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    31 
    32     def post(a: A)
    33     {
    34       for (c <- consumers.value.iterator) {
    35         dispatcher.send(() =>
    36           try { c.consume(a) }
    37           catch {
    38             case exn: Throwable =>
    39               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    40           })
    41       }
    42     }
    43   }
    44 
    45 
    46   /* change */
    47 
    48   sealed case class Change(
    49     previous: Document.Version,
    50     syntax_changed: List[Document.Node.Name],
    51     deps_changed: Boolean,
    52     doc_edits: List[Document.Edit_Command],
    53     version: Document.Version)
    54 
    55   case object Change_Flush
    56 
    57 
    58   /* events */
    59 
    60   //{{{
    61   case class Statistics(props: Properties.T)
    62   case class Global_Options(options: Options)
    63   case object Caret_Focus
    64   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    65   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    66   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    67   case class Commands_Changed(
    68     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    69 
    70   sealed abstract class Phase
    71   case object Inactive extends Phase
    72   case object Startup extends Phase  // transient
    73   case object Failed extends Phase
    74   case object Ready extends Phase
    75   case object Shutdown extends Phase  // transient
    76   //}}}
    77 
    78 
    79   /* syslog */
    80 
    81   private[Session] class Syslog(limit: Int)
    82   {
    83     private var queue = Queue.empty[XML.Elem]
    84     private var length = 0
    85 
    86     def += (msg: XML.Elem): Unit = synchronized {
    87       queue = queue.enqueue(msg)
    88       length += 1
    89       if (length > limit) queue = queue.dequeue._2
    90     }
    91 
    92     def content: String = synchronized {
    93       cat_lines(queue.iterator.map(XML.content)) +
    94       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
    95     }
    96   }
    97 
    98 
    99   /* protocol handlers */
   100 
   101   abstract class Protocol_Handler
   102   {
   103     def start(prover: Prover): Unit = {}
   104     def stop(prover: Prover): Unit = {}
   105     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
   106   }
   107 
   108   def bad_protocol_handler(exn: Throwable, name: String): Unit =
   109     Output.error_message(
   110       "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn))
   111 
   112   private class Protocol_Handlers(
   113     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
   114     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
   115   {
   116     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
   117 
   118     def add(prover: Prover, handler: Protocol_Handler): Protocol_Handlers =
   119     {
   120       val name = handler.getClass.getName
   121 
   122       val (handlers1, functions1) =
   123         handlers.get(name) match {
   124           case Some(old_handler) =>
   125             Output.warning("Redefining protocol handler: " + name)
   126             old_handler.stop(prover)
   127             (handlers - name, functions -- old_handler.functions.keys)
   128           case None => (handlers, functions)
   129         }
   130 
   131       val (handlers2, functions2) =
   132         try {
   133           handler.start(prover)
   134 
   135           val new_functions =
   136             for ((a, f) <- handler.functions.toList) yield
   137               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   138 
   139           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   140           if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   141 
   142           (handlers1 + (name -> handler), functions1 ++ new_functions)
   143         }
   144         catch {
   145           case exn: Throwable =>
   146             Session.bad_protocol_handler(exn, name)
   147             (handlers1, functions1)
   148         }
   149 
   150       new Protocol_Handlers(handlers2, functions2)
   151     }
   152 
   153     def invoke(msg: Prover.Protocol_Output): Boolean =
   154       msg.properties match {
   155         case Markup.Function(a) if functions.isDefinedAt(a) =>
   156           try { functions(a)(msg) }
   157           catch {
   158             case exn: Throwable =>
   159               Output.error_message(
   160                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn))
   161             false
   162           }
   163         case _ => false
   164       }
   165 
   166     def stop(prover: Prover): Protocol_Handlers =
   167     {
   168       for ((_, handler) <- handlers) handler.stop(prover)
   169       new Protocol_Handlers()
   170     }
   171   }
   172 }
   173 
   174 
   175 class Session(val resources: Resources)
   176 {
   177   /* global flags */
   178 
   179   @volatile var timing: Boolean = false
   180   @volatile var verbose: Boolean = false
   181 
   182 
   183   /* tuning parameters */
   184 
   185   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   186   def prune_delay: Time = Time.seconds(15.0)  // prune history (delete old versions)
   187   def prune_size: Int = 0  // size of retained history
   188   def syslog_limit: Int = 100
   189   def reparse_limit: Int = 0
   190 
   191 
   192   /* outlets */
   193 
   194   private val dispatcher =
   195     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   196 
   197   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   198   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   199   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   200   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   201   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   202   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   203   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   204   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   205   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   206   val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)
   207 
   208   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck!
   209 
   210 
   211   /** main protocol manager **/
   212 
   213   /* internal messages */
   214 
   215   private case class Start(start_prover: Prover.Receiver => Prover)
   216   private case object Stop
   217   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   218   private case class Protocol_Command(name: String, args: List[String])
   219   private case class Update_Options(options: Options)
   220   private case object Prune_History
   221 
   222 
   223   /* global state */
   224 
   225   private val syslog = new Session.Syslog(syslog_limit)
   226   def syslog_content(): String = syslog.content
   227 
   228   @volatile private var _phase: Session.Phase = Session.Inactive
   229   private def phase_=(new_phase: Session.Phase)
   230   {
   231     _phase = new_phase
   232     phase_changed.post(new_phase)
   233   }
   234   def phase = _phase
   235   def is_ready: Boolean = phase == Session.Ready
   236 
   237   private val global_state = Synchronized(Document.State.init)
   238   def current_state(): Document.State = global_state.value
   239 
   240   def recent_syntax(name: Document.Node.Name): Prover.Syntax =
   241     global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse
   242     resources.base_syntax
   243 
   244 
   245   /* theory files */
   246 
   247   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   248   {
   249     val header1 =
   250       if (resources.loaded_theories(name.theory))
   251         header.error("Cannot update finished theory " + quote(name.theory))
   252       else header
   253     (name, Document.Node.Deps(header1))
   254   }
   255 
   256 
   257   /* pipelined change parsing */
   258 
   259   private case class Text_Edits(
   260     previous: Future[Document.Version],
   261     doc_blobs: Document.Blobs,
   262     text_edits: List[Document.Edit_Text],
   263     version_result: Promise[Document.Version])
   264 
   265   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   266   {
   267     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   268       val prev = previous.get_finished
   269       val change =
   270         Timing.timeit("parse_change", timing) {
   271           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   272         }
   273       version_result.fulfill(change.version)
   274       manager.send(change)
   275       true
   276   }
   277 
   278 
   279   /* buffered changes */
   280 
   281   private object change_buffer
   282   {
   283     private var assignment: Boolean = false
   284     private var nodes: Set[Document.Node.Name] = Set.empty
   285     private var commands: Set[Command] = Set.empty
   286 
   287     def flush(): Unit = synchronized {
   288       if (assignment || nodes.nonEmpty || commands.nonEmpty)
   289         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   290       assignment = false
   291       nodes = Set.empty
   292       commands = Set.empty
   293     }
   294     private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }
   295 
   296     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   297       assignment |= assign
   298       for (command <- cmds) {
   299         nodes += command.node_name
   300         commands += command
   301       }
   302       delay_flush.invoke()
   303     }
   304 
   305     def shutdown()
   306     {
   307       delay_flush.revoke()
   308       flush()
   309     }
   310   }
   311 
   312 
   313   /* postponed changes */
   314 
   315   private object postponed_changes
   316   {
   317     private var postponed: List[Session.Change] = Nil
   318 
   319     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   320 
   321     def flush(state: Document.State): List[Session.Change] = synchronized {
   322       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   323       postponed = unassigned
   324       assigned.reverse
   325     }
   326   }
   327 
   328 
   329   /* prover process */
   330 
   331   private object prover
   332   {
   333     private val variable = Synchronized[Option[Prover]](None)
   334 
   335     def defined: Boolean = variable.value.isDefined
   336     def get: Prover = variable.value.get
   337     def set(p: Prover) { variable.change(_ => Some(p)) }
   338     def reset { variable.change(_ => None) }
   339     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   340   }
   341 
   342 
   343   /* protocol handlers */
   344 
   345   private val _protocol_handlers = Synchronized(new Session.Protocol_Handlers)
   346 
   347   def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
   348     _protocol_handlers.value.get(name)
   349 
   350   def add_protocol_handler(handler: Session.Protocol_Handler): Unit =
   351     _protocol_handlers.change(_.add(prover.get, handler))
   352 
   353 
   354   /* manager thread */
   355 
   356   private val delay_prune =
   357     Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   358 
   359   private val manager: Consumer_Thread[Any] =
   360   {
   361     /* raw edits */
   362 
   363     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   364     //{{{
   365     {
   366       require(prover.defined)
   367 
   368       prover.get.discontinue_execution()
   369 
   370       val previous = global_state.value.history.tip.version
   371       val version = Future.promise[Document.Version]
   372       global_state.change(_.continue_history(previous, edits, version))
   373 
   374       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   375       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   376     }
   377     //}}}
   378 
   379 
   380     /* resulting changes */
   381 
   382     def handle_change(change: Session.Change)
   383     //{{{
   384     {
   385       require(prover.defined)
   386 
   387       def id_command(command: Command)
   388       {
   389         for {
   390           (name, digest) <- command.blobs_defined
   391           if !global_state.value.defined_blob(digest)
   392         } {
   393           change.version.nodes(name).get_blob match {
   394             case Some(blob) =>
   395               global_state.change(_.define_blob(digest))
   396               prover.get.define_blob(digest, blob.bytes)
   397             case None =>
   398               Output.error_message("Missing blob " + quote(name.toString))
   399           }
   400         }
   401 
   402         if (!global_state.value.defined_command(command.id)) {
   403           global_state.change(_.define_command(command))
   404           prover.get.define_command(command)
   405         }
   406       }
   407       change.doc_edits foreach {
   408         case (_, edit) =>
   409           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   410       }
   411 
   412       val assignment = global_state.value.the_assignment(change.previous).check_finished
   413       global_state.change(_.define_version(change.version, assignment))
   414       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   415       resources.commit(change)
   416     }
   417     //}}}
   418 
   419 
   420     /* prover output */
   421 
   422     def handle_output(output: Prover.Output)
   423     //{{{
   424     {
   425       def bad_output()
   426       {
   427         if (verbose)
   428           Output.warning("Ignoring bad prover output: " + output.message.toString)
   429       }
   430 
   431       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   432       {
   433         try {
   434           val st = global_state.change_result(_.accumulate(state_id, message))
   435           change_buffer.invoke(false, List(st.command))
   436         }
   437         catch {
   438           case _: Document.State.Fail => bad_output()
   439         }
   440       }
   441 
   442       output match {
   443         case msg: Prover.Protocol_Output =>
   444           val handled = _protocol_handlers.value.invoke(msg)
   445           if (!handled) {
   446             msg.properties match {
   447               case Markup.Protocol_Handler(name) if prover.defined =>
   448                 try {
   449                   val handler =
   450                     Class.forName(name).newInstance.asInstanceOf[Session.Protocol_Handler]
   451                   add_protocol_handler(handler)
   452                 }
   453                 catch { case exn: Throwable => Session.bad_protocol_handler(exn, name) }
   454 
   455               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   456                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   457                 accumulate(state_id, prover.get.xml_cache.elem(message))
   458 
   459               case Markup.Assign_Update =>
   460                 msg.text match {
   461                   case Protocol.Assign_Update(id, update) =>
   462                     try {
   463                       val cmds = global_state.change_result(_.assign(id, update))
   464                       change_buffer.invoke(true, cmds)
   465                       manager.send(Session.Change_Flush)
   466                     }
   467                     catch { case _: Document.State.Fail => bad_output() }
   468                   case _ => bad_output()
   469                 }
   470                 delay_prune.invoke()
   471 
   472               case Markup.Removed_Versions =>
   473                 msg.text match {
   474                   case Protocol.Removed(removed) =>
   475                     try {
   476                       global_state.change(_.removed_versions(removed))
   477                       manager.send(Session.Change_Flush)
   478                     }
   479                     catch { case _: Document.State.Fail => bad_output() }
   480                   case _ => bad_output()
   481                 }
   482 
   483               case Markup.ML_Statistics(props) =>
   484                 statistics.post(Session.Statistics(props))
   485 
   486               case Markup.Task_Statistics(props) =>
   487                 // FIXME
   488 
   489               case _ => bad_output()
   490             }
   491           }
   492         case _ =>
   493           output.properties match {
   494             case Position.Id(state_id) =>
   495               accumulate(state_id, output.message)
   496 
   497             case _ if output.is_init =>
   498               phase = Session.Ready
   499 
   500             case Markup.Return_Code(rc) if output.is_exit =>
   501               if (rc == 0) phase = Session.Inactive
   502               else phase = Session.Failed
   503               prover.reset
   504 
   505             case _ =>
   506               raw_output_messages.post(output)
   507           }
   508         }
   509     }
   510     //}}}
   511 
   512 
   513     /* main thread */
   514 
   515     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   516     {
   517       case arg: Any =>
   518         //{{{
   519         arg match {
   520           case output: Prover.Output =>
   521             if (output.is_stdout || output.is_stderr)
   522               raw_output_messages.post(output)
   523             else handle_output(output)
   524 
   525             if (output.is_syslog) {
   526               syslog += output.message
   527               syslog_messages.post(output)
   528             }
   529 
   530             all_messages.post(output)
   531 
   532           case input: Prover.Input =>
   533             all_messages.post(input)
   534 
   535           case Start(start_prover) if !prover.defined =>
   536             if (phase == Session.Inactive || phase == Session.Failed) {
   537               phase = Session.Startup
   538               prover.set(start_prover(manager.send(_)))
   539             }
   540 
   541           case Stop =>
   542             if (prover.defined && is_ready) {
   543               _protocol_handlers.change(_.stop(prover.get))
   544               global_state.change(_ => Document.State.init)
   545               phase = Session.Shutdown
   546               prover.get.terminate
   547             }
   548 
   549           case Prune_History =>
   550             if (prover.defined) {
   551               val old_versions = global_state.change_result(_.remove_versions(prune_size))
   552               if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
   553             }
   554 
   555           case Update_Options(options) =>
   556             if (prover.defined && is_ready) {
   557               prover.get.options(options)
   558               handle_raw_edits(Document.Blobs.empty, Nil)
   559             }
   560             global_options.post(Session.Global_Options(options))
   561 
   562           case Cancel_Exec(exec_id) if prover.defined =>
   563             prover.get.cancel_exec(exec_id)
   564 
   565           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   566             handle_raw_edits(doc_blobs, edits)
   567 
   568           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   569             prover.get.dialog_result(serial, result)
   570             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   571 
   572           case Session.Build_Theories(id, master_dir, theories) if prover.defined =>
   573             prover.get.build_theories(id, master_dir, theories)
   574 
   575           case Protocol_Command(name, args) if prover.defined =>
   576             prover.get.protocol_command(name, args:_*)
   577 
   578           case change: Session.Change if prover.defined =>
   579             val state = global_state.value
   580             if (!state.removing_versions && state.is_assigned(change.previous))
   581               handle_change(change)
   582             else postponed_changes.store(change)
   583 
   584           case Session.Change_Flush if prover.defined =>
   585             val state = global_state.value
   586             if (!state.removing_versions)
   587               postponed_changes.flush(state).foreach(handle_change(_))
   588 
   589           case bad =>
   590             if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
   591         }
   592         true
   593         //}}}
   594     }
   595   }
   596 
   597 
   598   /* main operations */
   599 
   600   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   601       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   602     global_state.value.snapshot(name, pending_edits)
   603 
   604   def start(start_prover: Prover.Receiver => Prover)
   605   { manager.send(Start(start_prover)) }
   606 
   607   def stop()
   608   {
   609     delay_prune.revoke()
   610     manager.send_wait(Stop)
   611     prover.await_reset()
   612     change_parser.shutdown()
   613     change_buffer.shutdown()
   614     manager.shutdown()
   615     dispatcher.shutdown()
   616   }
   617 
   618   def protocol_command(name: String, args: String*)
   619   { manager.send(Protocol_Command(name, args.toList)) }
   620 
   621   def cancel_exec(exec_id: Document_ID.Exec)
   622   { manager.send(Cancel_Exec(exec_id)) }
   623 
   624   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   625   { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   626 
   627   def update_options(options: Options)
   628   { manager.send_wait(Update_Options(options)) }
   629 
   630   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   631   { manager.send(Session.Dialog_Result(id, serial, result)) }
   632 
   633   def build_theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
   634   { manager.send(Session.Build_Theories(id, master_dir, theories)) }
   635 }