src/Pure/PIDE/session.scala
author wenzelm
Mon Mar 13 21:37:35 2017 +0100 (2017-03-13 ago)
changeset 65214 a2ec0db555c7
parent 65213 51c0f094dc02
child 65215 90a7876d6f4c
permissions -rw-r--r--
clarified modules;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    /** Factory for named consumers; the class constructor itself is private. */
    object Consumer
    {
      /** Wrap the callback `consume` as a consumer identified by `name`. */
      def apply[A](name: String)(consume: A => Unit): Consumer[A] =
      {
        val consumer = new Consumer[A](name, consume)
        consumer
      }
    }

    /** Named callback for messages of type `A` (contravariant in `A`). */
    final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    /**
     * Registry of consumers for messages of type `A`.
     *
     * Posting does not run consumers directly: for each registered consumer a
     * thunk is sent to the given `dispatcher`, which executes it later.
     */
    class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    {
      private val consumers = Synchronized[List[Consumer[A]]](Nil)

      /** Register consumer `c` (via `Library.update`). */
      def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))

      /** Unregister consumer `c` (via `Library.remove`). */
      def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))

      /** Hand message `a` to every consumer registered at the time of the call. */
      def post(a: A): Unit =
        consumers.value.iterator.foreach(consumer =>
        {
          // a failing consumer is reported, but must not disturb the dispatcher
          def deliver(): Unit =
            try { consumer.consume(a) }
            catch {
              case exn: Throwable =>
                Output.error_message(
                  "Consumer failed: " + quote(consumer.name) + "\n" + Exn.message(exn))
            }
          dispatcher.send(() => deliver())
        })
    }
    44 
    45 
    46   /* change */
    47 
    /**
     * Document change leading from version `previous` to `version`.
     *
     * Values of this class are sent to the session manager, which applies
     * `doc_edits` to the prover once `previous` is assigned, and postpones
     * the change otherwise.
     */
    sealed case class Change(
      previous: Document.Version,
      syntax_changed: List[Document.Node.Name],
      deps_changed: Boolean,
      doc_edits: List[Document.Edit_Command],
      version: Document.Version)

    /** Manager message: re-examine postponed changes against the current state. */
    case object Change_Flush
    56 
    57 
    58   /* events */
    59 
    60   //{{{
    /** ML statistics properties, posted on the `statistics` outlet. */
    case class Statistics(props: Properties.T)

    /** Updated options, posted on the `global_options` outlet. */
    case class Global_Options(options: Options)

    /** Event type of the `caret_focus` outlet. */
    case object Caret_Focus

    /** Text edits with their blobs: manager input, and echoed on the `raw_edits` outlet. */
    case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])

    /** Manager input: result of a dialog, forwarded to the prover. */
    case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)

    /** Manager input: request to build theories, forwarded to the prover. */
    case class Build_Theories(
      id: String,
      master_dir: Path,
      theories: List[(Options, List[Path])])

    /** Accumulated command changes, posted on the `commands_changed` outlet. */
    case class Commands_Changed(
      assignment: Boolean,
      nodes: Set[Document.Node.Name],
      commands: Set[Command])
    69 
    /** Life-cycle phase of a session. */
    sealed abstract class Phase
    {
      /**
       * Printable name: a terminated phase prints as "finished" (return code 0)
       * or "failed" (any other code); every other phase prints as its
       * lower-cased `toString`.
       */
      def print: String =
        this match {
          case Terminated(0) => "finished"
          case Terminated(_) => "failed"
          case other => Word.lowercase(other.toString)
        }
    }
    case object Inactive extends Phase  // stable
    case object Startup extends Phase  // transient
    case object Ready extends Phase  // metastable
    case object Shutdown extends Phase  // transient
    case class Terminated(rc: Int) extends Phase  // stable
    83   //}}}
    84 
    85 
    86   /* syslog */
    87 
    /**
     * Bounded log of messages: retains the most recent messages up to `limit`,
     * while `length` keeps counting every message ever added.
     */
    private[Session] class Syslog(limit: Int)
    {
      private var queue = Queue.empty[XML.Elem]
      private var length = 0

      /** Append `msg`, dropping the oldest entry once more than `limit` were added. */
      def += (msg: XML.Elem): Unit = synchronized {
        val extended = queue.enqueue(msg)
        length += 1
        queue = if (length > limit) extended.dequeue._2 else extended
      }

      /** Retained messages as lines, plus a total-count note if messages were dropped. */
      def content: String = synchronized {
        val lines = cat_lines(queue.iterator.map(XML.content))
        val summary =
          if (length > limit) "\n(A total of " + length + " messages...)" else ""
        lines + summary
      }
    }
   104 
   105 
   106   /* protocol handlers */
   107 
   /**
    * Base class of protocol handlers: `start` and `stop` are hooks that do
    * nothing by default, `functions` must be provided by each handler.
    */
   abstract class Protocol_Handler
   {
     // table of handler functions by name
     // NOTE(review): the Boolean result presumably means "message handled" -- confirm
     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]

     def start(session: Session, prover: Prover): Unit = ()
     def stop(prover: Prover): Unit = ()
   }
   114 }
   115 
   116 
   117 class Session(val resources: Resources)
   118 {
   119   session =>
   120 
   121 
   122   /* global flags */
   123 
   // enables timing of "parse_change" in the change parser
   @volatile var timing: Boolean = false

   // enables warnings about ignored prover output and ignored manager messages
   @volatile var verbose: Boolean = false
   126 
   127 
   128   /* tuning parameters */
   129 
   // prover output (markup, common messages): delay before buffered changes are flushed
   def output_delay: Time = Time.seconds(0.1)

   // prune history (delete old versions): delay before Prune_History is sent
   def prune_delay: Time = Time.seconds(15.0)

   // size of retained history
   def prune_size: Int = 0

   // limit of the session syslog
   def syslog_limit: Int = 100

   // limit passed to resources.parse_change
   def reparse_limit: Int = 0
   135 
   136 
   137   /* outlets */
   138 
   // consumer thread that runs the thunks produced by the outlets below
   private val dispatcher =
     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) {
       case thunk => thunk(); true
     }

   // one outlet per kind of event, all sharing the same dispatcher
   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)

   // receives every prover input and output message: potential bottle-neck!
   val all_messages = new Session.Outlet[Prover.Message](dispatcher)
   154 
   155 
   156   /** main protocol manager **/
   157 
   158   /* internal messages */
   159 
   // start the prover via the given function (only while no prover is defined)
   private case class Start(start_prover: Prover.Receiver => Prover)

   // stop protocol handlers, reset the document state, terminate the prover
   private case object Stop

   // cancel the given execution in the prover
   private case class Cancel_Exec(exec_id: Document_ID.Exec)

   // forward a named protocol command with its arguments to the prover
   private case class Protocol_Command(name: String, args: List[String])

   // pass options to the prover (when ready) and publish them as Global_Options
   private case class Update_Options(options: Options)

   // remove old versions from the history, as determined by prune_size
   private case object Prune_History
   166 
   167 
   168   /* phase */
   169 
   // the current phase, initially Inactive
   private val _phase = Synchronized[Session.Phase](Session.Inactive)

   // publish new_phase on the phase_changed outlet and return it unchanged
   private def post_phase(new_phase: Session.Phase): Session.Phase =
   {
     phase_changed.post(new_phase)
     new_phase
   }

   // setter behind "phase = ...": replaces the phase and publishes the new one
   private def phase_=(new_phase: Session.Phase): Unit =
     _phase.change(_ => post_phase(new_phase))

   def phase = _phase.value

   def is_ready: Boolean =
     phase match {
       case Session.Ready => true
       case _ => false
     }
   180 
   181 
   182   /* global state */
   183 
   // bounded log of syslog messages, see syslog_limit
   private val syslog = new Session.Syslog(syslog_limit)
   def syslog_content(): String = syslog.content

   // the document state of this session, replaced via change / change_result
   private val global_state = Synchronized(Document.State.init)
   def current_state(): Document.State = global_state.value

   // syntax of node `name` in the recent finished version,
   // falling back on the base syntax of the resources
   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   {
     val version = global_state.value.recent_finished.version.get_finished
     version.nodes(name).syntax getOrElse resources.base.syntax
   }
   193 
   194 
   195   /* pipelined change parsing */
   196 
   // work item for the change parser: edits relative to the (future) previous
   // version, and the promise that receives the resulting version
   private case class Text_Edits(
     previous: Future[Document.Version],
     doc_blobs: Document.Blobs,
     text_edits: List[Document.Edit_Text],
     version_result: Promise[Document.Version])
   202 
   // consumer thread that parses text edits into a Session.Change: it waits for
   // the previous version, fulfills the promised new version, and passes the
   // change on to the manager
   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   {
     case request: Text_Edits =>
       val prev = request.previous.get_finished
       val change =
         Timing.timeit("parse_change", timing) {
           resources.parse_change(reparse_limit, prev, request.doc_blobs, request.text_edits)
         }
       request.version_result.fulfill(change.version)
       manager.send(change)
       true
   }
   215 
   216 
   217   /* buffered changes */
   218 
   // accumulates command changes and publishes them as a single
   // Session.Commands_Changed event after output_delay
   private object change_buffer
   {
     private var assignment: Boolean = false
     private var nodes: Set[Document.Node.Name] = Set.empty
     private var commands: Set[Command] = Set.empty

     // publish the accumulated changes (if any) and reset the buffer
     def flush(): Unit = synchronized {
       val pending = assignment || nodes.nonEmpty || commands.nonEmpty
       if (pending) {
         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
       }
       assignment = false
       nodes = Set.empty
       commands = Set.empty
     }
     private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }

     // record cmds together with their node names and blob names,
     // then schedule a delayed flush
     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
       assignment = assignment | assign
       cmds.foreach(command =>
       {
         nodes += command.node_name
         command.blobs_names.foreach(blob_name => nodes += blob_name)
         commands += command
       })
       delay_flush.invoke()
     }

     // cancel the pending delayed flush and flush immediately
     def shutdown(): Unit =
     {
       delay_flush.revoke()
       flush()
     }
   }
   250 
   251 
   252   /* postponed changes */
   253 
   // changes that could not be handled yet, most recently stored first
   private object postponed_changes
   {
     private var postponed: List[Session.Change] = Nil

     def store(change: Session.Change): Unit = synchronized {
       postponed = change :: postponed
     }

     // remove and return, in order of storage, all changes whose previous
     // version is assigned in state; the others stay postponed
     def flush(state: Document.State): List[Session.Change] = synchronized {
       val (ready, waiting) =
         postponed.partition(change => state.is_assigned(change.previous))
       postponed = waiting
       ready.reverse
     }
   }
   266 
   267 
   268   /* prover process */
   269 
   // synchronized slot for the optional prover process of this session
   private object prover
   {
     private val variable = Synchronized[Option[Prover]](None)

     def defined: Boolean = variable.value.nonEmpty

     // the current prover: fails if none is defined, callers check `defined` first
     def get: Prover = variable.value.get

     def set(p: Prover): Unit = variable.change(_ => Some(p))
     def reset: Unit = variable.change(_ => None)

     // block until the slot is empty
     def await_reset(): Unit =
       variable.guarded_access({
         case None => Some(((), None))
         case _ => None
       })
   }
   280 
   281 
   282   /* protocol handlers */
   283 
   // table of protocol handlers of this session
   private val protocol_handlers = Protocol_Handlers.init()

   // look up a handler by name
   def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
     protocol_handlers.get(name)

   // add a handler instance for the current prover (which must be defined)
   def add_protocol_handler(handler: Session.Protocol_Handler): Unit =
     protocol_handlers.add(session, prover.get, handler)

   // add a handler given by name for the current prover (which must be defined)
   def add_protocol_handler(name: String): Unit =
     protocol_handlers.add(session, prover.get, name)
   294 
   295 
   296   /* manager thread */
   297 
   // delayed request to the manager to prune the history, see prune_delay
   private val delay_prune =
     Standard_Thread.delay_first(prune_delay) {
       manager.send(Prune_History)
     }
   300 
   301   private val manager: Consumer_Thread[Any] =
   302   {
   303     /* raw edits */
   304 
   // Extend the history by a promised version for the given edits, publish
   // the raw edits, and hand them to the change parser.  Requires a prover.
   def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit =
   //{{{
   {
     require(prover.defined)

     prover.get.discontinue_execution()

     val tip_version = global_state.value.history.tip.version
     val next_version = Future.promise[Document.Version]
     global_state.change(_.continue_history(tip_version, edits, next_version))

     raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
     change_parser.send(Text_Edits(tip_version, doc_blobs, edits, next_version))
   }
   //}}}
   320 
   321 
   322     /* resulting changes */
   323 
   // Apply a parsed change: define blobs and commands not yet known to the
   // state (in state and prover alike), define the new version, update the
   // prover, and commit the change to the resources.  Requires a prover.
   def handle_change(change: Session.Change): Unit =
   //{{{
   {
     require(prover.defined)

     // define the blobs of the command first, then the command itself
     def id_command(command: Command): Unit =
     {
       for ((name, digest) <- command.blobs_defined) {
         if (!global_state.value.defined_blob(digest)) {
           change.version.nodes(name).get_blob match {
             case None =>
               Output.error_message("Missing blob " + quote(name.toString))
             case Some(blob) =>
               global_state.change(_.define_blob(digest))
               prover.get.define_blob(digest, blob.bytes)
           }
         }
       }

       val known = global_state.value.defined_command(command.id)
       if (!known) {
         global_state.change(_.define_command(command))
         prover.get.define_command(command)
       }
     }
     change.doc_edits.foreach({
       case (_, node_edit) =>
         node_edit foreach {
           case (cmd1, cmd2) =>
             cmd1.foreach(id_command)
             cmd2.foreach(id_command)
         }
     })

     val assignment = global_state.value.the_assignment(change.previous).check_finished
     global_state.change(_.define_version(change.version, assignment))
     prover.get.update(change.previous.id, change.version.id, change.doc_edits)
     resources.commit(change)
   }
   //}}}
   360 
   361 
   362     /* prover output */
   363 
   // Process one prover output message.  Protocol output goes to the protocol
   // handlers first and is interpreted here only if they did not handle it;
   // other output is accumulated into the state, or changes the session phase.
   def handle_output(output: Prover.Output): Unit =
   //{{{
   {
     def bad_output(): Unit =
       if (verbose) {
         Output.warning("Ignoring bad prover output: " + output.message.toString)
       }

     // add the message to the state and record the affected command as changed
     def accumulate(state_id: Document_ID.Generic, message: XML.Elem): Unit =
       try {
         val command_state = global_state.change_result(_.accumulate(state_id, message))
         change_buffer.invoke(false, List(command_state.command))
       }
       catch { case _: Document.State.Fail => bad_output() }

     output match {
       case msg: Prover.Protocol_Output =>
         if (!protocol_handlers.invoke(msg)) {
           msg.properties match {
             case Markup.Protocol_Handler(name) if prover.defined =>
               add_protocol_handler(name)

             case Protocol.Command_Timing(state_id, timing) if prover.defined =>
               // timing is accumulated as a status message of the command
               val message =
                 XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
               accumulate(state_id, prover.get.xml_cache.elem(message))

             case Markup.Assign_Update =>
               msg.text match {
                 case Protocol.Assign_Update(id, update) =>
                   try {
                     val cmds = global_state.change_result(_.assign(id, update))
                     change_buffer.invoke(true, cmds)
                     // postponed changes may have become applicable
                     manager.send(Session.Change_Flush)
                   }
                   catch { case _: Document.State.Fail => bad_output() }
                 case _ => bad_output()
               }
               delay_prune.invoke()

             case Markup.Removed_Versions =>
               msg.text match {
                 case Protocol.Removed(removed) =>
                   try {
                     global_state.change(_.removed_versions(removed))
                     manager.send(Session.Change_Flush)
                   }
                   catch { case _: Document.State.Fail => bad_output() }
                 case _ => bad_output()
               }

             case Markup.ML_Statistics(props) =>
               statistics.post(Session.Statistics(props))

             case Markup.Task_Statistics(props) =>
               // FIXME

             case _ => bad_output()
           }
         }

       case _ =>
         output.properties match {
           case Position.Id(state_id) =>
             accumulate(state_id, output.message)

           case _ if output.is_init =>
             phase = Session.Ready

           case Markup.Return_Code(rc) if output.is_exit =>
             phase = Session.Terminated(rc)
             prover.reset

           case _ =>
             raw_output_messages.post(output)
         }
     }
   }
   //}}}
   447 
   448 
   449     /* main thread */
   450 
   // The manager thread proper: handles one message at a time.  Most requests
   // are guarded by prover.defined; anything that matches no case is ignored
   // (with a warning if verbose).
   Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   {
     case arg: Any =>
       //{{{
       arg match {
         case output: Prover.Output =>
           // stdout and stderr are published raw, everything else is interpreted
           if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
           else handle_output(output)

           if (output.is_syslog) {
             syslog += output.message
             syslog_messages.post(output)
           }

           all_messages.post(output)

         case input: Prover.Input =>
           all_messages.post(input)

         case Start(start_prover) if !prover.defined =>
           // the new prover sends its messages back to this manager
           prover.set(start_prover(manager.send(_)))

         case Stop =>
           delay_prune.revoke()
           if (prover.defined) {
             protocol_handlers.stop(prover.get)
             global_state.change(_ => Document.State.init)
             prover.get.terminate
           }

         case Prune_History =>
           if (prover.defined) {
             val old_versions = global_state.change_result(_.remove_versions(prune_size))
             if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
           }

         case Update_Options(options) =>
           if (prover.defined && is_ready) {
             prover.get.options(options)
             handle_raw_edits(Document.Blobs.empty, Nil)
           }
           // options are published even without a ready prover
           global_options.post(Session.Global_Options(options))

         case Cancel_Exec(exec_id) if prover.defined =>
           prover.get.cancel_exec(exec_id)

         case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
           handle_raw_edits(doc_blobs, edits)

         case Session.Dialog_Result(id, serial, result) if prover.defined =>
           prover.get.dialog_result(serial, result)
           handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))

         case Session.Build_Theories(id, master_dir, theories) if prover.defined =>
           prover.get.build_theories(id, master_dir, theories)

         case Protocol_Command(name, args) if prover.defined =>
           prover.get.protocol_command(name, args:_*)

         case change: Session.Change if prover.defined =>
           // handle now if possible, otherwise keep for a later Change_Flush
           val state = global_state.value
           val applicable = !state.removing_versions && state.is_assigned(change.previous)
           if (applicable) handle_change(change)
           else postponed_changes.store(change)

         case Session.Change_Flush if prover.defined =>
           val state = global_state.value
           if (!state.removing_versions) {
             postponed_changes.flush(state).foreach(handle_change(_))
           }

         case other =>
           if (verbose) Output.warning("Ignoring bad message: " + other.toString)
       }
       true
       //}}}
   }
   528   }
   529 
   530 
   531   /* main operations */
   532 
   // snapshot of the current document state for node `name`
   // (default: the empty node name) with the given pending edits (default: none)
   def snapshot(
       name: Document.Node.Name = Document.Node.Name.empty,
       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   {
     val state = global_state.value
     state.snapshot(name, pending_edits)
   }
   536 
   // Start the prover via start_prover: only possible in phase Inactive, which
   // then becomes Startup; any other phase is an error.
   def start(start_prover: Prover.Receiver => Prover): Unit =
     _phase.change(
       {
         case Session.Inactive =>
           manager.send(Start(start_prover))
           post_phase(Session.Startup)
         case phase =>
           error("Cannot start prover in phase " + quote(phase.print))
       })
   547 
   // Stop the session and return the return code of the terminated phase.
   // Waits while the phase is Startup or Shutdown; a ready session is shut
   // down via the manager, an inactive one becomes Terminated(0) directly.
   // Afterwards all threads of the session are shut down.
   def stop(): Int =
   {
     val was_ready =
       _phase.guarded_access(current =>
         current match {
           case Session.Startup | Session.Shutdown => None
           case Session.Terminated(_) => Some((false, current))
           case Session.Inactive => Some((false, post_phase(Session.Terminated(0))))
           case Session.Ready => Some((true, post_phase(Session.Shutdown)))
         })
     if (was_ready) manager.send_wait(Stop)
     prover.await_reset()

     change_parser.shutdown()
     change_buffer.shutdown()
     manager.shutdown()
     dispatcher.shutdown()

     phase match {
       case Session.Terminated(rc) => rc
       case other => error("Bad session phase after shutdown: " + quote(other.print))
     }
   }
   571 
   // send a protocol command with its arguments to the manager
   def protocol_command(name: String, args: String*): Unit =
     manager.send(Protocol_Command(name, args.toList))
   574 
   // ask the manager to cancel the given execution
   def cancel_exec(exec_id: Document_ID.Exec): Unit =
     manager.send(Cancel_Exec(exec_id))
   577 
   // pass non-empty edits to the manager via send_wait; empty edits are ignored
   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit =
     if (edits.nonEmpty) {
       manager.send_wait(Session.Raw_Edits(doc_blobs, edits))
     }
   580 
   // pass updated options to the manager via send_wait
   def update_options(options: Options): Unit =
   {
     manager.send_wait(Update_Options(options))
   }
   583 
   // send the result of a dialog to the manager
   def dialog_result(id: Document_ID.Generic, serial: Long, result: String): Unit =
     manager.send(Session.Dialog_Result(id, serial, result))
   586 
   // send a request to build theories to the manager
   def build_theories(
       id: String, master_dir: Path, theories: List[(Options, List[Path])]): Unit =
     manager.send(Session.Build_Theories(id, master_dir, theories))
   589 }