src/Pure/PIDE/session.scala
author wenzelm
Wed Jan 14 17:24:55 2015 +0100 (2015-01-14 ago)
changeset 59367 6193bbbbe564
parent 59366 e94df7f6b608
child 60075 b079ee0e766c
permissions -rw-r--r--
more type-safe handler interface;
proper progress for Build.Handler;
     1 /*  Title:      Pure/PIDE/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:
     4 
     5 PIDE editor session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import scala.collection.immutable.Queue
    12 
    13 
    14 object Session
    15 {
    16   /* outlets */
    17 
    18   object Consumer
    19   {
    20     def apply[A](name: String)(consume: A => Unit): Consumer[A] =
    21       new Consumer[A](name, consume)
    22   }
    23   final class Consumer[-A] private(val name: String, val consume: A => Unit)
    24 
    25   class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
    26   {
    27     private val consumers = Synchronized(List.empty[Consumer[A]])
    28 
    29     def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
    30     def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
    31 
    32     def post(a: A)
    33     {
    34       for (c <- consumers.value.iterator) {
    35         dispatcher.send(() =>
    36           try { c.consume(a) }
    37           catch {
    38             case exn: Throwable =>
    39               Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
    40           })
    41       }
    42     }
    43   }
    44 
    45 
    46   /* change */
    47 
    48   sealed case class Change(
    49     previous: Document.Version,
    50     syntax_changed: List[Document.Node.Name],
    51     deps_changed: Boolean,
    52     doc_edits: List[Document.Edit_Command],
    53     version: Document.Version)
    54 
    55   case object Change_Flush
    56 
    57 
    58   /* events */
    59 
    60   //{{{
    61   case class Statistics(props: Properties.T)
    62   case class Global_Options(options: Options)
    63   case object Caret_Focus
    64   case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
    65   case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
    66   case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
    67   case class Commands_Changed(
    68     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    69 
    70   sealed abstract class Phase
    71   case object Inactive extends Phase
    72   case object Startup extends Phase  // transient
    73   case object Failed extends Phase
    74   case object Ready extends Phase
    75   case object Shutdown extends Phase  // transient
    76   //}}}
    77 
    78 
    79   /* syslog */
    80 
    81   private[Session] class Syslog(limit: Int)
    82   {
    83     private var queue = Queue.empty[XML.Elem]
    84     private var length = 0
    85 
    86     def += (msg: XML.Elem): Unit = synchronized {
    87       queue = queue.enqueue(msg)
    88       length += 1
    89       if (length > limit) queue = queue.dequeue._2
    90     }
    91 
    92     def content: String = synchronized {
    93       cat_lines(queue.iterator.map(XML.content)) +
    94       (if (length > limit) "\n(A total of " + length + " messages...)" else "")
    95     }
    96   }
    97 
    98 
    99   /* protocol handlers */
   100 
   101   abstract class Protocol_Handler
   102   {
   103     def start(prover: Prover): Unit = {}
   104     def stop(prover: Prover): Unit = {}
   105     val functions: Map[String, (Prover, Prover.Protocol_Output) => Boolean]
   106   }
   107 
   108   def bad_protocol_handler(exn: Throwable, name: String): Unit =
   109     Output.error_message(
   110       "Failed to initialize protocol handler: " + quote(name) + "\n" + Exn.message(exn))
   111 
   112   private class Protocol_Handlers(
   113     handlers: Map[String, Session.Protocol_Handler] = Map.empty,
   114     functions: Map[String, Prover.Protocol_Output => Boolean] = Map.empty)
   115   {
   116     def get(name: String): Option[Protocol_Handler] = handlers.get(name)
   117 
   118     def add(prover: Prover, handler: Protocol_Handler): Protocol_Handlers =
   119     {
   120       val name = handler.getClass.getName
   121 
   122       val (handlers1, functions1) =
   123         handlers.get(name) match {
   124           case Some(old_handler) =>
   125             Output.warning("Redefining protocol handler: " + name)
   126             old_handler.stop(prover)
   127             (handlers - name, functions -- old_handler.functions.keys)
   128           case None => (handlers, functions)
   129         }
   130 
   131       val (handlers2, functions2) =
   132         try {
   133           handler.start(prover)
   134 
   135           val new_functions =
   136             for ((a, f) <- handler.functions.toList) yield
   137               (a, (msg: Prover.Protocol_Output) => f(prover, msg))
   138 
   139           val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
   140           if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
   141 
   142           (handlers1 + (name -> handler), functions1 ++ new_functions)
   143         }
   144         catch {
   145           case exn: Throwable =>
   146             Session.bad_protocol_handler(exn, name)
   147             (handlers1, functions1)
   148         }
   149 
   150       new Protocol_Handlers(handlers2, functions2)
   151     }
   152 
   153     def invoke(msg: Prover.Protocol_Output): Boolean =
   154       msg.properties match {
   155         case Markup.Function(a) if functions.isDefinedAt(a) =>
   156           try { functions(a)(msg) }
   157           catch {
   158             case exn: Throwable =>
   159               Output.error_message(
   160                 "Failed invocation of protocol function: " + quote(a) + "\n" + Exn.message(exn))
   161             false
   162           }
   163         case _ => false
   164       }
   165 
   166     def stop(prover: Prover): Protocol_Handlers =
   167     {
   168       for ((_, handler) <- handlers) handler.stop(prover)
   169       new Protocol_Handlers()
   170     }
   171   }
   172 }
   173 
   174 
   175 class Session(val resources: Resources)
   176 {
   177   /* global flags */
   178 
   179   @volatile var timing: Boolean = false
   180   @volatile var verbose: Boolean = false
   181 
   182 
   183   /* tuning parameters */
   184 
   185   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
   186   def prune_delay: Time = Time.seconds(60.0)  // prune history (delete old versions)
   187   def prune_size: Int = 0  // size of retained history
   188   def syslog_limit: Int = 100
   189   def reparse_limit: Int = 0
   190 
   191 
   192   /* outlets */
   193 
   194   private val dispatcher =
   195     Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
   196 
   197   val statistics = new Session.Outlet[Session.Statistics](dispatcher)
   198   val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
   199   val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
   200   val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
   201   val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
   202   val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
   203   val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
   204   val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
   205   val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
   206   val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
   207 
   208 
   209 
   210   /** main protocol manager **/
   211 
   212   /* internal messages */
   213 
   214   private case class Start(name: String, args: List[String])
   215   private case object Stop
   216   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   217   private case class Protocol_Command(name: String, args: List[String])
   218   private case class Update_Options(options: Options)
   219   private case object Prune_History
   220 
   221 
   222   /* global state */
   223 
   224   private val syslog = new Session.Syslog(syslog_limit)
   225   def syslog_content(): String = syslog.content
   226 
   227   @volatile private var _phase: Session.Phase = Session.Inactive
   228   private def phase_=(new_phase: Session.Phase)
   229   {
   230     _phase = new_phase
   231     phase_changed.post(new_phase)
   232   }
   233   def phase = _phase
   234   def is_ready: Boolean = phase == Session.Ready
   235 
   236   private val global_state = Synchronized(Document.State.init)
   237   def current_state(): Document.State = global_state.value
   238 
   239   def recent_syntax(name: Document.Node.Name): Prover.Syntax =
   240     current_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse
   241     resources.base_syntax
   242 
   243 
   244   /* theory files */
   245 
   246   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   247   {
   248     val header1 =
   249       if (resources.loaded_theories(name.theory))
   250         header.error("Cannot update finished theory " + quote(name.theory))
   251       else header
   252     (name, Document.Node.Deps(header1))
   253   }
   254 
   255 
   256   /* pipelined change parsing */
   257 
   258   private case class Text_Edits(
   259     previous: Future[Document.Version],
   260     doc_blobs: Document.Blobs,
   261     text_edits: List[Document.Edit_Text],
   262     version_result: Promise[Document.Version])
   263 
   264   private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   265   {
   266     case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
   267       val prev = previous.get_finished
   268       val change =
   269         Timing.timeit("parse_change", timing) {
   270           resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
   271         }
   272       version_result.fulfill(change.version)
   273       manager.send(change)
   274       true
   275   }
   276 
   277 
   278   /* buffered changes */
   279 
   280   private object change_buffer
   281   {
   282     private var assignment: Boolean = false
   283     private var nodes: Set[Document.Node.Name] = Set.empty
   284     private var commands: Set[Command] = Set.empty
   285 
   286     def flush(): Unit = synchronized {
   287       if (assignment || nodes.nonEmpty || commands.nonEmpty)
   288         commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
   289       assignment = false
   290       nodes = Set.empty
   291       commands = Set.empty
   292     }
   293     private val delay_flush = Simple_Thread.delay_first(output_delay) { flush() }
   294 
   295     def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
   296       assignment |= assign
   297       for (command <- cmds) {
   298         nodes += command.node_name
   299         commands += command
   300       }
   301       delay_flush.invoke()
   302     }
   303 
   304     def shutdown()
   305     {
   306       delay_flush.revoke()
   307       flush()
   308     }
   309   }
   310 
   311 
   312   /* postponed changes */
   313 
   314   private object postponed_changes
   315   {
   316     private var postponed: List[Session.Change] = Nil
   317 
   318     def store(change: Session.Change): Unit = synchronized { postponed ::= change }
   319 
   320     def flush(state: Document.State): List[Session.Change] = synchronized {
   321       val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
   322       postponed = unassigned
   323       assigned.reverse
   324     }
   325   }
   326 
   327 
   328   /* prover process */
   329 
   330   private object prover
   331   {
   332     private val variable = Synchronized(None: Option[Prover])
   333 
   334     def defined: Boolean = variable.value.isDefined
   335     def get: Prover = variable.value.get
   336     def set(p: Prover) { variable.change(_ => Some(p)) }
   337     def reset { variable.change(_ => None) }
   338     def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) }
   339   }
   340 
   341 
   342   /* protocol handlers */
   343 
   344   private val _protocol_handlers = Synchronized(new Session.Protocol_Handlers)
   345 
   346   def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
   347     _protocol_handlers.value.get(name)
   348 
   349   def add_protocol_handler(handler: Session.Protocol_Handler): Unit =
   350     _protocol_handlers.change(_.add(prover.get, handler))
   351 
   352 
   353   /* manager thread */
   354 
   355   private val delay_prune = Simple_Thread.delay_first(prune_delay) { manager.send(Prune_History) }
   356 
   357   private val manager: Consumer_Thread[Any] =
   358   {
   359     /* raw edits */
   360 
   361     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   362     //{{{
   363     {
   364       require(prover.defined)
   365 
   366       prover.get.discontinue_execution()
   367 
   368       val previous = global_state.value.history.tip.version
   369       val version = Future.promise[Document.Version]
   370       global_state.change(_.continue_history(previous, edits, version))
   371 
   372       raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
   373       change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
   374     }
   375     //}}}
   376 
   377 
   378     /* resulting changes */
   379 
   380     def handle_change(change: Session.Change)
   381     //{{{
   382     {
   383       require(prover.defined)
   384 
   385       def id_command(command: Command)
   386       {
   387         for {
   388           (name, digest) <- command.blobs_defined
   389           if !global_state.value.defined_blob(digest)
   390         } {
   391           change.version.nodes(name).get_blob match {
   392             case Some(blob) =>
   393               global_state.change(_.define_blob(digest))
   394               prover.get.define_blob(digest, blob.bytes)
   395             case None =>
   396               Output.error_message("Missing blob " + quote(name.toString))
   397           }
   398         }
   399 
   400         if (!global_state.value.defined_command(command.id)) {
   401           global_state.change(_.define_command(command))
   402           prover.get.define_command(command)
   403         }
   404       }
   405       change.doc_edits foreach {
   406         case (_, edit) =>
   407           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   408       }
   409 
   410       val assignment = global_state.value.the_assignment(change.previous).check_finished
   411       global_state.change(_.define_version(change.version, assignment))
   412       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
   413       resources.commit(change)
   414     }
   415     //}}}
   416 
   417 
   418     /* prover output */
   419 
   420     def handle_output(output: Prover.Output)
   421     //{{{
   422     {
   423       def bad_output()
   424       {
   425         if (verbose)
   426           Output.warning("Ignoring bad prover output: " + output.message.toString)
   427       }
   428 
   429       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
   430       {
   431         try {
   432           val st = global_state.change_result(_.accumulate(state_id, message))
   433           change_buffer.invoke(false, List(st.command))
   434         }
   435         catch {
   436           case _: Document.State.Fail => bad_output()
   437         }
   438       }
   439 
   440       output match {
   441         case msg: Prover.Protocol_Output =>
   442           val handled = _protocol_handlers.value.invoke(msg)
   443           if (!handled) {
   444             msg.properties match {
   445               case Markup.Protocol_Handler(name) if prover.defined =>
   446                 try {
   447                   val handler =
   448                     Class.forName(name).newInstance.asInstanceOf[Session.Protocol_Handler]
   449                   add_protocol_handler(handler)
   450                 }
   451                 catch { case exn: Throwable => Session.bad_protocol_handler(exn, name) }
   452 
   453               case Protocol.Command_Timing(state_id, timing) if prover.defined =>
   454                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   455                 accumulate(state_id, prover.get.xml_cache.elem(message))
   456 
   457               case Markup.Assign_Update =>
   458                 msg.text match {
   459                   case Protocol.Assign_Update(id, update) =>
   460                     try {
   461                       val cmds = global_state.change_result(_.assign(id, update))
   462                       change_buffer.invoke(true, cmds)
   463                       manager.send(Session.Change_Flush)
   464                     }
   465                     catch { case _: Document.State.Fail => bad_output() }
   466                   case _ => bad_output()
   467                 }
   468                 delay_prune.invoke()
   469 
   470               case Markup.Removed_Versions =>
   471                 msg.text match {
   472                   case Protocol.Removed(removed) =>
   473                     try {
   474                       global_state.change(_.removed_versions(removed))
   475                       manager.send(Session.Change_Flush)
   476                     }
   477                     catch { case _: Document.State.Fail => bad_output() }
   478                   case _ => bad_output()
   479                 }
   480 
   481               case Markup.ML_Statistics(props) =>
   482                 statistics.post(Session.Statistics(props))
   483 
   484               case Markup.Task_Statistics(props) =>
   485                 // FIXME
   486 
   487               case _ => bad_output()
   488             }
   489           }
   490         case _ =>
   491           output.properties match {
   492             case Position.Id(state_id) =>
   493               accumulate(state_id, output.message)
   494 
   495             case _ if output.is_init =>
   496               phase = Session.Ready
   497 
   498             case Markup.Return_Code(rc) if output.is_exit =>
   499               if (rc == 0) phase = Session.Inactive
   500               else phase = Session.Failed
   501               prover.reset
   502 
   503             case _ => raw_output_messages.post(output)
   504           }
   505         }
   506     }
   507     //}}}
   508 
   509 
   510     /* main thread */
   511 
   512     Consumer_Thread.fork[Any]("Session.manager", daemon = true)
   513     {
   514       case arg: Any =>
   515         //{{{
   516         arg match {
   517           case output: Prover.Output =>
   518             if (output.is_stdout || output.is_stderr)
   519               raw_output_messages.post(output)
   520             else handle_output(output)
   521 
   522             if (output.is_syslog) {
   523               syslog += output.message
   524               syslog_messages.post(output)
   525             }
   526 
   527             all_messages.post(output)
   528 
   529           case input: Prover.Input =>
   530             all_messages.post(input)
   531 
   532           case Start(name, args) if !prover.defined =>
   533             if (phase == Session.Inactive || phase == Session.Failed) {
   534               phase = Session.Startup
   535               prover.set(resources.start_prover(manager.send(_), name, args))
   536             }
   537 
   538           case Stop =>
   539             if (prover.defined && is_ready) {
   540               _protocol_handlers.change(_.stop(prover.get))
   541               global_state.change(_ => Document.State.init)
   542               phase = Session.Shutdown
   543               prover.get.terminate
   544             }
   545 
   546           case Prune_History =>
   547             if (prover.defined) {
   548               val old_versions = global_state.change_result(_.remove_versions(prune_size))
   549               if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
   550             }
   551 
   552           case Update_Options(options) =>
   553             if (prover.defined && is_ready) {
   554               prover.get.options(options)
   555               handle_raw_edits(Document.Blobs.empty, Nil)
   556             }
   557             global_options.post(Session.Global_Options(options))
   558 
   559           case Cancel_Exec(exec_id) if prover.defined =>
   560             prover.get.cancel_exec(exec_id)
   561 
   562           case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
   563             handle_raw_edits(doc_blobs, edits)
   564 
   565           case Session.Dialog_Result(id, serial, result) if prover.defined =>
   566             prover.get.dialog_result(serial, result)
   567             handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
   568 
   569           case Session.Build_Theories(id, master_dir, theories) if prover.defined =>
   570             prover.get.build_theories(id, master_dir, theories)
   571 
   572           case Protocol_Command(name, args) if prover.defined =>
   573             prover.get.protocol_command(name, args:_*)
   574 
   575           case change: Session.Change if prover.defined =>
   576             val state = global_state.value
   577             if (!state.removing_versions && state.is_assigned(change.previous))
   578               handle_change(change)
   579             else postponed_changes.store(change)
   580 
   581           case Session.Change_Flush if prover.defined =>
   582             val state = global_state.value
   583             if (!state.removing_versions)
   584               postponed_changes.flush(state).foreach(handle_change(_))
   585 
   586           case bad =>
   587             if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
   588         }
   589         true
   590         //}}}
   591     }
   592   }
   593 
   594 
   595   /* main operations */
   596 
   597   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   598       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   599     global_state.value.snapshot(name, pending_edits)
   600 
   601   def start(name: String, args: List[String])
   602   { manager.send(Start(name, args)) }
   603 
   604   def stop()
   605   {
   606     manager.send_wait(Stop)
   607     prover.await_reset()
   608     change_parser.shutdown()
   609     change_buffer.shutdown()
   610     delay_prune.revoke()
   611     manager.shutdown()
   612     dispatcher.shutdown()
   613   }
   614 
   615   def protocol_command(name: String, args: String*)
   616   { manager.send(Protocol_Command(name, args.toList)) }
   617 
   618   def cancel_exec(exec_id: Document_ID.Exec)
   619   { manager.send(Cancel_Exec(exec_id)) }
   620 
   621   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
   622   { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
   623 
   624   def update_options(options: Options)
   625   { manager.send_wait(Update_Options(options)) }
   626 
   627   def init_options(options: Options)
   628   {
   629     update_options(options)
   630   }
   631 
   632   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
   633   { manager.send(Session.Dialog_Result(id, serial, result)) }
   634 
   635   def build_theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
   636   { manager.send(Session.Build_Theories(id, master_dir, theories)) }
   637 }