src/Pure/PIDE/session.scala
author wenzelm
Tue Aug 12 18:36:43 2014 +0200 (2014-08-12)
changeset 57916 2c2c24dbf0a4
parent 57867 abae8aff6262
child 57923 cdae2467311d
child 57976 bf99106b6672
permissions -rw-r--r--
generic process wrapping in Prover;
clarified module arrangement;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    /** Factory for named consumers; the constructor of the class itself is private. */
    object Consumer {
      /** Builds a consumer labelled `name` that handles each value with `consume`. */
      def apply[A](name: String)(consume: A => Unit): Consumer[A] = new Consumer[A](name, consume)
    }

    /** Immutable pair of a `name` and the callback `consume` that receives values of type `A`. */
    final class Consumer[-A] private(
      val name: String,
      val consume: A => Unit)
    24 
    /**
     * Outlet for values of type `A`: keeps a synchronized list of consumers and
     * hands every posted value to each of them through the given dispatcher.
     */
    class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) {
      private val consumers = Synchronized(List.empty[Consumer[A]])

      /** Registers `c` (list maintained via `Library.update`). */
      def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))

      /** Unregisters `c` (list maintained via `Library.remove`). */
      def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))

      /**
       * Sends one task per currently registered consumer to the dispatcher.
       * Anything thrown by a consumer is reported via `Output.error_message`
       * instead of propagating out of the task.
       */
      def post(a: A): Unit =
        consumers.value.foreach { consumer =>
          val task: () => Unit = () =>
            try consumer.consume(a)
            catch {
              case exn: Throwable =>
                Output.error_message(
                  "Consumer failed: " + quote(consumer.name) + "\n" + Exn.message(exn))
            }
          dispatcher.send(task)
        }
    }
    44 
    45 
    46   /* change */
    47 
    /**
     * Result of parsing edits: the step from version `previous` to `version`,
     * the command edits `doc_edits` between them, and two change flags.
     */
    sealed case class Change(
      previous: Document.Version, syntax_changed: Boolean, deps_changed: Boolean,
      doc_edits: List[Document.Edit_Command], version: Document.Version)
    54 
    55 
    56   /* events */
    57 
    //{{{
    /** Statistics properties, posted for ML statistics messages of the prover. */
    case class Statistics(props: Properties.T)

    /** Options value, posted after an options update was processed. */
    case class Global_Options(options: Options)

    /** Caret focus notification without payload. */
    case object Caret_Focus

    /** Text edits together with the blobs they refer to. */
    case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])

    /** Result of a dialog, identified by `id` and `serial`. */
    case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)

    /** Buffered summary of changed nodes and commands; `assignment` flags an assignment update. */
    case class Commands_Changed(
      assignment: Boolean,
      nodes: Set[Document.Node.Name],
      commands: Set[Command])

    /** Life-cycle phase of a session. */
    sealed abstract class Phase
    case object Inactive extends Phase
    // transient
    case object Startup extends Phase
    case object Failed extends Phase
    case object Ready extends Phase
    // transient
    case object Shutdown extends Phase
    //}}}
    74 
    75 
    76   /* syslog */
    77 
    /** Log that retains at most `limit` recent messages while counting all messages added. */
    private[Session] class Syslog(limit: Int) {
      private var recent = Queue.empty[XML.Elem]
      private var total = 0

      /** Appends `msg`; once more than `limit` messages were added, the oldest one is dropped. */
      def += (msg: XML.Elem): Unit = synchronized {
        total += 1
        val extended = recent.enqueue(msg)
        recent = if (total > limit) extended.dequeue._2 else extended
      }

      /** Retained messages as lines of text, followed by a total count if messages were dropped. */
      def content: String = synchronized {
        val summary =
          if (total > limit) "\n(A total of " + total + " messages...)"
          else ""
        cat_lines(recent.iterator.map(XML.content)) + summary
      }
    }
    94 
    95 
    96   /* protocol handlers */
    97 
    /**
     * Base class of protocol handlers: a table of named protocol functions plus
     * optional start/stop hooks, which do nothing unless overridden.
     */
    abstract class Protocol_Handler {
      /** Protocol functions by name; each returns a Boolean result for the given output. */
      val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]

      /** Hook called when the handler is added for `prover`. */
      def start(prover: Prover): Unit = ()

      /** Hook called when the handler is replaced or all handlers are stopped. */
      def stop(prover: Prover): Unit = ()
    }
   104 
   /**
    * Immutable table of installed protocol handlers and the protocol functions
    * they provide; `add` and `stop` return a fresh table instead of mutating.
    *
    * @param handlers   installed handlers, keyed by the name given to `add`
    * @param functions  protocol functions of all installed handlers, keyed by function name
    */
   class Protocol_Handlers(
     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
   {
     /** Looks up an installed handler by name. */
     def get(name: String): Option[Protocol_Handler] = handlers.get(name)

     /**
      * Instantiates the handler class `name` and installs it for `prover`.
      *
      * A handler already installed under `name` is stopped and removed first
      * (with a warning).  If instantiation, the duplicate check or `start`
      * fails, the failure is reported via `Output.error_message` and the
      * result is the table without the new handler.
      */
     def add(prover: Prover, name: String): Protocol_Handlers =
     {
       val (handlers1, functions1) =
         handlers.get(name) match {
           case Some(old_handler) =>
             Output.warning("Redefining protocol handler: " + name)
             old_handler.stop(prover)
             (handlers - name, functions -- old_handler.functions.keys)
           case None => (handlers, functions)
         }

       val (handlers2, functions2) =
         try {
           val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]

           val new_functions =
             for ((a, f) <- new_handler.functions.toList) yield
               (a, (msg: Prover.Protocol_Output) => f(prover, msg))

           // Check for clashes BEFORE start(): a handler that is rejected must not
           // be left in started state, since nothing would ever stop it again.
           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
           if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups))

           new_handler.start(prover)

           (handlers1 + (name -> new_handler), functions1 ++ new_functions)
         }
         catch {
           // Deliberately broad: loading and instantiating a class by name may
           // fail with Errors (e.g. linkage problems), not only Exceptions.
           case exn: Throwable =>
             Output.error_message(
               "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn))
             (handlers1, functions1)
         }

       new Protocol_Handlers(handlers2, functions2)
     }

     /**
      * Applies the protocol function named in the properties of `msg`, if any.
      *
      * @return the result of the function; `false` if there is no such
      *         function or its invocation failed (failure is reported)
      */
     def invoke(msg: Prover.Protocol_Output): Boolean =
       msg.properties match {
         case Markup.Function(a) =>
           functions.get(a) match {
             case Some(f) =>
               try { f(msg) }
               catch {
                 case exn: Throwable =>
                   Output.error_message(
                     "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn))
                   false
               }
             case None => false
           }
         case _ => false
       }

     /** Stops every installed handler and returns an empty table. */
     def stop(prover: Prover): Protocol_Handlers =
     {
       for ((_, handler) <- handlers) handler.stop(prover)
       new Protocol_Handlers()
     }
   }
   165 }
   166 
   167 
   168 class Session(val resources: Resources)
   169 {
   170   /* global flags */
   171 
   // enables Timing.timeit output for change parsing
   @volatile var timing: Boolean = false

   // enables warnings about ignored prover output and ignored manager messages
   @volatile var verbose: Boolean = false


   /* tuning parameters */

   // prover output (markup, common messages)
   def output_delay: Time = Time.seconds(0.1)

   // prune history (delete old versions)
   def prune_delay: Time = Time.seconds(60.0)

   // size of retained history
   def prune_size: Int = 0

   def syslog_limit: Int = 100

   def reparse_limit: Int = 0
   183 
   184 
   185   /* outlets */
   186 
   // daemon thread that executes the tasks sent by the outlets below
   private val dispatcher =
     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { task =>
       task()
       true
     }

   // session events
   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)

   // prover messages
   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   // potential bottle-neck
   val all_messages = new Session.Outlet[Prover.Message](dispatcher)

   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   200 
   201 
   202 
   203   /** main protocol manager **/
   204 
   205   /* internal messages */
   206 
   // request to start a prover process with the given name and arguments
   private case class Start(name: String, args: List[String])

   // request to terminate the running prover
   private case object Stop

   // request to cancel the execution identified by exec_id
   private case class Cancel_Exec(exec_id: Document_ID.Exec)

   // protocol command to be passed on to the prover
   private case class Protocol_Command(name: String, args: List[String])

   // new options to be passed on to the prover and announced to consumers
   private case class Update_Options(options: Options)

   // request to prune old versions from the document history
   private case object Prune_History
   213 
   214 
   215   /* global state */
   216 
   // log of syslog messages, limited by syslog_limit
   private val syslog = new Session.Syslog(syslog_limit)

   /** Text of the retained syslog messages. */
   def syslog_content(): String = syslog.content

   @volatile private var _phase: Session.Phase = Session.Inactive

   // setter: stores the new phase, then announces it on phase_changed
   private def phase_=(new_phase: Session.Phase): Unit = {
     _phase = new_phase
     phase_changed.post(new_phase)
   }

   /** Current session phase. */
   def phase: Session.Phase = _phase

   /** True iff the current phase is `Session.Ready`. */
   def is_ready: Boolean = Session.Ready == phase

   private val global_state = Synchronized(Document.State.init)

   /** Current value of the document state. */
   def current_state(): Document.State = global_state.value

   /** Syntax of the most recent finished version, falling back to the base syntax of resources. */
   def recent_syntax(): Prover.Syntax =
     current_state().recent_finished.version.get_finished.syntax.getOrElse(resources.base_syntax)
   237 
   238 
   239   /* protocol handlers */
   240 
   // current handler table; replaced as a whole when handlers are added or stopped
   @volatile private var _protocol_handlers = new Session.Protocol_Handlers()

   /** Looks up an installed protocol handler by name. */
   def protocol_handler(name: String): Option[Session.Protocol_Handler] = {
     val table = _protocol_handlers
     table.get(name)
   }
   245 
   246 
   247   /* theory files */
   248 
   /**
    * Produces the dependency edit for node `name` from `header`.  When the theory
    * of `name` is among the loaded theories of resources, the header is turned
    * into an error header instead.
    */
   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   {
     val theory = name.theory
     val checked_header =
       if (!resources.loaded_theories(theory)) header
       else header.error("Cannot update finished theory " + quote(theory))
     (name, Document.Node.Deps(checked_header))
   }
   257 
   258 
   259   /* pipelined change parsing */
   260 
   // parse request: edits relative to a (possibly unfinished) previous version,
   // plus the promise that receives the resulting version
   private case class Text_Edits(
     previous: Future[Document.Version], doc_blobs: Document.Blobs,
     text_edits: List[Document.Edit_Text], version_result: Promise[Document.Version])

   // daemon thread that parses one request at a time: it fulfills the version
   // promise and then sends the resulting change to the manager
   private val change_parser =
     Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) {
       case Text_Edits(prev_future, blobs, edits, result) =>
         val prev_version = prev_future.get_finished
         val parsed =
           Timing.timeit("parse_change", timing) {
             resources.parse_change(reparse_limit, prev_version, blobs, edits)
           }
         result.fulfill(parsed.version)
         manager.send(parsed)
         true
     }
   279 
   280 
   281   /* buffered changes */
   282 
   // accumulates changed nodes/commands and posts them as a single
   // Commands_Changed event when flushed
   private object change_buffer {
     private var assignment: Boolean = false
     private var nodes: Set[Document.Node.Name] = Set.empty
     private var commands: Set[Command] = Set.empty

     /** Posts the accumulated changes (if there are any) and clears the buffer. */
     def flush(): Unit = synchronized {
       val pending = assignment || nodes.nonEmpty || commands.nonEmpty
       if (pending) commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
       assignment = false
       nodes = Set.empty
       commands = Set.empty
     }

     // delayed flush, set up with output_delay
     private val delay_flush = Simple_Thread.delay_first(output_delay) { flush() }

     /** Adds `cmds` (and their nodes) to the buffer and requests a delayed flush. */
     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
       assignment = assignment | assign
       cmds.foreach { command =>
         nodes += command.node_name
         commands += command
       }
       delay_flush.invoke()
     }

     /** Revokes the delayed flush and flushes immediately. */
     def shutdown(): Unit = {
       delay_flush.revoke()
       flush()
     }
   }
   313 
   314 
   315   /* postponed changes */
   316 
   // changes whose previous version is not assigned yet
   private object postponed_changes {
     // most recently stored change first
     private var postponed: List[Session.Change] = Nil

     /** Remembers `change` for later. */
     def store(change: Session.Change): Unit = synchronized { postponed = change :: postponed }

     /**
      * Sends every stored change whose previous version is assigned by now to
      * the manager, in the order of storing; the others remain stored.
      */
     def flush(): Unit = synchronized {
       val state = global_state.value
       val (ready, waiting) = postponed.partition(change => state.is_assigned(change.previous))
       postponed = waiting
       ready.reverse.foreach(change => manager.send(change))
     }
   }
   330 
   331 
   332   /* prover process */
   333 
   // synchronized slot for the optional prover process
   private object prover {
     private val variable = Synchronized(None: Option[Prover])

     /** True iff a prover is currently set. */
     def defined: Boolean = variable.value.isDefined

     /** The current prover; only to be used when `defined` holds. */
     def get: Prover = variable.value.get

     def set(p: Prover): Unit = variable.change(_ => Some(p))

     def reset: Unit = variable.change(_ => None)

     /** Guarded access that only succeeds on an empty slot and leaves it empty. */
     def await_reset(): Unit =
       variable.guarded_access {
         case None => Some(((), None))
         case _ => None
       }
   }
   344 
   345 
   346   /* manager thread */
   347 
   // delayed event (set up with prune_delay) that sends Prune_History to the manager
   private val delay_prune =
     Simple_Thread.delay_first(prune_delay) {
       manager.send(Prune_History)
     }
   349 
   350   private val manager: Consumer_Thread[Any] =
   351   {
   352     /* raw edits */
   353 
   // Extends the history by a pending version for `edits`, announces the raw
   // edits and passes them on to the change parser.  Requires a prover.
   def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit =
   //{{{
   {
     require(prover.defined)

     val active_prover = prover.get
     active_prover.discontinue_execution()

     val tip_version = global_state.value.history.tip.version
     val pending_version = Future.promise[Document.Version]
     global_state.change(state => state.continue_history(tip_version, edits, pending_version))

     raw_edits.post(Session.Raw_Edits(doc_blobs, edits))

     val request = Text_Edits(tip_version, doc_blobs, edits, pending_version)
     change_parser.send(request)
   }
   //}}}
   369 
   370 
   371     /* resulting changes */
   372 
   // Defines the blobs and commands occurring in the edits of `change` (unless
   // already defined), then defines the new version and sends the update to
   // the prover.  Requires a prover.
   def handle_change(change: Session.Change): Unit =
   //{{{
   {
     require(prover.defined)

     def id_command(command: Command): Unit = {
       // blobs first, then the command itself
       command.blobs_defined.foreach { case (name, digest) =>
         if (!global_state.value.defined_blob(digest)) {
           change.version.nodes(name).get_blob match {
             case None =>
               Output.error_message("Missing blob " + quote(name.toString))
             case Some(blob) =>
               global_state.change(_.define_blob(digest))
               prover.get.define_blob(digest, blob.bytes)
           }
         }
       }

       val already_defined = global_state.value.defined_command(command.id)
       if (!already_defined) {
         global_state.change(_.define_command(command))
         prover.get.define_command(command)
       }
     }

     change.doc_edits.foreach { case (_, edit) =>
       edit.foreach { case (first, second) =>
         first.foreach(id_command)
         second.foreach(id_command)
       }
     }

     val previous = change.previous
     val version = change.version
     val assignment = global_state.value.the_assignment(previous).check_finished
     global_state.change(_.define_version(version, assignment))
     prover.get.update(previous.id, version.id, change.doc_edits)
     resources.commit(change)
   }
   //}}}
   409 
   410 
   411     /* prover output */
   412 
   413     def handle_output(output: Prover.Output)
   414     //{{{
           // Processes one prover output message.  Protocol output is first offered
           // to the installed protocol handlers and otherwise dispatched on its
           // properties; any other output is dispatched on its properties directly.
   415     {
             // report unusable output, but only in verbose mode
   416       def bad_output()
   417       {
   418         if (verbose)
   419           Output.warning("Ignoring bad prover output: " + output.message.toString)
   420       }
   421 
             // add message to the state identified by state_id and buffer the affected
             // command as changed; a Document.State.Fail counts as bad output
   422       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   423       {
   424         try {
   425           val st = global_state.change_result(_.accumulate(state_id, message))
   426           change_buffer.invoke(false, List(st.command))
   427         }
   428         catch {
   429           case _: Document.State.Fail => bad_output()
   430         }
   431       }
   432 
   433       output match {
   434         case msg: Prover.Protocol_Output =>
   435           val handled = _protocol_handlers.invoke(msg)
   436           if (!handled) {
   437             msg.properties match {
                     // install the named protocol handler
   438               case Markup.Protocol_Handler(name) if prover.defined =>
   439                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
   440 
                     // command timing is accumulated as a STATUS message with Timing markup
   441               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   442                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   443                 accumulate(state_id, prover.get.xml_cache.elem(message))
   444 
                     // assignment update: postponed changes are flushed even if assign
                     // failed, and the prune delay is invoked in every case
   445               case Markup.Assign_Update =>
   446                 msg.text match {
   447                   case Protocol.Assign_Update(id, update) =>
   448                     try {
   449                       val cmds = global_state.change_result(_.assign(id, update))
   450                       change_buffer.invoke(true, cmds)
   451                     }
   452                     catch { case _: Document.State.Fail => bad_output() }
   453                     postponed_changes.flush()
   454                   case _ => bad_output()
   455                 }
   456                 delay_prune.invoke()
   457 
                     // versions removed by the prover are removed from the state as well
   458               case Markup.Removed_Versions =>
   459                 msg.text match {
   460                   case Protocol.Removed(removed) =>
   461                     try {
   462                       global_state.change(_.removed_versions(removed))
   463                     }
   464                     catch { case _: Document.State.Fail => bad_output() }
   465                   case _ => bad_output()
   466                 }
   467 
   468               case Markup.ML_Statistics(props) =>
   469                 statistics.post(Session.Statistics(props))
   470 
                     // task statistics are accepted but currently ignored
   471               case Markup.Task_Statistics(props) =>
   472                 // FIXME
   473 
   474               case _ => bad_output()
   475             }
   476           }
   477         case _ =>
   478           output.properties match {
                   // output that carries a position id belongs to that state
   479             case Position.Id(state_id) =>
   480               accumulate(state_id, output.message)
   481 
   482             case _ if output.is_init =>
   483               phase = Session.Ready
   484 
                   // exit: return code 0 leads to Inactive, anything else to Failed;
                   // the prover slot is cleared in both cases
   485             case Markup.Return_Code(rc) if output.is_exit =>
   486               if (rc == 0) phase = Session.Inactive
   487               else phase = Session.Failed
   488               prover.reset
   489 
   490             case _ => raw_output_messages.post(output)
   491           }
   492         }
   493     }
   494     //}}}
   495 
   496 
   497     /* main thread */
   498 
   499     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   500     {
   501       case arg: Any =>
   502         //{{{
               // Dispatch of one manager message.  Cases guarded by prover.defined (and
               // Start, guarded by !prover.defined) fall through to the final "bad"
               // case when their guard does not hold.
   503         arg match {
                 // stdout/stderr is posted as raw output, everything else is handled
                 // as prover output; syslog output is additionally logged and posted
   504           case output: Prover.Output =>
   505             if (output.is_stdout || output.is_stderr)
   506               raw_output_messages.post(output)
   507             else handle_output(output)
   508 
   509             if (output.is_syslog) {
   510               syslog += output.message
   511               syslog_messages.post(output)
   512             }
   513 
   514             all_messages.post(output)
   515 
   516           case input: Prover.Input =>
   517             all_messages.post(input)
   518 
                 // start only from phase Inactive or Failed; the new prover sends its
                 // messages back to this manager
   519           case Start(name, args) if !prover.defined =>
   520             if (phase == Session.Inactive || phase == Session.Failed) {
   521               phase = Session.Startup
   522               prover.set(resources.start_prover(manager.send(_), name, args))
   523             }
   524 
                 // only effective for a prover in phase Ready: stop handlers, reset the
                 // document state, enter Shutdown and terminate the prover
   525           case Stop =>
   526             if (prover.defined && is_ready) {
   527               _protocol_handlers = _protocol_handlers.stop(prover.get)
   528               global_state.change(_ => Document.State.init)
   529               phase = Session.Shutdown
   530               prover.get.terminate
   531             }
   532 
   533           case Prune_History =>
   534             if (prover.defined) {
   535               val old_versions = global_state.change_result(_.prune_history(prune_size))
   536               if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   537             }
   538 
                 // the options are posted to consumers even when no ready prover exists
   539           case Update_Options(options) =>
   540             if (prover.defined && is_ready) {
   541               prover.get.options(options)
   542               handle_raw_edits(Document.Blobs.empty, Nil)
   543             }
   544             global_options.post(Session.Global_Options(options))
   545 
   546           case Cancel_Exec(exec_id) if prover.defined =>
   547             prover.get.cancel_exec(exec_id)
   548 
   549           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   550             handle_raw_edits(doc_blobs, edits)
   551 
                 // the result goes to the prover and is also handled like prover output
   552           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   553             prover.get.dialog_result(serial, result)
   554             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   555 
   556           case Protocol_Command(name, args) if prover.defined =>
   557             prover.get.protocol_command(name, args:_*)
   558 
                 // a change is handled once its previous version is assigned,
                 // and stored for later otherwise
   559           case change: Session.Change if prover.defined =>
   560             if (global_state.value.is_assigned(change.previous))
   561               handle_change(change)
   562             else postponed_changes.store(change)
   563 
   564           case bad =>
   565             if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
   566         }
               // NOTE(review): presumably tells Consumer_Thread to continue with the
               // next message -- confirm against Consumer_Thread
   567         true
   568         //}}}
   569     }
   570   }
   571 
   572 
   573   /* main operations */
   574 
   /** Snapshot of node `name` in the current state, taking `pending_edits` into account. */
   def snapshot(
     name: Document.Node.Name = Document.Node.Name.empty,
     pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   {
     val state = global_state.value
     state.snapshot(name, pending_edits)
   }

   /** Asks the manager to start a prover process. */
   def start(name: String, args: List[String]): Unit =
     manager.send(Start(name, args))

   /**
    * Stops the session: the Stop request is sent to the manager synchronously,
    * then the prover slot is awaited to be empty, and finally the auxiliary
    * threads and delays are shut down in the order given below.
    */
   def stop(): Unit = {
     manager.send_wait(Stop)
     prover.await_reset()
     change_parser.shutdown()
     change_buffer.shutdown()
     delay_prune.revoke()
     manager.shutdown()
     dispatcher.shutdown()
   }

   /** Asks the manager to pass a protocol command on to the prover. */
   def protocol_command(name: String, args: String*): Unit =
     manager.send(Protocol_Command(name, args.toList))

   /** Asks the manager to cancel the given execution. */
   def cancel_exec(exec_id: Document_ID.Exec): Unit =
     manager.send(Cancel_Exec(exec_id))

   /** Sends non-empty `edits` to the manager synchronously; empty edits are ignored. */
   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit =
     if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits))

   /** Sends new options to the manager synchronously. */
   def update_options(options: Options): Unit =
     manager.send_wait(Update_Options(options))

   /** Asks the manager to process a dialog result. */
   def dialog_result(id: Document_ID.Generic, serial: Long, result: String): Unit =
     manager.send(Session.Dialog_Result(id, serial, result))
   607 }