src/Pure/PIDE/session.scala
author wenzelm
Sun May 13 16:37:36 2018 +0200 (13 months ago)
changeset 68169 395432e7516e
parent 68168 a9b49430f061
child 68293 2bc4e5d9cca6
permissions -rw-r--r--
tuned signature;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    18   object Consumer
    19   {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    26   {
    27     private val consumers = Synchronized[List[Consumer[A]]](Nil)
    28 
    29     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    30     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    31 
    32     def post(a: A)
    33     {
    34       for (c <- consumers.value.iterator) {
    35         dispatcher.send(() =>
    36           try { c.consume(a) }
    37           catch {
    38             case exn: Throwable =>
    39               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    40           })
    41       }
    42     }
    43   }
    44 
    45 
    46   /* change */
    47 
  // Document change as processed by the manager thread: produced by
  // resources.parse_change in change_parser and applied by handle_change,
  // which sends previous.id, version.id and doc_edits to the prover.
  sealed case class Change(
    previous: Document.Version,
    syntax_changed: List[Document.Node.Name],
    deps_changed: Boolean,
    doc_edits: List[Document.Edit_Command],
    version: Document.Version)

  // Manager message: retry the changes held back in postponed_changes.
  case object Change_Flush
    56 
    57 
    58   /* events */
    59 
    60   //{{{
  // Posted on the `statistics` outlet for Markup.ML_Statistics prover output.
  case class Statistics(props: Properties.T)
  // Posted on the `global_options` outlet after an options update.
  case class Global_Options(options: Options)
  // Payload type of the `caret_focus` outlet.
  case object Caret_Focus
  // Used both as manager message (see `update`) and as `raw_edits` outlet event.
  case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
  // Manager message sent by `dialog_result`.
  case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
  // NOTE(review): not referenced elsewhere in this file -- presumably used by other modules.
  case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
  // Accumulated command changes, posted on `commands_changed` by change_buffer.flush.
  case class Commands_Changed(
    assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    69 
    70   sealed abstract class Phase
    71   {
    72     def print: String =
    73       this match {
    74         case Terminated(result) => if (result.ok) "finished" else "failed"
    75         case _ => Word.lowercase(this.toString)
    76       }
    77   }
    78   case object Inactive extends Phase  // stable
    79   case object Startup extends Phase  // transient
    80   case object Ready extends Phase  // metastable
    81   case object Shutdown extends Phase  // transient
    82   case class Terminated(result: Process_Result) extends Phase  // stable
    83   //}}}
    84 
    85 
    86   /* syslog */
    87 
    88   private[Session] class Syslog(limit: Int)
    89   {
    90     private var queue = Queue.empty[XML.Elem]
    91     private var length = 0
    92 
    93     def += (msg: XML.Elem): Unit = synchronized {
    94       queue = queue.enqueue(msg)
    95       length += 1
    96       if (length > limit) queue = queue.dequeue._2
    97     }
    98 
    99     def content: String = synchronized {
   100       cat_lines(queue.iterator.map(XML.content)) +
   101       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
   102     }
   103   }
   104 
   105 
   106   /* protocol handlers */
   107 
  // Extension point for prover protocol messages.  `init` and `exit` are
  // optional hooks (no-ops by default); `functions` lists named handlers for
  // protocol output.
  // NOTE(review): the Boolean result presumably means "message was handled" -- confirm in Protocol_Handlers.
  abstract class Protocol_Handler
  {
    def init(session: Session): Unit = {}
    def exit(): Unit = {}
    val functions: List[(String, Prover.Protocol_Output => Boolean)]
  }
   114 }
   115 
   116 
   117 class Session(session_options: => Options, val resources: Resources) extends Document.Session
   118 {
   119   session =>
   120 
  // Per-session caches: xml_cache is passed along when accumulating prover
  // messages, xz_cache when making export entries (see handle_output).
  val xml_cache: XML.Cache = XML.make_cache()
  val xz_cache: XZ.Cache = XZ.make_cache()
   123 
   124 
   125   /* global flags */
   126 
  // timing: passed to Timing.timeit around parse_change in change_parser.
  // verbose: enables warnings about ignored prover output / manager messages.
  @volatile var timing: Boolean = false
  @volatile var verbose: Boolean = false
   129 
   130 
   131   /* dynamic session options */
   132 
  // Each accessor is a def over the by-name constructor parameter
  // session_options, so the options expression is re-evaluated on every call.
  def output_delay: Time = session_options.seconds("editor_output_delay")
  def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay")
  def prune_delay: Time = session_options.seconds("editor_prune_delay")
  def prune_size: Int = session_options.int("editor_prune_size")
  def syslog_limit: Int = session_options.int("editor_syslog_limit")
  def reparse_limit: Int = session_options.int("editor_reparse_limit")
   139 
   140 
   141   /* dispatcher */
   142 
  // Daemon consumer thread that runs each received task; shared by all outlets
  // of this session.
  // NOTE(review): the `true` result presumably tells Consumer_Thread to keep running -- confirm.
  private val dispatcher =
    Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   145 
   146   def assert_dispatcher[A](body: => A): A =
   147   {
   148     assert(dispatcher.check_thread)
   149     body
   150   }
   151 
   152   def require_dispatcher[A](body: => A): A =
   153   {
   154     require(dispatcher.check_thread)
   155     body
   156   }
   157 
   158   def send_dispatcher(body: => Unit): Unit =
   159   {
   160     if (dispatcher.check_thread) body
   161     else dispatcher.send(() => body)
   162   }
   163 
   164   def send_wait_dispatcher(body: => Unit): Unit =
   165   {
   166     if (dispatcher.check_thread) body
   167     else dispatcher.send_wait(() => body)
   168   }
   169 
   170 
   171   /* outlets */
   172 
  // Public event outlets; all of them deliver through the session dispatcher.
  val statistics = new Session.Outlet[Session.Statistics](dispatcher)
  val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
  val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
  val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
  val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
  val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
  val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
  val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
  val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
  val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)

  // receives every prover input and output message seen by the manager
  val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck!
   185 
   186 
   187   /** main protocol manager **/
   188 
   189   /* internal messages */
   190 
  // Private message types understood by the manager thread (see its main loop).
  private case class Start(start_prover: Prover.Receiver => Prover)
  private case object Stop
  private case class Cancel_Exec(exec_id: Document_ID.Exec)
  private case class Protocol_Command(name: String, args: List[String])
  private case class Update_Options(options: Options)
  private case object Consolidate_Execution  // sent periodically by the consolidator thread
  private case object Prune_History  // sent by delay_prune
   198 
   199 
   200   /* phase */
   201 
   202   private def post_phase(new_phase: Session.Phase): Session.Phase =
   203   {
   204     phase_changed.post(new_phase)
   205     new_phase
   206   }
   207   private val _phase = Synchronized[Session.Phase](Session.Inactive)
   208   private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))
   209 
   210   def phase = _phase.value
   211   def is_ready: Boolean = phase == Session.Ready
   212 
   213 
   214   /* global state */
   215 
  // syslog_limit is read once here, when the session is constructed.
  private val syslog = new Session.Syslog(syslog_limit)
  def syslog_content(): String = syslog.content

  // Document state of the session, starting from Document.State.init;
  // current_state() returns its present value.
  private val global_state = Synchronized(Document.State.init)
  def current_state(): Document.State = global_state.value
   221 
   222   def recent_syntax(name: Document.Node.Name): Outer_Syntax =
   223     global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse
   224     resources.session_base.overall_syntax
   225 
   226 
   227   /* pipelined change parsing */
   228 
  // Work item for change_parser: text edits relative to the (future) previous
  // version, plus the promise that is fulfilled with the resulting version.
  private case class Text_Edits(
    previous: Future[Document.Version],
    doc_blobs: Document.Blobs,
    text_edits: List[Document.Edit_Text],
    version_result: Promise[Document.Version])
   234 
   235   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   236   {
   237     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   238       val prev = previous.get_finished
   239       val change =
   240         Timing.timeit("parse_change", timing) {
   241           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   242         }
   243       version_result.fulfill(change.version)
   244       manager.send(change)
   245       true
   246   }
   247 
   248 
   249   /* buffered changes */
   250 
   251   private object change_buffer
   252   {
   253     private var assignment: Boolean = false
   254     private var nodes: Set[Document.Node.Name] = Set.empty
   255     private var commands: Set[Command] = Set.empty
   256 
   257     def flush(): Unit = synchronized {
   258       if (assignment || nodes.nonEmpty || commands.nonEmpty)
   259         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   260       assignment = false
   261       nodes = Set.empty
   262       commands = Set.empty
   263     }
   264     private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }
   265 
   266     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   267       assignment |= assign
   268       for (command <- cmds) {
   269         nodes += command.node_name
   270         command.blobs_names.foreach(nodes += _)
   271         commands += command
   272       }
   273       delay_flush.invoke()
   274     }
   275 
   276     def shutdown()
   277     {
   278       delay_flush.revoke()
   279       flush()
   280     }
   281   }
   282 
   283 
   284   /* postponed changes */
   285 
   286   private object postponed_changes
   287   {
   288     private var postponed: List[Session.Change] = Nil
   289 
   290     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   291 
   292     def flush(state: Document.State): List[Session.Change] = synchronized {
   293       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   294       postponed = unassigned
   295       assigned.reverse
   296     }
   297   }
   298 
   299 
   300   /* prover process */
   301 
   302   private object prover
   303   {
   304     private val variable = Synchronized[Option[Prover]](None)
   305 
   306     def defined: Boolean = variable.value.isDefined
   307     def get: Prover = variable.value.get
   308     def set(p: Prover) { variable.change(_ => Some(p)) }
   309     def reset { variable.change(_ => None) }
   310     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   311   }
   312 
   313 
   314   /* protocol handlers */
   315 
  // Registry of protocol handlers for this session; the methods below
  // delegate to it.
  private val protocol_handlers = Protocol_Handlers.init(session)

  def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
    protocol_handlers.get(name)

  // register a given handler instance
  def init_protocol_handler(handler: Session.Protocol_Handler): Unit =
    protocol_handlers.init(handler)

  // register a handler by name (how the name is resolved is up to Protocol_Handlers)
  def init_protocol_handler(name: String): Unit =
    protocol_handlers.init(name)
   326 
   327 
   328   /* debugger */
   329 
  // The debugger handler is created and registered while the session is
  // being constructed.
  private val debugger_handler = new Debugger.Handler(this)
  init_protocol_handler(debugger_handler)

  def debugger: Debugger = debugger_handler.debugger
   334 
   335 
   336   /* manager thread */
   337 
  // Delayed request (after prune_delay) for the manager to prune history.
  // NOTE(review): `manager` is defined below; this relies on the block being evaluated lazily -- confirm in Standard_Thread.delay_first.
  private val delay_prune =
    Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   340 
   341   private val manager: Consumer_Thread[Any] =
   342   {
   343     /* raw edits */
   344 
    // Start processing of raw text edits; requires a defined prover.
    // The history is extended with a promised version, which change_parser
    // fulfills later on.
    def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    //{{{
    {
      require(prover.defined)

      prover.get.discontinue_execution()

      // new history entry on top of the current tip, with its version still pending
      val previous = global_state.value.history.tip.version
      val version = Future.promise[Document.Version]
      global_state.change(_.continue_history(previous, edits, version))

      // notify listeners, then hand the edits over to the parser thread
      raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
      change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
    }
    //}}}
   360 
   361 
   362     /* resulting changes */
   363 
    // Apply a parsed change; requires a defined prover.  Blobs and commands
    // not yet known to the global state are defined (in the state and in the
    // prover), then the new version is defined, the prover is updated and the
    // change is committed to resources.
    def handle_change(change: Session.Change)
    //{{{
    {
      require(prover.defined)

      // define the blobs and the command itself, unless already defined
      def id_command(command: Command)
      {
        for {
          (name, digest) <- command.blobs_defined
          if !global_state.value.defined_blob(digest)
        } {
          change.version.nodes(name).get_blob match {
            case Some(blob) =>
              global_state.change(_.define_blob(digest))
              prover.get.define_blob(digest, blob.bytes)
            case None =>
              // reported, but processing continues with the remaining blobs
              Output.error_message("Missing blob " + quote(name.toString))
          }
        }

        if (!global_state.value.defined_command(command.id)) {
          global_state.change(_.define_command(command))
          prover.get.define_command(command)
        }
      }
      // visit both components of every command edit
      change.doc_edits foreach {
        case (_, edit) =>
          edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
      }

      // the new version is defined relative to the assignment of the previous one
      val assignment = global_state.value.the_assignment(change.previous).check_finished
      global_state.change(_.define_version(change.version, assignment))
      prover.get.update(change.previous.id, change.version.id, change.doc_edits)
      resources.commit(change)
    }
    //}}}
   400 
   401 
   402     /* prover output */
   403 
    // Process one prover output message.  Protocol output is first offered to
    // the registered protocol handlers and otherwise dispatched on its
    // properties; any other output is dispatched on its properties directly.
    def handle_output(output: Prover.Output)
    //{{{
    {
      // unexpected output is ignored, with a warning only in verbose mode
      def bad_output()
      {
        if (verbose)
          Output.warning("Ignoring bad prover output: " + output.message.toString)
      }

      // Update the global state via f and register the resulting command with
      // the change buffer; a Document.State.Fail counts as bad output.
      def change_command(f: Document.State => (Command.State, Document.State))
      {
        try {
          val st = global_state.change_result(f)
          change_buffer.invoke(false, List(st.command))
        }
        catch { case _: Document.State.Fail => bad_output() }
      }

      output match {
        case msg: Prover.Protocol_Output =>
          val handled = protocol_handlers.invoke(msg)
          if (!handled) {
            msg.properties match {
              case Markup.Protocol_Handler(name) if prover.defined =>
                init_protocol_handler(name)

              // timing is accumulated as a STATUS message for the given state
              case Protocol.Command_Timing(state_id, timing) if prover.defined =>
                val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
                change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))

              case Protocol.Theory_Timing(_, _) =>
                // FIXME

              // exports are only accepted when args.id is defined and parses as Long
              case Markup.Export(args)
              if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
                val id = Value.Long.unapply(args.id.get).get
                val export = Export.make_entry("", args, msg.bytes, cache = xz_cache)
                change_command(_.add_export(id, (args.serial, export)))

              case Markup.Assign_Update =>
                msg.text match {
                  case Protocol.Assign_Update(id, update) =>
                    try {
                      val cmds = global_state.change_result(_.assign(id, update))
                      change_buffer.invoke(true, cmds)
                      manager.send(Session.Change_Flush)
                    }
                    catch { case _: Document.State.Fail => bad_output() }
                  case _ => bad_output()
                }
                // pruning is scheduled whether or not the update was accepted
                delay_prune.invoke()

              case Markup.Removed_Versions =>
                msg.text match {
                  case Protocol.Removed(removed) =>
                    try {
                      global_state.change(_.removed_versions(removed))
                      manager.send(Session.Change_Flush)
                    }
                    catch { case _: Document.State.Fail => bad_output() }
                  case _ => bad_output()
                }

              case Markup.ML_Statistics(props) =>
                statistics.post(Session.Statistics(props))

              case Markup.Task_Statistics(props) =>
                // FIXME

              case _ => bad_output()
            }
          }
        case _ =>
          output.properties match {
            // output that carries a state id is accumulated in the document state
            case Position.Id(state_id) =>
              change_command(_.accumulate(state_id, output.message, xml_cache))

            // prover init: send options and session base, then enter phase Ready
            case _ if output.is_init =>
              prover.get.options(session_options)
              prover.get.session_base(resources)
              phase = Session.Ready
              debugger.ready()

            // prover exit: record the process result, then reset the prover
            case Markup.Process_Result(result) if output.is_exit =>
              phase = Session.Terminated(result)
              prover.reset

            case _ =>
              raw_output_messages.post(output)
          }
        }
    }
    //}}}
   497 
   498 
   499     /* main thread */
   500 
    // Main loop of the manager: a daemon consumer thread that handles one
    // message at a time.  Cases guarded by prover.defined (or !prover.defined)
    // fall through to the final `bad` case when the guard fails.
    // NOTE(review): the `true` result presumably tells Consumer_Thread to keep running -- confirm.
    Consumer_Thread.fork[Any]("Session.manager", daemon = true)
    {
      case arg: Any =>
        //{{{
        arg match {
          // prover output: stdout/stderr is only posted as raw output, anything
          // else goes through handle_output
          case output: Prover.Output =>
            if (output.is_stdout || output.is_stderr)
              raw_output_messages.post(output)
            else handle_output(output)

            if (output.is_syslog) {
              syslog += output.message
              syslog_messages.post(output)
            }

            all_messages.post(output)

          case input: Prover.Input =>
            all_messages.post(input)

          // the new prover gets manager.send as its receiver
          case Start(start_prover) if !prover.defined =>
            prover.set(start_prover(manager.send(_)))

          case Stop =>
            delay_prune.revoke()
            if (prover.defined) {
              protocol_handlers.exit()
              global_state.change(_ => Document.State.init)
              prover.get.terminate
            }

          case Consolidate_Execution =>
            if (prover.defined) prover.get.consolidate_execution()

          case Prune_History =>
            if (prover.defined) {
              val old_versions = global_state.change_result(_.remove_versions(prune_size))
              if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
            }

          // options reach the prover only when it is ready, followed by an
          // empty raw edit; the Global_Options event is posted in any case
          case Update_Options(options) =>
            if (prover.defined && is_ready) {
              prover.get.options(options)
              handle_raw_edits(Document.Blobs.empty, Nil)
            }
            global_options.post(Session.Global_Options(options))

          case Cancel_Exec(exec_id) if prover.defined =>
            prover.get.cancel_exec(exec_id)

          case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
            handle_raw_edits(doc_blobs, edits)

          // the result is sent to the prover and also fed back as prover output
          case Session.Dialog_Result(id, serial, result) if prover.defined =>
            prover.get.dialog_result(serial, result)
            handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))

          case Protocol_Command(name, args) if prover.defined =>
            prover.get.protocol_command(name, args:_*)

          // a change is handled at once only if no versions are being removed
          // and its previous version is assigned; otherwise it is postponed
          case change: Session.Change if prover.defined =>
            val state = global_state.value
            if (!state.removing_versions && state.is_assigned(change.previous))
              handle_change(change)
            else postponed_changes.store(change)

          case Session.Change_Flush if prover.defined =>
            val state = global_state.value
            if (!state.removing_versions)
              postponed_changes.flush(state).foreach(handle_change(_))

          case bad =>
            if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
        }
        true
        //}}}
    }
   578   }
   579 
   580   private val consolidator: Thread =
   581     Standard_Thread.fork("Session.consolidator", daemon = true) {
   582       try {
   583         while (true) {
   584           Thread.sleep(consolidate_delay.ms)
   585 
   586           val state = global_state.value
   587           state.stable_tip_version match {
   588             case None =>
   589             case Some(version) =>
   590               val consolidated =
   591                 version.nodes.iterator.forall(
   592                   { case (name, _) =>
   593                       resources.session_base.loaded_theory(name) ||
   594                       state.node_consolidated(version, name) })
   595               if (!consolidated) manager.send(Consolidate_Execution)
   596           }
   597         }
   598       }
   599       catch { case Exn.Interrupt() => }
   600     }
   601 
   602 
   603   /* main operations */
   604 
  // Snapshot of the current global state for the given node (default: the
  // empty node name), taking pending edits into account; delegates to
  // Document.State.snapshot.
  def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
      pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
    global_state.value.snapshot(name, pending_edits)
   608 
   609   def start(start_prover: Prover.Receiver => Prover)
   610   {
   611     _phase.change(
   612       {
   613         case Session.Inactive =>
   614           manager.send(Start(start_prover))
   615           post_phase(Session.Startup)
   616         case phase => error("Cannot start prover in phase " + quote(phase.print))
   617       })
   618   }
   619 
   620   def send_stop()
   621   {
   622     val was_ready =
   623       _phase.guarded_access(phase =>
   624         phase match {
   625           case Session.Startup | Session.Shutdown => None
   626           case Session.Terminated(_) => Some((false, phase))
   627           case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0)))))
   628           case Session.Ready => Some((true, post_phase(Session.Shutdown)))
   629         })
   630     if (was_ready) manager.send(Stop)
   631   }
   632 
   633   def stop(): Process_Result =
   634   {
   635     send_stop()
   636     prover.await_reset()
   637 
   638     change_parser.shutdown()
   639     change_buffer.shutdown()
   640     consolidator.interrupt
   641     consolidator.join
   642     manager.shutdown()
   643     dispatcher.shutdown()
   644 
   645     phase match {
   646       case Session.Terminated(result) => result
   647       case phase => error("Bad session phase after shutdown: " + quote(phase.print))
   648     }
   649   }
   650 
   651   def protocol_command(name: String, args: String*)
   652   { manager.send(Protocol_Command(name, args.toList)) }
   653 
   654   def cancel_exec(exec_id: Document_ID.Exec)
   655   { manager.send(Cancel_Exec(exec_id)) }
   656 
   657   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   658   { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   659 
   660   def update_options(options: Options)
   661   { manager.send_wait(Update_Options(options)) }
   662 
   663   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   664   { manager.send(Session.Dialog_Result(id, serial, result)) }
   665 }