src/Pure/System/session.scala
author wenzelm
Mon Jul 15 10:25:35 2013 +0200 (2013-07-15)
changeset 52655 3b2b1ef13979
parent 52649 f45ab3e8211b
child 52760 8517172b9626
permissions -rw-r--r--
more careful termination of removed execs, leaving running execs undisturbed;
     1 /*  Title:      Pure/System/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Main Isabelle/Scala session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.lang.System
    12 import java.util.{Timer, TimerTask}
    13 
    14 import scala.collection.mutable
    15 import scala.collection.immutable.Queue
    16 import scala.actors.TIMEOUT
    17 import scala.actors.Actor._
    18 
    19 
    object Session
    {
      /* events */

      //{{{
      // messages and notifications exchanged with a session
      case class Statistics(props: Properties.T)
      case class Global_Options(options: Options)
      case object Caret_Focus
      case class Raw_Edits(edits: List[Document.Edit_Text])
      case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
      case class Commands_Changed(
        assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])

      // session life-cycle
      sealed abstract class Phase
      case object Inactive extends Phase
      case object Startup extends Phase  // transient
      case object Failed extends Phase
      case object Ready extends Phase
      case object Shutdown extends Phase  // transient
      //}}}


      /* protocol handlers */

      type Prover = Isabelle_Process with Protocol

      // A handler contributes named protocol functions; each function reports
      // whether it has handled the given prover output.
      abstract class Protocol_Handler
      {
        def stop(prover: Prover): Unit = {}
        val functions: Map[String, (Prover, Isabelle_Process.Output) => Boolean]
      }

      // Immutable registry: handlers by class name, plus the union of their
      // functions (already bound to a prover) by function name.
      class Protocol_Handlers(
        handlers: Map[String, Session.Protocol_Handler] = Map.empty,
        functions: Map[String, Isabelle_Process.Output => Boolean] = Map.empty)
      {
        // Instantiate the handler class `name` and register its functions.
        // A previous handler of the same name is stopped and removed first.
        // Failure to load or register the new handler is reported on stderr,
        // and the result then lacks the handler (old and new).
        def add(prover: Prover, name: String): Protocol_Handlers =
        {
          val (base_handlers, base_functions) =
            handlers.get(name) match {
              case None => (handlers, functions)
              case Some(previous) =>
                System.err.println("Redefining protocol handler: " + name)
                previous.stop(prover)
                (handlers - name, functions -- previous.functions.keys)
            }

          val (result_handlers, result_functions) =
            try {
              val handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]

              // bind each function to this prover
              val bound =
                handler.functions.toList.map({ case (a, f) =>
                  (a, (output: Isabelle_Process.Output) => f(prover, output)) })

              // function names must not clash with those of other handlers
              val dups = bound.collect({ case (a, _) if base_functions.isDefinedAt(a) => a })
              if (dups.nonEmpty) error("Duplicate protocol functions: " + commas_quote(dups))

              (base_handlers + (name -> handler), base_functions ++ bound)
            }
            catch {
              // deliberately broad: the handler class is loaded reflectively
              case exn: Throwable =>
                System.err.println("Failed to initialize protocol handler: " +
                  name + "\n" + Exn.message(exn))
                (base_handlers, base_functions)
            }

          new Protocol_Handlers(result_handlers, result_functions)
        }

        // Dispatch output to the protocol function named in its properties.
        // The result is false if there is no such function, or if it failed
        // (the failure is reported on stderr).
        def invoke(output: Isabelle_Process.Output): Boolean =
          output.properties match {
            case Markup.Function(a) =>
              functions.get(a) match {
                case None => false
                case Some(fun) =>
                  try { fun(output) }
                  catch {
                    case exn: Throwable =>
                      System.err.println("Failed invocation of protocol function: " +
                        quote(a) + "\n" + Exn.message(exn))
                      false
                  }
              }
            case _ => false
          }

        // Stop all registered handlers; the result is an empty registry.
        def stop(prover: Prover): Protocol_Handlers =
        {
          handlers.values.foreach(_.stop(prover))
          new Protocol_Handlers()
        }
      }
    }
   109 
   110 
   class Session(val thy_load: Thy_Load)
   {
     /* global flags */

     // volatile: written by clients, read by the actors below
     @volatile var timing: Boolean = false  // passed to Timing.timeit in change_parser
     @volatile var verbose: Boolean = false  // print ignored prover output on stderr


     /* tuning parameters */

     def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
     def message_delay: Time = Time.seconds(0.01)  // incoming prover messages
     def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
     def prune_size: Int = 0  // size of retained history
     def syslog_limit: Int = 100  // maximum number of retained syslog messages
     def reparse_limit: Int = 0  // passed to Thy_Load.text_edits


     /* pervasive event buses */

     val statistics = new Event_Bus[Session.Statistics]
     val global_options = new Event_Bus[Session.Global_Options]
     val caret_focus = new Event_Bus[Session.Caret_Focus.type]
     val raw_edits = new Event_Bus[Session.Raw_Edits]
     val commands_changed = new Event_Bus[Session.Commands_Changed]
     val phase_changed = new Event_Bus[Session.Phase]
     val syslog_messages = new Event_Bus[Isabelle_Process.Output]
     val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
     val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck



     /** buffered command changes (delay_first discipline) **/

     //{{{
     // request for actor termination, answered by reply(())
     private case object Stop

     // Separate actor that publishes Commands_Changed events on the event bus,
     // so that the session actor does not run the event bus itself.
     private val (_, commands_changed_buffer) =
       Simple_Thread.actor("commands_changed_buffer", daemon = true)
     {
       var finished = false
       while (!finished) {
         receive {
           case Stop => finished = true; reply(())
           case changed: Session.Commands_Changed => commands_changed.event(changed)
           case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
         }
       }
     }
     //}}}


     /** pipelined change parsing **/

     //{{{
     // Request to turn text edits relative to `previous` into a new version;
     // the outcome is delivered both via `version_result` and as Change message.
     private case class Text_Edits(
       previous: Future[Document.Version],
       text_edits: List[Document.Edit_Text],
       version_result: Promise[Document.Version])

     private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
     {
       var finished = false
       while (!finished) {
         receive {
           case Stop => finished = true; reply(())

           case Text_Edits(previous, text_edits, version_result) =>
             val prev = previous.get_finished
             val (doc_edits, version) =
               Timing.timeit("Thy_Load.text_edits", timing) {
                 thy_load.text_edits(reparse_limit, prev, text_edits)
               }
             version_result.fulfill(version)
             // back to the requesting actor (session_actor, see handle_raw_edits)
             sender ! Change(doc_edits, prev, version)

           case bad => System.err.println("change_parser: ignoring bad message " + bad)
         }
       }
     }
     //}}}



     /** main protocol actor **/

     /* global state */

     // bounded queue of syslog messages, see also syslog_limit
     private val syslog = Volatile(Queue.empty[XML.Elem])
     def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))

     @volatile private var _phase: Session.Phase = Session.Inactive
     // every phase assignment is published on phase_changed
     private def phase_=(new_phase: Session.Phase)
     {
       _phase = new_phase
       phase_changed.event(new_phase)
     }
     def phase = _phase
     def is_ready: Boolean = phase == Session.Ready

     private val global_state = Volatile(Document.State.init)
     def current_state(): Document.State = global_state()

     // syntax of the most recent finished version, or the base syntax for the initial version
     def recent_syntax(): Outer_Syntax =
     {
       val version = current_state().recent_finished.version.get_finished
       if (version.is_init) thy_load.base_syntax
       else version.syntax
     }

     def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
         pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
       global_state().snapshot(name, pending_edits)


     /* theory files */

     // Dependency edit for the given node header; a header of an already loaded
     // theory is replaced by an error header.
     def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
     {
       val header1 =
         if (thy_load.loaded_theories(name.theory))
           header.error("Attempt to update loaded theory " + quote(name.theory))
         else header
       (name, Document.Node.Deps(header1))
     }


     /* actor messages */

     private case class Start(args: List[String])
     private case object Cancel_Execution
     private case class Change(
       doc_edits: List[Document.Edit_Command],
       previous: Document.Version,
       version: Document.Version)
     private case class Messages(msgs: List[Isabelle_Process.Message])
     private case class Update_Options(options: Options)

     private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
     {
       val this_actor = self

       var protocol_handlers = new Session.Protocol_Handlers()

       // earliest time (ms) of the next history pruning
       var prune_next = System.currentTimeMillis() + prune_delay.ms


       /* buffered prover messages */

       // Collects messages from the prover process and forwards them in batches
       // (as Messages) to this actor: periodically by timer, and immediately
       // after each protocol message.
       object receiver
       {
         private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]

         def flush(): Unit = synchronized {
           if (!buffer.isEmpty) {
             val msgs = buffer.toList
             this_actor ! Messages(msgs)
             buffer = new mutable.ListBuffer[Isabelle_Process.Message]
           }
         }
         def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
           buffer += msg
           msg match {
             case output: Isabelle_Process.Output =>
               if (output.is_syslog)
                 syslog >> (queue =>
                   {
                     // drop the oldest entry when exceeding syslog_limit
                     val queue1 = queue.enqueue(output.message)
                     if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
                   })
               if (output.is_protocol) flush()
             case _ =>
           }
         }

         // daemon timer for periodic flush
         private val timer = new Timer("session_actor.receiver", true)
         timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)

         def cancel() { timer.cancel() }
       }

       var prover: Option[Session.Prover] = None


       /* delayed command changes */

       // Accumulates changed commands and passes them to commands_changed_buffer
       // in one Commands_Changed message, at most output_delay after the first
       // pending change (the main loop flushes on timeout).
       object delay_commands_changed
       {
         private var changed_assignment: Boolean = false
         private var changed_nodes: Set[Document.Node.Name] = Set.empty
         private var changed_commands: Set[Command] = Set.empty

         // deadline (ms) for pending changes, None if nothing is pending
         private var flush_time: Option[Long] = None

         // timeout for the main loop: remaining time until the deadline,
         // or a default of 5s if nothing is pending
         def flush_timeout: Long =
           flush_time match {
             case None => 5000L
             case Some(time) => (time - System.currentTimeMillis()) max 0
           }

         def flush()
         {
           if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
             commands_changed_buffer !
               Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
           changed_assignment = false
           changed_nodes = Set.empty
           changed_commands = Set.empty
           flush_time = None
         }

         def invoke(assign: Boolean, commands: List[Command])
         {
           changed_assignment |= assign
           for (command <- commands) {
             changed_nodes += command.node_name
             changed_commands += command
           }
           val now = System.currentTimeMillis()
           flush_time match {
             case None => flush_time = Some(now + output_delay.ms)
             case Some(time) => if (now >= time) flush()
           }
         }
       }


       /* raw edits */

       // Extend the history by a promised version and let change_parser work it out;
       // requires a running prover.
       def handle_raw_edits(edits: List[Document.Edit_Text])
       //{{{
       {
         prover.get.discontinue_execution()

         val previous = global_state().history.tip.version
         val version = Future.promise[Document.Version]
         // only the state update matters here, the resulting change is not used
         global_state >>> (_.continue_history(previous, edits, version))

         raw_edits.event(Session.Raw_Edits(edits))
         change_parser ! Text_Edits(previous, edits, version)
       }
       //}}}


       /* resulting changes */

       // Define new commands and the new version, both in the global state and
       // in the prover; requires a running prover.
       def handle_change(change: Change)
       //{{{
       {
         val previous = change.previous
         val version = change.version
         val doc_edits = change.doc_edits

         // define each command once only
         def id_command(command: Command)
         {
           if (!global_state().defined_command(command.id)) {
             global_state >> (_.define_command(command))
             prover.get.define_command(command)
           }
         }
         doc_edits foreach {
           case (_, edit) =>
             edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
         }

         val assignment = global_state().the_assignment(previous).check_finished
         global_state >> (_.define_version(version, assignment))
         prover.get.update(previous.id, version.id, doc_edits)
       }
       //}}}


       /* prover output */

       // Interpret a single prover output message: protocol messages go to the
       // registered protocol handlers first, then to the built-in cases below;
       // other messages are accumulated in the document state or affect the phase.
       def handle_output(output: Isabelle_Process.Output)
       //{{{
       {
         def bad_output()
         {
           if (verbose)
             System.err.println("Ignoring prover output: " + output.message.toString)
         }

         def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
         {
           try {
             val st = global_state >>> (_.accumulate(state_id, message))
             delay_commands_changed.invoke(false, List(st.command))
           }
           catch {
             case _: Document.State.Fail => bad_output()
           }
         }

         if (output.is_protocol) {
           val handled = protocol_handlers.invoke(output)
           if (!handled) {
             output.properties match {
               case Markup.Protocol_Handler(name) =>
                 protocol_handlers = protocol_handlers.add(prover.get, name)

               case Protocol.Command_Timing(state_id, timing) =>
                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
                 accumulate(state_id, prover.get.xml_cache.elem(message))

               case Markup.Assign_Update =>
                 XML.content(output.body) match {
                   case Protocol.Assign_Update(id, update) =>
                     try {
                       val cmds = global_state >>> (_.assign(id, update))
                       delay_commands_changed.invoke(true, cmds)
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
                 }
                 // FIXME separate timeout event/message!?
                 // prune history at most once per prune_delay
                 if (prover.isDefined && System.currentTimeMillis() > prune_next) {
                   val old_versions = global_state >>> (_.prune_history(prune_size))
                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
                   prune_next = System.currentTimeMillis() + prune_delay.ms
                 }

               case Markup.Removed_Versions =>
                 XML.content(output.body) match {
                   case Protocol.Removed(removed) =>
                     try {
                       global_state >> (_.removed_versions(removed))
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
                 }

               case Markup.ML_Statistics(props) =>
                 statistics.event(Session.Statistics(props))

               case Markup.Task_Statistics(props) =>
                 // FIXME

               case _ => bad_output()
             }
           }
         }
         else {
           output.properties match {
             case Position.Id(state_id) =>
               accumulate(state_id, output.message)

             case _ if output.is_init =>
               phase = Session.Ready

             case Markup.Return_Code(rc) if output.is_exit =>
               if (rc == 0) phase = Session.Inactive
               else phase = Session.Failed

             case _ => bad_output()
           }
         }
       }
       //}}}


       /* main loop */

       //{{{
       var finished = false
       while (!finished) {
         receiveWithin(delay_commands_changed.flush_timeout) {
           case TIMEOUT => delay_commands_changed.flush()

           case Start(args) if prover.isEmpty =>
             if (phase == Session.Inactive || phase == Session.Failed) {
               phase = Session.Startup
               prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
             }

           case Stop =>
             // NOTE(review): the prover is only terminated in phase Ready -- confirm
             // that nothing is left running when stopping during Startup
             if (phase == Session.Ready) {
               protocol_handlers = protocol_handlers.stop(prover.get)
               global_state >> (_ => Document.State.init)  // FIXME event bus!?
               phase = Session.Shutdown
               prover.get.terminate
               prover = None
               phase = Session.Inactive
             }
             finished = true
             receiver.cancel()
             reply(())

           case Update_Options(options) if prover.isDefined =>
             if (is_ready) {
               prover.get.options(options)
               handle_raw_edits(Nil)
             }
             global_options.event(Session.Global_Options(options))
             reply(())

           case Cancel_Execution if prover.isDefined =>
             prover.get.cancel_execution()

           case Session.Raw_Edits(edits) if prover.isDefined =>
             handle_raw_edits(edits)
             reply(())

           case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
             prover.get.dialog_result(serial, result)
             handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))

           case Messages(msgs) =>
             msgs foreach {
               case input: Isabelle_Process.Input =>
                 all_messages.event(input)

               case output: Isabelle_Process.Output =>
                 if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
                 else handle_output(output)
                 if (output.is_syslog) syslog_messages.event(output)
                 all_messages.event(output)
             }

           case change: Change
           if prover.isDefined && global_state().is_assigned(change.previous) =>
             handle_change(change)

           // A Change that is not ready yet is not matched here, so it is not
           // discarded as bad message.
           case bad if !bad.isInstanceOf[Change] =>
             System.err.println("session_actor: ignoring bad message " + bad)
             // Update_Options and Raw_Edits are sent synchronously via !? (see
             // update_options and update below): acknowledge them even if ignored,
             // otherwise the sender would wait forever for the reply.
             bad match {
               case _: Update_Options | _: Session.Raw_Edits => reply(())
               case _ =>
             }
         }
       }
       //}}}
     }


     /* actions */

     // asynchronous start of the prover process
     def start(args: List[String])
     {
       session_actor ! Start(args)
     }

     // synchronous shutdown of all actors of this session
     def stop()
     {
       commands_changed_buffer !? Stop
       change_parser !? Stop
       session_actor !? Stop
     }

     def cancel_execution() { session_actor ! Cancel_Execution }

     // synchronous: waits for the session actor to acknowledge the edits
     def update(edits: List[Document.Edit_Text])
     { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(edits) }

     // synchronous: waits for the session actor to acknowledge the options
     def update_options(options: Options)
     { session_actor !? Update_Options(options) }

     def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
     { session_actor ! Session.Dialog_Result(id, serial, result) }
   }