src/Pure/System/session.scala
author wenzelm
Wed May 22 14:10:45 2013 +0200 (2013-05-22)
changeset 52111 1fd184eaa310
parent 52084 573e80625c78
child 52113 2d2b049429f3
permissions -rw-r--r--
explicit management of Session.Protocol_Handlers, with protocol state and functions;
more self-contained ML/Scala module Invoke_Scala;
     1 /*  Title:      Pure/System/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Main Isabelle/Scala session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.lang.System
    12 import java.util.{Timer, TimerTask}
    13 
    14 import scala.collection.mutable
    15 import scala.collection.immutable.Queue
    16 import scala.actors.TIMEOUT
    17 import scala.actors.Actor._
    18 
    19 
    /* Companion of class Session: event types, session phases, and the registry
       of dynamically loaded protocol handlers. */
    object Session
    {
      /* events */

      //{{{
      // payload types of the event buses declared in class Session
      case class Statistics(props: Properties.T)
      case class Global_Options(options: Options)
      case object Caret_Focus
      case class Raw_Edits(edits: List[Document.Edit_Text])
      case class Dialog_Result(id: Document.ID, serial: Long, result: String)
      case class Commands_Changed(
        assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])

      // life-cycle of a session, published via phase_changed
      sealed abstract class Phase
      case object Inactive extends Phase
      case object Startup extends Phase  // transient
      case object Failed extends Phase
      case object Ready extends Phase
      case object Shutdown extends Phase  // transient
      //}}}


      /* protocol handlers */

      type Prover = Isabelle_Process with Protocol

      /* A handler contributes named protocol functions; a function result of
         true means that the output has been handled.  Handlers are created
         reflectively by Protocol_Handlers.add, via a constructor without
         arguments. */
      abstract class Protocol_Handler
      {
        def stop(prover: Prover): Unit = {}
        val functions: Map[String, (Prover, Isabelle_Process.Output) => Boolean]
      }

      /* Immutable registry: handlers by class name, together with the joint
         table of their functions (already applied to the prover). */
      class Protocol_Handlers(
        handlers: Map[String, Session.Protocol_Handler] = Map.empty,
        functions: Map[String, Isabelle_Process.Output => Boolean] = Map.empty)
      {
        /* Register the handler class of the given name, replacing (and stopping)
           an existing handler of the same name.  Failure to load the new handler
           is reported on stderr; an old handler that was stopped stays removed. */
        def add(prover: Prover, name: String): Protocol_Handlers =
        {
          // step 1: retire a previous handler of that name, including its functions
          val (handlers1, functions1) =
            if (handlers.isDefinedAt(name)) {
              val old_handler = handlers(name)
              System.err.println("Redefining protocol handler: " + name)
              old_handler.stop(prover)
              (handlers - name, functions -- old_handler.functions.keys)
            }
            else (handlers, functions)

          // step 2: instantiate the new handler and merge its functions
          val (handlers2, functions2) =
            try {
              val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
              val new_functions =
                new_handler.functions.toList.map({ case (a, f) =>
                  (a, (output: Isabelle_Process.Output) => f(prover, output)) })

              // function names must not clash with those of other handlers
              val dups = new_functions.collect({ case (a, _) if functions1.isDefinedAt(a) => a })
              if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups))

              (handlers1 + (name -> new_handler), functions1 ++ new_functions)
            }
            catch {
              case exn: Throwable =>
                System.err.println("Failed to initialize protocol handler: " +
                  name + "\n" + Exn.message(exn))
                (handlers1, functions1)
            }

          new Protocol_Handlers(handlers2, functions2)
        }

        /* Apply the protocol function named in the output properties, if any.
           The result is false for unknown functions and for functions that
           raise an exception (which is reported on stderr). */
        def invoke(output: Isabelle_Process.Output): Boolean =
          output.properties match {
            case Markup.Function(a) =>
              functions.get(a) match {
                case None => false
                case Some(function) =>
                  try { function(output) }
                  catch {
                    case exn: Throwable =>
                      System.err.println("Failed invocation of protocol function: " +
                        quote(a) + "\n" + Exn.message(exn))
                      false
                  }
              }
            case _ => false
          }
      }
    }
   103 
   104 
   /* Session over the given Thy_Load: keeps the global document state, publishes
      events on the buses below, and drives an optional prover process through
      the internal session_actor.  Public operations are collected under
      "actions" at the end of the class. */
   class Session(val thy_load: Thy_Load)
   {
     /* global flags */

     @volatile var timing: Boolean = false  // passed to Timing.timeit for Thy_Load.text_edits
     @volatile var verbose: Boolean = false  // report ignored prover output on stderr


     /* tuning parameters */

     def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
     def message_delay: Time = Time.seconds(0.01)  // incoming prover messages
     def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
     def prune_size: Int = 0  // size of retained history
     def syslog_limit: Int = 100  // maximal length of the retained syslog queue
     def reparse_limit: Int = 0  // passed to thy_load.text_edits


     /* pervasive event buses */

     val statistics = new Event_Bus[Session.Statistics]
     val global_options = new Event_Bus[Session.Global_Options]
     val caret_focus = new Event_Bus[Session.Caret_Focus.type]
     val raw_edits = new Event_Bus[Session.Raw_Edits]
     val commands_changed = new Event_Bus[Session.Commands_Changed]
     val phase_changed = new Event_Bus[Session.Phase]
     val syslog_messages = new Event_Bus[Isabelle_Process.Output]
     val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
     val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck



     /** buffered command changes (delay_first discipline) **/

     //{{{
     // request to terminate an actor; each actor answers it with reply(())
     private case object Stop

     // Separate actor that publishes Commands_Changed on the commands_changed bus,
     // so the event is emitted from here and not from within session_actor.
     private val (_, commands_changed_buffer) =
       Simple_Thread.actor("commands_changed_buffer", daemon = true)
     {
       var finished = false
       while (!finished) {
         receive {
           case Stop => finished = true; reply(())
           case changed: Session.Commands_Changed => commands_changed.event(changed)
           case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
         }
       }
     }
     //}}}


     /** pipelined change parsing **/

     //{{{
     // Parsing request: text edits relative to a previous version, and the promise
     // that receives the resulting version.
     private case class Text_Edits(
       previous: Future[Document.Version],
       text_edits: List[Document.Edit_Text],
       version_result: Promise[Document.Version])

     // Actor that turns Text_Edits into a Change: it fulfills the version promise
     // and sends the Change back to the sender of the request (session_actor sends
     // these requests, see the Raw_Edits case of its main loop).
     private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
     {
       var finished = false
       while (!finished) {
         receive {
           case Stop => finished = true; reply(())

           case Text_Edits(previous, text_edits, version_result) =>
             // NOTE(review): get_finished presumably expects or awaits a finished future -- confirm
             val prev = previous.get_finished
             val (doc_edits, version) =
               Timing.timeit("Thy_Load.text_edits", timing) {
                 thy_load.text_edits(reparse_limit, prev, text_edits)
               }
             version_result.fulfill(version)
             sender ! Change(doc_edits, prev, version)

           case bad => System.err.println("change_parser: ignoring bad message " + bad)
         }
       }
     }
     //}}}



     /** main protocol actor **/

     /* global state */

     // bounded queue of syslog messages, filled by the receiver inside session_actor
     private val syslog = Volatile(Queue.empty[XML.Elem])
     def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))

     @volatile private var _phase: Session.Phase = Session.Inactive
     // private setter: store the new phase and publish it on phase_changed
     private def phase_=(new_phase: Session.Phase)
     {
       _phase = new_phase
       phase_changed.event(new_phase)
     }
     def phase = _phase
     def is_ready: Boolean = phase == Session.Ready

     private val global_state = Volatile(Document.State.init)
     def current_state(): Document.State = global_state()

     // syntax of the most recent finished version; thy_load.base_syntax for the initial version
     def recent_syntax(): Outer_Syntax =
     {
       val version = current_state().recent_finished.version.get_finished
       if (version.is_init) thy_load.base_syntax
       else version.syntax
     }

     // snapshot of the current global state for the given node and pending edits
     def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
         pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
       global_state().snapshot(name, pending_edits)


     /* theory files */

     // Edit that updates the dependencies of a node from its header; the header is
     // turned into an error if the theory is among thy_load.loaded_theories.
     def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
     {
       val header1 =
         if (thy_load.loaded_theories(name.theory))
           header.error("Attempt to update loaded theory " + quote(name.theory))
         else header
       (name, Document.Node.Deps(header1))
     }


     /* actor messages */

     private case class Start(args: List[String])
     private case object Cancel_Execution
     // result of change_parser: command edits leading from previous to version
     private case class Change(
       doc_edits: List[Document.Edit_Command],
       previous: Document.Version,
       version: Document.Version)
     // batch of prover messages, as collected by the receiver below
     private case class Messages(msgs: List[Isabelle_Process.Message])
     private case class Update_Options(options: Options)

     // Central actor: owns the prover, applies changes to global_state, and
     // dispatches prover output.  All state below is local to this actor body,
     // apart from the receiver buffer, which other threads fill.
     private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
     {
       val this_actor = self

       var protocol_handlers = new Session.Protocol_Handlers()

       // earliest time (ms) of the next history pruning, see Assign_Execs below
       var prune_next = System.currentTimeMillis() + prune_delay.ms


       /* buffered prover messages */

       // Collects messages from the prover process (it is handed to Isabelle_Process
       // as receiver.invoke) and passes them on to this actor in batches.
       object receiver
       {
         private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]

         // synchronized: flush is invoked both from invoke and from the timer below
         def flush(): Unit = synchronized {
           if (!buffer.isEmpty) {
             val msgs = buffer.toList
             this_actor ! Messages(msgs)
             buffer = new mutable.ListBuffer[Isabelle_Process.Message]
           }
         }
         def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
           buffer += msg
           msg match {
             case output: Isabelle_Process.Output =>
               if (output.is_syslog)
                 // keep at most syslog_limit entries, dropping the oldest one
                 syslog >> (queue =>
                   {
                     val queue1 = queue.enqueue(output.message)
                     if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
                   })
               // protocol messages are forwarded without waiting for the timer
               if (output.is_protocol) flush()
             case _ =>
           }
         }

         // periodic flush, every message_delay
         private val timer = new Timer("session_actor.receiver", true)
         timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)

         def cancel() { timer.cancel() }
       }

       var prover: Option[Session.Prover] = None


       /* delayed command changes */

       // Accumulates changed nodes/commands and emits them as one Commands_Changed
       // message: the first invoke sets a deadline of output_delay, a later invoke
       // or the TIMEOUT of the main loop performs the actual flush.
       object delay_commands_changed
       {
         private var changed_assignment: Boolean = false
         private var changed_nodes: Set[Document.Node.Name] = Set.empty
         private var changed_commands: Set[Command] = Set.empty

         // deadline (ms) of the pending flush, None if nothing is scheduled
         private var flush_time: Option[Long] = None

         // timeout for receiveWithin of the main loop: remaining time until the
         // deadline, or 5000 ms if nothing is scheduled
         def flush_timeout: Long =
           flush_time match {
             case None => 5000L
             case Some(time) => (time - System.currentTimeMillis()) max 0
           }

         // emit accumulated changes (if any) and reset the accumulator
         def flush()
         {
           if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
             commands_changed_buffer !
               Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
           changed_assignment = false
           changed_nodes = Set.empty
           changed_commands = Set.empty
           flush_time = None
         }

         def invoke(assign: Boolean, commands: List[Command])
         {
           changed_assignment |= assign
           for (command <- commands) {
             changed_nodes += command.node_name
             changed_commands += command
           }
           val now = System.currentTimeMillis()
           flush_time match {
             case None => flush_time = Some(now + output_delay.ms)
             case Some(time) => if (now >= time) flush()
           }
         }
       }


       /* resulting changes */

       // Apply a parsed Change: define commands that are not yet known (in the
       // global state and in the prover), define the new version with the
       // assignment of the previous one, and send the update to the prover.
       // The main loop calls this only if the prover is defined.
       def handle_change(change: Change)
       //{{{
       {
         val previous = change.previous
         val version = change.version
         val doc_edits = change.doc_edits

         def id_command(command: Command)
         {
           if (!global_state().defined_command(command.id)) {
             global_state >> (_.define_command(command))
             prover.get.define_command(command)
           }
         }
         doc_edits foreach {
           case (_, edit) =>
             edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
         }

         val assignment = global_state().the_assignment(previous).check_finished
         global_state >> (_.define_version(version, assignment))
         prover.get.update(previous.id, version.id, doc_edits)
       }
       //}}}


       /* prover output */

       // Dispatch a single prover output.  Protocol messages are offered to the
       // registered protocol handlers first and fall back to the built-in cases;
       // other messages are accumulated into the document state or drive the
       // session phase.
       def handle_output(output: Isabelle_Process.Output)
       //{{{
       {
         def bad_output()
         {
           if (verbose)
             System.err.println("Ignoring prover output: " + output.message.toString)
         }

         // add the message to the state of the given id and schedule a
         // Commands_Changed event for the affected command
         def accumulate(state_id: Document.ID, message: XML.Elem)
         {
           try {
             val st = global_state >>> (_.accumulate(state_id, message))
             delay_commands_changed.invoke(false, List(st.command))
           }
           catch {
             case _: Document.State.Fail => bad_output()
           }
         }

         if (output.is_protocol) {
           val handled = protocol_handlers.invoke(output)
           if (!handled) {
             output.properties match {
               case Markup.Protocol_Handler(name) =>
                 protocol_handlers = protocol_handlers.add(prover.get, name)

               case Protocol.Command_Timing(state_id, timing) =>
                 // command timing is recorded as a status message of that command
                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
                 accumulate(state_id, prover.get.xml_cache.elem(message))

               case Markup.Assign_Execs =>
                 XML.content(output.body) match {
                   case Protocol.Assign(id, assign) =>
                     try {
                       val cmds = global_state >>> (_.assign(id, assign))
                       delay_commands_changed.invoke(true, cmds)
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
                 }
                 // prune old versions here, at most once per prune_delay
                 // FIXME separate timeout event/message!?
                 if (prover.isDefined && System.currentTimeMillis() > prune_next) {
                   val old_versions = global_state >>> (_.prune_history(prune_size))
                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
                   prune_next = System.currentTimeMillis() + prune_delay.ms
                 }

               case Markup.Removed_Versions =>
                 XML.content(output.body) match {
                   case Protocol.Removed(removed) =>
                     try {
                       global_state >> (_.removed_versions(removed))
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
                 }

               case Markup.ML_Statistics(props) =>
                 statistics.event(Session.Statistics(props))

               case Markup.Task_Statistics(props) =>
                 // FIXME

               case _ => bad_output()
             }
           }
         }
         else {
           output.properties match {
             case Position.Id(state_id) =>
               accumulate(state_id, output.message)

             case _ if output.is_init =>
               phase = Session.Ready

             // exit of the prover process: the return code decides the final phase
             case Markup.Return_Code(rc) if output.is_exit =>
               if (rc == 0) phase = Session.Inactive
               else phase = Session.Failed

             case _ => bad_output()
           }
         }
       }
       //}}}


       /* main loop */

       // NOTE(review): Update_Options and Raw_Edits are sent with !? (see actions
       // below), but without a defined prover they end up in the final "bad" case,
       // which does not reply -- presumably the sender then keeps waiting; confirm.
       //{{{
       var finished = false
       while (!finished) {
         receiveWithin(delay_commands_changed.flush_timeout) {
           case TIMEOUT => delay_commands_changed.flush()

           // start the prover process, unless the session is already active
           case Start(args) if prover.isEmpty =>
             if (phase == Session.Inactive || phase == Session.Failed) {
               phase = Session.Startup
               prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
             }

           // NOTE(review): the prover is terminated only in phase Ready; in other
           // phases the actor finishes without touching it -- confirm intent
           case Stop =>
             if (phase == Session.Ready) {
               global_state >> (_ => Document.State.init)  // FIXME event bus!?
               phase = Session.Shutdown
               prover.get.terminate
               prover = None
               phase = Session.Inactive
             }
             finished = true
             receiver.cancel()
             reply(())

           case Update_Options(options) if prover.isDefined =>
             if (is_ready) prover.get.options(options)
             global_options.event(Session.Global_Options(options))
             reply(())

           case Cancel_Execution if prover.isDefined =>
             prover.get.cancel_execution()

           // new text edits: extend the history with a promised version and let
           // change_parser produce the corresponding Change
           case raw @ Session.Raw_Edits(edits) if prover.isDefined =>
             prover.get.discontinue_execution()

             val previous = global_state().history.tip.version
             val version = Future.promise[Document.Version]
             val change = global_state >>> (_.continue_history(previous, edits, version))
             raw_edits.event(raw)
             change_parser ! Text_Edits(previous, edits, version)

             reply(())

           // pass the result to the prover, and process it locally like prover output
           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
             prover.get.dialog_result(serial, result)
             handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))

           case Messages(msgs) =>
             msgs foreach {
               case input: Isabelle_Process.Input =>
                 all_messages.event(input)

               case output: Isabelle_Process.Output =>
                 if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
                 else handle_output(output)
                 if (output.is_syslog) syslog_messages.event(output)
                 all_messages.event(output)
             }

           // A Change is handled only after its previous version has been assigned;
           // other Change messages match no case here (see the guard of "bad") and
           // are presumably kept in the mailbox for a later round -- confirm.
           case change: Change
           if prover.isDefined && global_state().is_assigned(change.previous) =>
             handle_change(change)

           case bad if !bad.isInstanceOf[Change] =>
             System.err.println("session_actor: ignoring bad message " + bad)
         }
       }
       //}}}
     }


     /* actions */

     // asynchronous request to start the prover with the given arguments
     def start(args: List[String])
     {
       session_actor ! Start(args)
     }

     // synchronous shutdown of all three actors, in this order
     def stop()
     {
       commands_changed_buffer !? Stop
       change_parser !? Stop
       session_actor !? Stop
     }

     def cancel_execution() { session_actor ! Cancel_Execution }

     // synchronous; empty edits are not sent at all
     def update(edits: List[Document.Edit_Text])
     { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(edits) }

     // synchronous
     def update_options(options: Options)
     { session_actor !? Update_Options(options) }

     def dialog_result(id: Document.ID, serial: Long, result: String)
     { session_actor ! Session.Dialog_Result(id, serial, result) }
   }