src/Pure/PIDE/session.scala
author wenzelm
Sun Mar 10 14:19:30 2019 +0100 (4 months ago ago)
changeset 70070 be04e9a053a7
parent 69653 af09cc4792dc
child 70469 3e17c3a5fd39
permissions -rw-r--r--
markup and document markers for some meta data from "Dublin Core Metadata Element Set";
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 import scala.annotation.tailrec
    13 
    14 
    15 object Session
    16 {
    17   /* outlets */
    18 
    19   object Consumer
    20   {
    21     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    22       new Consumer[A](name, consume)
    23   }
    24   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    25 
    26   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    27   {
    28     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    29 
    30     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    31     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    32 
    33     def post(a: A)
    34     {
    35       for (c <- consumers.value.iterator) {
    36         dispatcher.send(() =>
    37           try { c.consume(a) }
    38           catch {
    39             case exn: Throwable =>
    40               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    41           })
    42       }
    43     }
    44   }
    45 
    46 
    47   /* change */
    48 
    49   sealed case class Change(
    50     previous: Document.Version,
    51     syntax_changed: List[Document.Node.Name],
    52     deps_changed: Boolean,
    53     doc_edits: List[Document.Edit_Command],
    54     consolidate: List[Document.Node.Name],
    55     version: Document.Version)
    56 
    57   case object Change_Flush
    58 
    59 
    60   /* events */
    61 
    62   //{{{
    63   case class Statistics(props: Properties.T)
    64   case class Global_Options(options: Options)
    65   case object Caret_Focus
    66   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    67   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    68   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    69   case class Commands_Changed(
    70     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    71 
    72   sealed abstract class Phase
    73   {
    74     def print: String =
    75       this match {
    76         case Terminated(result) => if (result.ok) "finished" else "failed"
    77         case _ => Word.lowercase(this.toString)
    78       }
    79   }
    80   case object Inactive extends Phase  // stable
    81   case object Startup extends Phase  // transient
    82   case object Ready extends Phase  // metastable
    83   case object Shutdown extends Phase  // transient
    84   case class Terminated(result: Process_Result) extends Phase  // stable
    85   //}}}
    86 
    87 
    88   /* syslog */
    89 
    90   private[Session] class Syslog(limit: Int)
    91   {
    92     private var queue = Queue.empty[XML.Elem]
    93     private var length = 0
    94 
    95     def += (msg: XML.Elem): Unit = synchronized {
    96       queue = queue.enqueue(msg)
    97       length += 1
    98       if (length > limit) queue = queue.dequeue._2
    99     }
   100 
   101     def content: String = synchronized {
   102       cat_lines(queue.iterator.map(XML.content)) +
   103       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
   104     }
   105   }
   106 
   107 
   108   /* protocol handlers */
   109 
   110   abstract class Protocol_Handler
   111   {
   112     def init(session: Session): Unit = {}
   113     def exit(): Unit = {}
   114     val functions: List[(String, Prover.Protocol_Output => Boolean)]
   115   }
   116 }
   117 
   118 
   119 class Session(_session_options: => Options, val resources: Resources) extends Document.Session
   120 {
   121   session =>
   122 
   123   val xml_cache: XML.Cache = XML.make_cache()
   124   val xz_cache: XZ.Cache = XZ.make_cache()
   125 
   126 
   127   /* global flags */
   128 
   129   @volatile var timing: Boolean = false
   130   @volatile var verbose: Boolean = false
   131 
   132 
   133   /* dynamic session options */
   134 
   135   def session_options: Options = _session_options
   136 
   137   def output_delay: Time = session_options.seconds("editor_output_delay")
   138   def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay")
   139   def prune_delay: Time = session_options.seconds("editor_prune_delay")
   140   def prune_size: Int = session_options.int("editor_prune_size")
   141   def syslog_limit: Int = session_options.int("editor_syslog_limit")
   142   def reparse_limit: Int = session_options.int("editor_reparse_limit")
   143 
   144 
   145   /* dispatcher */
   146 
   147   private val dispatcher =
   148     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   149 
   150   def assert_dispatcher[A](body: => A): A =
   151   {
   152     assert(dispatcher.check_thread)
   153     body
   154   }
   155 
   156   def require_dispatcher[A](body: => A): A =
   157   {
   158     require(dispatcher.check_thread)
   159     body
   160   }
   161 
   162   def send_dispatcher(body: => Unit): Unit =
   163   {
   164     if (dispatcher.check_thread) body
   165     else dispatcher.send(() => body)
   166   }
   167 
   168   def send_wait_dispatcher(body: => Unit): Unit =
   169   {
   170     if (dispatcher.check_thread) body
   171     else dispatcher.send_wait(() => body)
   172   }
   173 
   174 
   175   /* outlets */
   176 
   177   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   178   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   179   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   180   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   181   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   182   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   183   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   184   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   185   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   186   val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)
   187 
   188   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck!
   189 
   190 
   191   /** main protocol manager **/
   192 
   193   /* internal messages */
   194 
   195   private case class Start(start_prover: Prover.Receiver => Prover)
   196   private case object Stop
   197   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   198   private case class Protocol_Command(name: String, args: List[String])
   199   private case class Update_Options(options: Options)
   200   private case object Consolidate_Execution
   201   private case object Prune_History
   202 
   203 
   204   /* phase */
   205 
   206   private def post_phase(new_phase: Session.Phase): Session.Phase =
   207   {
   208     phase_changed.post(new_phase)
   209     new_phase
   210   }
   211   private val _phase = Synchronized[Session.Phase](Session.Inactive)
   212   private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))
   213 
   214   def phase = _phase.value
   215   def is_ready: Boolean = phase == Session.Ready
   216 
   217 
   218   /* global state */
   219 
   220   private val syslog = new Session.Syslog(syslog_limit)
   221   def syslog_content(): String = syslog.content
   222 
   223   private val global_state = Synchronized(Document.State.init)
   224   def current_state(): Document.State = global_state.value
   225 
   226   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   227     global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse
   228     resources.session_base.overall_syntax
   229 
   230 
   231   /* pipelined change parsing */
   232 
   233   private case class Text_Edits(
   234     previous: Future[Document.Version],
   235     doc_blobs: Document.Blobs,
   236     text_edits: List[Document.Edit_Text],
   237     consolidate: List[Document.Node.Name],
   238     version_result: Promise[Document.Version])
   239 
   240   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   241   {
   242     case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) =>
   243       val prev = previous.get_finished
   244       val change =
   245         Timing.timeit("parse_change", timing) {
   246           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate)
   247         }
   248       version_result.fulfill(change.version)
   249       manager.send(change)
   250       true
   251   }
   252 
   253 
   254   /* buffered changes */
   255 
   256   private object change_buffer
   257   {
   258     private var assignment: Boolean = false
   259     private var nodes: Set[Document.Node.Name] = Set.empty
   260     private var commands: Set[Command] = Set.empty
   261 
   262     def flush(): Unit = synchronized {
   263       if (assignment || nodes.nonEmpty || commands.nonEmpty)
   264         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   265       if (nodes.nonEmpty) consolidation.update(nodes)
   266       assignment = false
   267       nodes = Set.empty
   268       commands = Set.empty
   269     }
   270     private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }
   271 
   272     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   273       assignment |= assign
   274       for (command <- cmds) {
   275         nodes += command.node_name
   276         command.blobs_names.foreach(nodes += _)
   277         commands += command
   278       }
   279       delay_flush.invoke()
   280     }
   281 
   282     def shutdown()
   283     {
   284       delay_flush.revoke()
   285       flush()
   286     }
   287   }
   288 
   289 
   290   /* postponed changes */
   291 
   292   private object postponed_changes
   293   {
   294     private var postponed: List[Session.Change] = Nil
   295 
   296     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   297 
   298     def flush(state: Document.State): List[Session.Change] = synchronized {
   299       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   300       postponed = unassigned
   301       assigned.reverse
   302     }
   303   }
   304 
   305 
   306   /* node consolidation */
   307 
   308   private object consolidation
   309   {
   310     private val delay =
   311       Standard_Thread.delay_first(consolidate_delay) { manager.send(Consolidate_Execution) }
   312 
   313     private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty)
   314     private val state = Synchronized(init_state)
   315 
   316     def exit()
   317     {
   318       delay.revoke()
   319       state.change(_ => None)
   320     }
   321 
   322     def update(new_nodes: Set[Document.Node.Name] = Set.empty)
   323     {
   324       val active =
   325         state.change_result(st =>
   326           (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes)))
   327       if (active) delay.invoke()
   328     }
   329 
   330     def flush(): Set[Document.Node.Name] =
   331       state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None))
   332   }
   333 
   334 
   335   /* prover process */
   336 
   337   private object prover
   338   {
   339     private val variable = Synchronized[Option[Prover]](None)
   340 
   341     def defined: Boolean = variable.value.isDefined
   342     def get: Prover = variable.value.get
   343     def set(p: Prover) { variable.change(_ => Some(p)) }
   344     def reset { variable.change(_ => None) }
   345     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   346   }
   347 
   348 
   349   /* file formats */
   350 
   351   lazy val file_formats: File_Format.Session =
   352     resources.file_formats.start_session(session)
   353 
   354 
   355   /* protocol handlers */
   356 
   357   private val protocol_handlers = Protocol_Handlers.init(session)
   358 
   359   def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
   360     protocol_handlers.get(name)
   361 
   362   def init_protocol_handler(handler: Session.Protocol_Handler): Unit =
   363     protocol_handlers.init(handler)
   364 
   365   def init_protocol_handler(name: String): Unit =
   366     protocol_handlers.init(name)
   367 
   368 
   369   /* debugger */
   370 
   371   private val debugger_handler = new Debugger.Handler(this)
   372   init_protocol_handler(debugger_handler)
   373 
   374   def debugger: Debugger = debugger_handler.debugger
   375 
   376 
   377   /* manager thread */
   378 
   379   private val delay_prune =
   380     Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   381 
   382   private val manager: Consumer_Thread[Any] =
   383   {
   384     /* raw edits */
   385 
   386     def handle_raw_edits(
   387       doc_blobs: Document.Blobs = Document.Blobs.empty,
   388       edits: List[Document.Edit_Text] = Nil,
   389       consolidate: List[Document.Node.Name] = Nil)
   390     //{{{
   391     {
   392       require(prover.defined)
   393 
   394       prover.get.discontinue_execution()
   395 
   396       val previous = global_state.value.history.tip.version
   397       val version = Future.promise[Document.Version]
   398       global_state.change(_.continue_history(previous, edits, version))
   399 
   400       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   401       change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version))
   402     }
   403     //}}}
   404 
   405 
   406     /* resulting changes */
   407 
   408     def handle_change(change: Session.Change)
   409     //{{{
   410     {
   411       require(prover.defined)
   412 
   413       def id_command(command: Command)
   414       {
   415         for {
   416           (name, digest) <- command.blobs_defined
   417           if !global_state.value.defined_blob(digest)
   418         } {
   419           change.version.nodes(name).get_blob match {
   420             case Some(blob) =>
   421               global_state.change(_.define_blob(digest))
   422               prover.get.define_blob(digest, blob.bytes)
   423             case None =>
   424               Output.error_message("Missing blob " + quote(name.toString))
   425           }
   426         }
   427 
   428         if (!global_state.value.defined_command(command.id)) {
   429           global_state.change(_.define_command(command))
   430           prover.get.define_command(command)
   431         }
   432       }
   433       for { (_, edit) <- change.doc_edits } {
   434         edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) })
   435       }
   436 
   437       val assignment = global_state.value.the_assignment(change.previous).check_finished
   438       global_state.change(_.define_version(change.version, assignment))
   439       prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate)
   440       resources.commit(change)
   441     }
   442     //}}}
   443 
   444 
   445     /* prover output */
   446 
   447     def handle_output(output: Prover.Output)
   448     //{{{
   449     {
   450       def bad_output()
   451       {
   452         if (verbose)
   453           Output.warning("Ignoring bad prover output: " + output.message.toString)
   454       }
   455 
   456       def change_command(f: Document.State => (Command.State, Document.State))
   457       {
   458         try {
   459           val st = global_state.change_result(f)
   460           change_buffer.invoke(false, List(st.command))
   461         }
   462         catch { case _: Document.State.Fail => bad_output() }
   463       }
   464 
   465       output match {
   466         case msg: Prover.Protocol_Output =>
   467           val handled = protocol_handlers.invoke(msg)
   468           if (!handled) {
   469             msg.properties match {
   470               case Markup.Protocol_Handler(name) if prover.defined =>
   471                 init_protocol_handler(name)
   472 
   473               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   474                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   475                 change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))
   476 
   477               case Protocol.Theory_Timing(_, _) =>
   478                 // FIXME
   479 
   480               case Markup.Export(args)
   481               if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
   482                 val id = Value.Long.unapply(args.id.get).get
   483                 val export = Export.make_entry("", args, msg.bytes, cache = xz_cache)
   484                 change_command(_.add_export(id, (args.serial, export)))
   485 
   486               case Markup.Assign_Update =>
   487                 msg.text match {
   488                   case Protocol.Assign_Update(id, update) =>
   489                     try {
   490                       val cmds = global_state.change_result(_.assign(id, update))
   491                       change_buffer.invoke(true, cmds)
   492                       manager.send(Session.Change_Flush)
   493                     }
   494                     catch { case _: Document.State.Fail => bad_output() }
   495                   case _ => bad_output()
   496                 }
   497                 delay_prune.invoke()
   498 
   499               case Markup.Removed_Versions =>
   500                 msg.text match {
   501                   case Protocol.Removed(removed) =>
   502                     try {
   503                       global_state.change(_.removed_versions(removed))
   504                       manager.send(Session.Change_Flush)
   505                     }
   506                     catch { case _: Document.State.Fail => bad_output() }
   507                   case _ => bad_output()
   508                 }
   509 
   510               case Markup.ML_Statistics(props) =>
   511                 statistics.post(Session.Statistics(props))
   512 
   513               case Markup.Task_Statistics(props) =>
   514                 // FIXME
   515 
   516               case _ => bad_output()
   517             }
   518           }
   519         case _ =>
   520           output.properties match {
   521             case Position.Id(state_id) =>
   522               change_command(_.accumulate(state_id, output.message, xml_cache))
   523 
   524             case _ if output.is_init =>
   525               prover.get.options(file_formats.prover_options(session_options))
   526               prover.get.session_base(resources)
   527               phase = Session.Ready
   528               debugger.ready()
   529 
   530             case Markup.Process_Result(result) if output.is_exit =>
   531               file_formats.stop_session
   532               phase = Session.Terminated(result)
   533               prover.reset
   534 
   535             case _ =>
   536               raw_output_messages.post(output)
   537           }
   538         }
   539     }
   540     //}}}
   541 
   542 
   543     /* main thread */
   544 
   545     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   546     {
   547       case arg: Any =>
   548         //{{{
   549         arg match {
   550           case output: Prover.Output =>
   551             if (output.is_stdout || output.is_stderr)
   552               raw_output_messages.post(output)
   553             else handle_output(output)
   554 
   555             if (output.is_syslog) {
   556               syslog += output.message
   557               syslog_messages.post(output)
   558             }
   559 
   560             all_messages.post(output)
   561 
   562           case input: Prover.Input =>
   563             all_messages.post(input)
   564 
   565           case Start(start_prover) if !prover.defined =>
   566             prover.set(start_prover(manager.send(_)))
   567 
   568           case Stop =>
   569             consolidation.exit()
   570             delay_prune.revoke()
   571             if (prover.defined) {
   572               protocol_handlers.exit()
   573               global_state.change(_ => Document.State.init)
   574               prover.get.terminate
   575             }
   576 
   577           case Consolidate_Execution =>
   578             if (prover.defined) {
   579               val state = global_state.value
   580               state.stable_tip_version match {
   581                 case None => consolidation.update()
   582                 case Some(version) =>
   583                   val consolidate =
   584                     consolidation.flush().iterator.filter(name =>
   585                       !resources.session_base.loaded_theory(name) &&
   586                       !state.node_consolidated(version, name) &&
   587                       state.node_maybe_consolidated(version, name)).toList
   588                   if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate)
   589               }
   590             }
   591 
   592           case Prune_History =>
   593             if (prover.defined) {
   594               val old_versions = global_state.change_result(_.remove_versions(prune_size))
   595               if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
   596             }
   597 
   598           case Update_Options(options) =>
   599             if (prover.defined && is_ready) {
   600               prover.get.options(file_formats.prover_options(options))
   601               handle_raw_edits()
   602             }
   603             global_options.post(Session.Global_Options(options))
   604 
   605           case Cancel_Exec(exec_id) if prover.defined =>
   606             prover.get.cancel_exec(exec_id)
   607 
   608           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   609             handle_raw_edits(doc_blobs = doc_blobs, edits = edits)
   610 
   611           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   612             prover.get.dialog_result(serial, result)
   613             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   614 
   615           case Protocol_Command(name, args) if prover.defined =>
   616             prover.get.protocol_command(name, args:_*)
   617 
   618           case change: Session.Change if prover.defined =>
   619             val state = global_state.value
   620             if (!state.removing_versions && state.is_assigned(change.previous))
   621               handle_change(change)
   622             else postponed_changes.store(change)
   623 
   624           case Session.Change_Flush if prover.defined =>
   625             val state = global_state.value
   626             if (!state.removing_versions)
   627               postponed_changes.flush(state).foreach(handle_change(_))
   628 
   629           case bad =>
   630             if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
   631         }
   632         true
   633         //}}}
   634     }
   635   }
   636 
   637 
   638   /* main operations */
   639 
   640   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   641       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   642     global_state.value.snapshot(name, pending_edits)
   643 
   644   @tailrec final def await_stable_snapshot(): Document.Snapshot =
   645   {
   646     val snapshot = this.snapshot()
   647     if (snapshot.is_outdated) {
   648       Thread.sleep(output_delay.ms)
   649       await_stable_snapshot()
   650     }
   651     else snapshot
   652   }
   653 
   654   def start(start_prover: Prover.Receiver => Prover)
   655   {
   656     file_formats
   657     _phase.change(
   658       {
   659         case Session.Inactive =>
   660           manager.send(Start(start_prover))
   661           post_phase(Session.Startup)
   662         case phase => error("Cannot start prover in phase " + quote(phase.print))
   663       })
   664   }
   665 
   666   def send_stop()
   667   {
   668     val was_ready =
   669       _phase.guarded_access(phase =>
   670         phase match {
   671           case Session.Startup | Session.Shutdown => None
   672           case Session.Terminated(_) => Some((false, phase))
   673           case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0)))))
   674           case Session.Ready => Some((true, post_phase(Session.Shutdown)))
   675         })
   676     if (was_ready) manager.send(Stop)
   677   }
   678 
   679   def stop(): Process_Result =
   680   {
   681     send_stop()
   682     prover.await_reset()
   683 
   684     change_parser.shutdown()
   685     change_buffer.shutdown()
   686     manager.shutdown()
   687     dispatcher.shutdown()
   688 
   689     phase match {
   690       case Session.Terminated(result) => result
   691       case phase => error("Bad session phase after shutdown: " + quote(phase.print))
   692     }
   693   }
   694 
   695   def protocol_command(name: String, args: String*)
   696   { manager.send(Protocol_Command(name, args.toList)) }
   697 
   698   def cancel_exec(exec_id: Document_ID.Exec)
   699   { manager.send(Cancel_Exec(exec_id)) }
   700 
   701   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   702   { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   703 
   704   def update_options(options: Options)
   705   { manager.send_wait(Update_Options(options)) }
   706 
   707   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   708   { manager.send(Session.Dialog_Result(id, serial, result)) }
   709 }