src/Pure/PIDE/session.scala
author wenzelm
Fri Dec 21 13:33:56 2018 +0100 (11 months ago)
changeset 69492 b4b4d3ec55b3
parent 69488 b05c0bb47f6d
child 69640 af09cc4792dc
permissions -rw-r--r--
tuned signature;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    18   object Consumer
    19   {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    26   {
    27     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    28 
    29     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    30     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    31 
    32     def post(a: A)
    33     {
    34       for (c <- consumers.value.iterator) {
    35         dispatcher.send(() =>
    36           try { c.consume(a) }
    37           catch {
    38             case exn: Throwable =>
    39               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    40           })
    41       }
    42     }
    43   }
    44 
    45 
    46   /* change */
    47 
    48   sealed case class Change(
    49     previous: Document.Version,
    50     syntax_changed: List[Document.Node.Name],
    51     deps_changed: Boolean,
    52     doc_edits: List[Document.Edit_Command],
    53     consolidate: List[Document.Node.Name],
    54     version: Document.Version)
    55 
    56   case object Change_Flush
    57 
    58 
    59   /* events */
    60 
    61   //{{{
    62   case class Statistics(props: Properties.T)
    63   case class Global_Options(options: Options)
    64   case object Caret_Focus
    65   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    66   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    67   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    68   case class Commands_Changed(
    69     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    70 
    71   sealed abstract class Phase
    72   {
    73     def print: String =
    74       this match {
    75         case Terminated(result) => if (result.ok) "finished" else "failed"
    76         case _ => Word.lowercase(this.toString)
    77       }
    78   }
    79   case object Inactive extends Phase  // stable
    80   case object Startup extends Phase  // transient
    81   case object Ready extends Phase  // metastable
    82   case object Shutdown extends Phase  // transient
    83   case class Terminated(result: Process_Result) extends Phase  // stable
    84   //}}}
    85 
    86 
    87   /* syslog */
    88 
    89   private[Session] class Syslog(limit: Int)
    90   {
    91     private var queue = Queue.empty[XML.Elem]
    92     private var length = 0
    93 
    94     def += (msg: XML.Elem): Unit = synchronized {
    95       queue = queue.enqueue(msg)
    96       length += 1
    97       if (length > limit) queue = queue.dequeue._2
    98     }
    99 
   100     def content: String = synchronized {
   101       cat_lines(queue.iterator.map(XML.content)) +
   102       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
   103     }
   104   }
   105 
   106 
   107   /* protocol handlers */
   108 
   109   abstract class Protocol_Handler
   110   {
   111     def init(session: Session): Unit = {}
   112     def exit(): Unit = {}
   113     val functions: List[(String, Prover.Protocol_Output => Boolean)]
   114   }
   115 }
   116 
   117 
   118 class Session(_session_options: => Options, val resources: Resources) extends Document.Session
   119 {
   120   session =>
   121 
   122   val xml_cache: XML.Cache = XML.make_cache()
   123   val xz_cache: XZ.Cache = XZ.make_cache()
   124 
   125 
   126   /* global flags */
   127 
   128   @volatile var timing: Boolean = false
   129   @volatile var verbose: Boolean = false
   130 
   131 
   132   /* dynamic session options */
   133 
   134   def session_options: Options = _session_options
   135 
   136   def output_delay: Time = session_options.seconds("editor_output_delay")
   137   def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay")
   138   def prune_delay: Time = session_options.seconds("editor_prune_delay")
   139   def prune_size: Int = session_options.int("editor_prune_size")
   140   def syslog_limit: Int = session_options.int("editor_syslog_limit")
   141   def reparse_limit: Int = session_options.int("editor_reparse_limit")
   142 
   143 
   144   /* dispatcher */
   145 
   146   private val dispatcher =
   147     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   148 
   149   def assert_dispatcher[A](body: => A): A =
   150   {
   151     assert(dispatcher.check_thread)
   152     body
   153   }
   154 
   155   def require_dispatcher[A](body: => A): A =
   156   {
   157     require(dispatcher.check_thread)
   158     body
   159   }
   160 
   161   def send_dispatcher(body: => Unit): Unit =
   162   {
   163     if (dispatcher.check_thread) body
   164     else dispatcher.send(() => body)
   165   }
   166 
   167   def send_wait_dispatcher(body: => Unit): Unit =
   168   {
   169     if (dispatcher.check_thread) body
   170     else dispatcher.send_wait(() => body)
   171   }
   172 
   173 
   174   /* outlets */
   175 
   176   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   177   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   178   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   179   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   180   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   181   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   182   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   183   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   184   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   185   val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)
   186 
   187   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck!
   188 
   189 
   190   /** main protocol manager **/
   191 
   192   /* internal messages */
   193 
   194   private case class Start(start_prover: Prover.Receiver => Prover)
   195   private case object Stop
   196   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   197   private case class Protocol_Command(name: String, args: List[String])
   198   private case class Update_Options(options: Options)
   199   private case object Consolidate_Execution
   200   private case object Prune_History
   201 
   202 
   203   /* phase */
   204 
   205   private def post_phase(new_phase: Session.Phase): Session.Phase =
   206   {
   207     phase_changed.post(new_phase)
   208     new_phase
   209   }
   210   private val _phase = Synchronized[Session.Phase](Session.Inactive)
   211   private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))
   212 
   213   def phase = _phase.value
   214   def is_ready: Boolean = phase == Session.Ready
   215 
   216 
   217   /* global state */
   218 
   219   private val syslog = new Session.Syslog(syslog_limit)
   220   def syslog_content(): String = syslog.content
   221 
   222   private val global_state = Synchronized(Document.State.init)
   223   def current_state(): Document.State = global_state.value
   224 
   225   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   226     global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse
   227     resources.session_base.overall_syntax
   228 
   229 
   230   /* pipelined change parsing */
   231 
   232   private case class Text_Edits(
   233     previous: Future[Document.Version],
   234     doc_blobs: Document.Blobs,
   235     text_edits: List[Document.Edit_Text],
   236     consolidate: List[Document.Node.Name],
   237     version_result: Promise[Document.Version])
   238 
   239   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   240   {
   241     case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) =>
   242       val prev = previous.get_finished
   243       val change =
   244         Timing.timeit("parse_change", timing) {
   245           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate)
   246         }
   247       version_result.fulfill(change.version)
   248       manager.send(change)
   249       true
   250   }
   251 
   252 
   253   /* buffered changes */
   254 
   255   private object change_buffer
   256   {
   257     private var assignment: Boolean = false
   258     private var nodes: Set[Document.Node.Name] = Set.empty
   259     private var commands: Set[Command] = Set.empty
   260 
   261     def flush(): Unit = synchronized {
   262       if (assignment || nodes.nonEmpty || commands.nonEmpty)
   263         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   264       if (nodes.nonEmpty) consolidation.update(nodes)
   265       assignment = false
   266       nodes = Set.empty
   267       commands = Set.empty
   268     }
   269     private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }
   270 
   271     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   272       assignment |= assign
   273       for (command <- cmds) {
   274         nodes += command.node_name
   275         command.blobs_names.foreach(nodes += _)
   276         commands += command
   277       }
   278       delay_flush.invoke()
   279     }
   280 
   281     def shutdown()
   282     {
   283       delay_flush.revoke()
   284       flush()
   285     }
   286   }
   287 
   288 
   289   /* postponed changes */
   290 
   291   private object postponed_changes
   292   {
   293     private var postponed: List[Session.Change] = Nil
   294 
   295     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   296 
   297     def flush(state: Document.State): List[Session.Change] = synchronized {
   298       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   299       postponed = unassigned
   300       assigned.reverse
   301     }
   302   }
   303 
   304 
   305   /* node consolidation */
   306 
   307   private object consolidation
   308   {
   309     private val delay =
   310       Standard_Thread.delay_first(consolidate_delay) { manager.send(Consolidate_Execution) }
   311 
   312     private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty)
   313     private val state = Synchronized(init_state)
   314 
   315     def exit()
   316     {
   317       delay.revoke()
   318       state.change(_ => None)
   319     }
   320 
   321     def update(new_nodes: Set[Document.Node.Name] = Set.empty)
   322     {
   323       val active =
   324         state.change_result(st =>
   325           (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes)))
   326       if (active) delay.invoke()
   327     }
   328 
   329     def flush(): Set[Document.Node.Name] =
   330       state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None))
   331   }
   332 
   333 
   334   /* prover process */
   335 
   336   private object prover
   337   {
   338     private val variable = Synchronized[Option[Prover]](None)
   339 
   340     def defined: Boolean = variable.value.isDefined
   341     def get: Prover = variable.value.get
   342     def set(p: Prover) { variable.change(_ => Some(p)) }
   343     def reset { variable.change(_ => None) }
   344     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   345   }
   346 
   347 
   348   /* file formats */
   349 
   350   lazy val file_formats: File_Format.Session =
   351     resources.file_formats.start_session(session)
   352 
   353 
   354   /* protocol handlers */
   355 
   356   private val protocol_handlers = Protocol_Handlers.init(session)
   357 
   358   def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
   359     protocol_handlers.get(name)
   360 
   361   def init_protocol_handler(handler: Session.Protocol_Handler): Unit =
   362     protocol_handlers.init(handler)
   363 
   364   def init_protocol_handler(name: String): Unit =
   365     protocol_handlers.init(name)
   366 
   367 
   368   /* debugger */
   369 
   370   private val debugger_handler = new Debugger.Handler(this)
   371   init_protocol_handler(debugger_handler)
   372 
   373   def debugger: Debugger = debugger_handler.debugger
   374 
   375 
   376   /* manager thread */
   377 
   378   private val delay_prune =
   379     Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   380 
   381   private val manager: Consumer_Thread[Any] =
   382   {
   383     /* raw edits */
   384 
   385     def handle_raw_edits(
   386       doc_blobs: Document.Blobs = Document.Blobs.empty,
   387       edits: List[Document.Edit_Text] = Nil,
   388       consolidate: List[Document.Node.Name] = Nil)
   389     //{{{
   390     {
   391       require(prover.defined)
   392 
   393       prover.get.discontinue_execution()
   394 
   395       val previous = global_state.value.history.tip.version
   396       val version = Future.promise[Document.Version]
   397       global_state.change(_.continue_history(previous, edits, version))
   398 
   399       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   400       change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version))
   401     }
   402     //}}}
   403 
   404 
   405     /* resulting changes */
   406 
   407     def handle_change(change: Session.Change)
   408     //{{{
   409     {
   410       require(prover.defined)
   411 
   412       def id_command(command: Command)
   413       {
   414         for {
   415           (name, digest) <- command.blobs_defined
   416           if !global_state.value.defined_blob(digest)
   417         } {
   418           change.version.nodes(name).get_blob match {
   419             case Some(blob) =>
   420               global_state.change(_.define_blob(digest))
   421               prover.get.define_blob(digest, blob.bytes)
   422             case None =>
   423               Output.error_message("Missing blob " + quote(name.toString))
   424           }
   425         }
   426 
   427         if (!global_state.value.defined_command(command.id)) {
   428           global_state.change(_.define_command(command))
   429           prover.get.define_command(command)
   430         }
   431       }
   432       for { (_, edit) <- change.doc_edits } {
   433         edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) })
   434       }
   435 
   436       val assignment = global_state.value.the_assignment(change.previous).check_finished
   437       global_state.change(_.define_version(change.version, assignment))
   438       prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate)
   439       resources.commit(change)
   440     }
   441     //}}}
   442 
   443 
   444     /* prover output */
   445 
   446     def handle_output(output: Prover.Output)
   447     //{{{
   448     {
   449       def bad_output()
   450       {
   451         if (verbose)
   452           Output.warning("Ignoring bad prover output: " + output.message.toString)
   453       }
   454 
   455       def change_command(f: Document.State => (Command.State, Document.State))
   456       {
   457         try {
   458           val st = global_state.change_result(f)
   459           change_buffer.invoke(false, List(st.command))
   460         }
   461         catch { case _: Document.State.Fail => bad_output() }
   462       }
   463 
   464       output match {
   465         case msg: Prover.Protocol_Output =>
   466           val handled = protocol_handlers.invoke(msg)
   467           if (!handled) {
   468             msg.properties match {
   469               case Markup.Protocol_Handler(name) if prover.defined =>
   470                 init_protocol_handler(name)
   471 
   472               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   473                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   474                 change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))
   475 
   476               case Protocol.Theory_Timing(_, _) =>
   477                 // FIXME
   478 
   479               case Markup.Export(args)
   480               if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
   481                 val id = Value.Long.unapply(args.id.get).get
   482                 val export = Export.make_entry("", args, msg.bytes, cache = xz_cache)
   483                 change_command(_.add_export(id, (args.serial, export)))
   484 
   485               case Markup.Assign_Update =>
   486                 msg.text match {
   487                   case Protocol.Assign_Update(id, update) =>
   488                     try {
   489                       val cmds = global_state.change_result(_.assign(id, update))
   490                       change_buffer.invoke(true, cmds)
   491                       manager.send(Session.Change_Flush)
   492                     }
   493                     catch { case _: Document.State.Fail => bad_output() }
   494                   case _ => bad_output()
   495                 }
   496                 delay_prune.invoke()
   497 
   498               case Markup.Removed_Versions =>
   499                 msg.text match {
   500                   case Protocol.Removed(removed) =>
   501                     try {
   502                       global_state.change(_.removed_versions(removed))
   503                       manager.send(Session.Change_Flush)
   504                     }
   505                     catch { case _: Document.State.Fail => bad_output() }
   506                   case _ => bad_output()
   507                 }
   508 
   509               case Markup.ML_Statistics(props) =>
   510                 statistics.post(Session.Statistics(props))
   511 
   512               case Markup.Task_Statistics(props) =>
   513                 // FIXME
   514 
   515               case _ => bad_output()
   516             }
   517           }
   518         case _ =>
   519           output.properties match {
   520             case Position.Id(state_id) =>
   521               change_command(_.accumulate(state_id, output.message, xml_cache))
   522 
   523             case _ if output.is_init =>
   524               prover.get.options(file_formats.prover_options(session_options))
   525               prover.get.session_base(resources)
   526               phase = Session.Ready
   527               debugger.ready()
   528 
   529             case Markup.Process_Result(result) if output.is_exit =>
   530               file_formats.stop_session
   531               phase = Session.Terminated(result)
   532               prover.reset
   533 
   534             case _ =>
   535               raw_output_messages.post(output)
   536           }
   537         }
   538     }
   539     //}}}
   540 
   541 
   542     /* main thread */
   543 
   544     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   545     {
   546       case arg: Any =>
   547         //{{{
   548         arg match {
   549           case output: Prover.Output =>
   550             if (output.is_stdout || output.is_stderr)
   551               raw_output_messages.post(output)
   552             else handle_output(output)
   553 
   554             if (output.is_syslog) {
   555               syslog += output.message
   556               syslog_messages.post(output)
   557             }
   558 
   559             all_messages.post(output)
   560 
   561           case input: Prover.Input =>
   562             all_messages.post(input)
   563 
   564           case Start(start_prover) if !prover.defined =>
   565             prover.set(start_prover(manager.send(_)))
   566 
   567           case Stop =>
   568             consolidation.exit()
   569             delay_prune.revoke()
   570             if (prover.defined) {
   571               protocol_handlers.exit()
   572               global_state.change(_ => Document.State.init)
   573               prover.get.terminate
   574             }
   575 
   576           case Consolidate_Execution =>
   577             if (prover.defined) {
   578               val state = global_state.value
   579               state.stable_tip_version match {
   580                 case None => consolidation.update()
   581                 case Some(version) =>
   582                   val consolidate =
   583                     consolidation.flush().iterator.filter(name =>
   584                       !resources.session_base.loaded_theory(name) &&
   585                       !state.node_consolidated(version, name) &&
   586                       state.node_maybe_consolidated(version, name)).toList
   587                   if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate)
   588               }
   589             }
   590 
   591           case Prune_History =>
   592             if (prover.defined) {
   593               val old_versions = global_state.change_result(_.remove_versions(prune_size))
   594               if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
   595             }
   596 
   597           case Update_Options(options) =>
   598             if (prover.defined && is_ready) {
   599               prover.get.options(file_formats.prover_options(options))
   600               handle_raw_edits()
   601             }
   602             global_options.post(Session.Global_Options(options))
   603 
   604           case Cancel_Exec(exec_id) if prover.defined =>
   605             prover.get.cancel_exec(exec_id)
   606 
   607           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   608             handle_raw_edits(doc_blobs = doc_blobs, edits = edits)
   609 
   610           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   611             prover.get.dialog_result(serial, result)
   612             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   613 
   614           case Protocol_Command(name, args) if prover.defined =>
   615             prover.get.protocol_command(name, args:_*)
   616 
   617           case change: Session.Change if prover.defined =>
   618             val state = global_state.value
   619             if (!state.removing_versions && state.is_assigned(change.previous))
   620               handle_change(change)
   621             else postponed_changes.store(change)
   622 
   623           case Session.Change_Flush if prover.defined =>
   624             val state = global_state.value
   625             if (!state.removing_versions)
   626               postponed_changes.flush(state).foreach(handle_change(_))
   627 
   628           case bad =>
   629             if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
   630         }
   631         true
   632         //}}}
   633     }
   634   }
   635 
   636 
   637   /* main operations */
   638 
   639   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   640       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   641     global_state.value.snapshot(name, pending_edits)
   642 
   643   def start(start_prover: Prover.Receiver => Prover)
   644   {
   645     file_formats
   646     _phase.change(
   647       {
   648         case Session.Inactive =>
   649           manager.send(Start(start_prover))
   650           post_phase(Session.Startup)
   651         case phase => error("Cannot start prover in phase " + quote(phase.print))
   652       })
   653   }
   654 
   655   def send_stop()
   656   {
   657     val was_ready =
   658       _phase.guarded_access(phase =>
   659         phase match {
   660           case Session.Startup | Session.Shutdown => None
   661           case Session.Terminated(_) => Some((false, phase))
   662           case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0)))))
   663           case Session.Ready => Some((true, post_phase(Session.Shutdown)))
   664         })
   665     if (was_ready) manager.send(Stop)
   666   }
   667 
   668   def stop(): Process_Result =
   669   {
   670     send_stop()
   671     prover.await_reset()
   672 
   673     change_parser.shutdown()
   674     change_buffer.shutdown()
   675     manager.shutdown()
   676     dispatcher.shutdown()
   677 
   678     phase match {
   679       case Session.Terminated(result) => result
   680       case phase => error("Bad session phase after shutdown: " + quote(phase.print))
   681     }
   682   }
   683 
   684   def protocol_command(name: String, args: String*)
   685   { manager.send(Protocol_Command(name, args.toList)) }
   686 
   687   def cancel_exec(exec_id: Document_ID.Exec)
   688   { manager.send(Cancel_Exec(exec_id)) }
   689 
   690   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   691   { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   692 
   693   def update_options(options: Options)
   694   { manager.send_wait(Update_Options(options)) }
   695 
   696   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   697   { manager.send(Session.Dialog_Result(id, serial, result)) }
   698 }