src/Pure/System/session.scala
author wenzelm
Tue Apr 09 20:16:52 2013 +0200 (2013-04-09 ago)
changeset 51662 3391a493f39a
parent 51083 10062c40ddaa
child 51664 080ef458f21a
permissions -rw-r--r--
just one timing protocol function, with 3 implementations: TTY/PG, PIDE/document, build;
     1 /*  Title:      Pure/System/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Main Isabelle/Scala session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.lang.System
    12 import java.util.{Timer, TimerTask}
    13 
    14 import scala.collection.mutable
    15 import scala.collection.immutable.Queue
    16 import scala.actors.TIMEOUT
    17 import scala.actors.Actor._
    18 
    19 
    20 object Session
    21 {
    22   /* events */
    23 
    24   //{{{
    25   case class Statistics(props: Properties.T)
    26   case class Global_Options(options: Options)
    27   case object Caret_Focus
    28   case class Raw_Edits(edits: List[Document.Edit_Text])
    29   case class Dialog_Result(id: Document.ID, serial: Long, result: String)
    30   case class Commands_Changed(
    31     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    32 
    33   sealed abstract class Phase
    34   case object Inactive extends Phase
    35   case object Startup extends Phase  // transient
    36   case object Failed extends Phase
    37   case object Ready extends Phase
    38   case object Shutdown extends Phase  // transient
    39   //}}}
    40 }
    41 
    42 
    43 class Session(val thy_load: Thy_Load)
    44 {
    45   /* global flags */
    46 
    47   @volatile var timing: Boolean = false
    48   @volatile var verbose: Boolean = false
    49 
    50 
    51   /* tuning parameters */
    52 
    53   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
    54   def message_delay: Time = Time.seconds(0.01)  // incoming prover messages
    55   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
    56   def prune_size: Int = 0  // size of retained history
    57   def syslog_limit: Int = 100
    58   def reparse_limit: Int = 0
    59 
    60 
    61   /* pervasive event buses */
    62 
    63   val statistics = new Event_Bus[Session.Statistics]
    64   val global_options = new Event_Bus[Session.Global_Options]
    65   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
    66   val raw_edits = new Event_Bus[Session.Raw_Edits]
    67   val commands_changed = new Event_Bus[Session.Commands_Changed]
    68   val phase_changed = new Event_Bus[Session.Phase]
    69   val syslog_messages = new Event_Bus[Isabelle_Process.Output]
    70   val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
    71   val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
    72 
    73 
    74 
    75   /** buffered command changes (delay_first discipline) **/
    76 
    77   //{{{
    78   private case object Stop
    79 
    80   private val (_, commands_changed_buffer) =
    81     Simple_Thread.actor("commands_changed_buffer", daemon = true)
    82   {
    83     var finished = false
    84     while (!finished) {
    85       receive {
    86         case Stop => finished = true; reply(())
    87         case changed: Session.Commands_Changed => commands_changed.event(changed)
    88         case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
    89       }
    90     }
    91   }
    92   //}}}
    93 
    94 
    95   /** pipelined change parsing **/
    96 
    97   //{{{
    98   private case class Text_Edits(
    99     previous: Future[Document.Version],
   100     text_edits: List[Document.Edit_Text],
   101     version_result: Promise[Document.Version])
   102 
   103   private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
   104   {
   105     var finished = false
   106     while (!finished) {
   107       receive {
   108         case Stop => finished = true; reply(())
   109 
   110         case Text_Edits(previous, text_edits, version_result) =>
   111           val prev = previous.get_finished
   112           val (doc_edits, version) =
   113             Timing.timeit("Thy_Load.text_edits", timing) {
   114               thy_load.text_edits(reparse_limit, prev, text_edits)
   115             }
   116           version_result.fulfill(version)
   117           sender ! Change(doc_edits, prev, version)
   118 
   119         case bad => System.err.println("change_parser: ignoring bad message " + bad)
   120       }
   121     }
   122   }
   123   //}}}
   124 
   125 
   126 
   127   /** main protocol actor **/
   128 
   129   /* global state */
   130 
   131   private val syslog = Volatile(Queue.empty[XML.Elem])
   132   def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))
   133 
   134   @volatile private var _phase: Session.Phase = Session.Inactive
   135   private def phase_=(new_phase: Session.Phase)
   136   {
   137     _phase = new_phase
   138     phase_changed.event(new_phase)
   139   }
   140   def phase = _phase
   141   def is_ready: Boolean = phase == Session.Ready
   142 
   143   private val global_state = Volatile(Document.State.init)
   144   def current_state(): Document.State = global_state()
   145 
   146   def recent_syntax(): Outer_Syntax =
   147   {
   148     val version = current_state().recent_finished.version.get_finished
   149     if (version.is_init) thy_load.base_syntax
   150     else version.syntax
   151   }
   152 
   153   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   154       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   155     global_state().snapshot(name, pending_edits)
   156 
   157 
   158   /* theory files */
   159 
   160   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   161   {
   162     val header1 =
   163       if (thy_load.loaded_theories(name.theory))
   164         header.error("Attempt to update loaded theory " + quote(name.theory))
   165       else header
   166     (name, Document.Node.Deps(header1))
   167   }
   168 
   169 
   170   /* actor messages */
   171 
   172   private case class Start(args: List[String])
   173   private case object Cancel_Execution
   174   private case class Change(
   175     doc_edits: List[Document.Edit_Command],
   176     previous: Document.Version,
   177     version: Document.Version)
   178   private case class Messages(msgs: List[Isabelle_Process.Message])
   179   private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
   180 
   181   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   182   {
    // handle on this actor, for code that sends to it from elsewhere
    // (the receiver's flush and the thread pool tasks below)
    val this_actor = self

    // system time in ms after which handle_output prunes the document history again
    var prune_next = System.currentTimeMillis() + prune_delay.ms

    // submitted Invoke_Scala tasks, keyed by the id from the prover's request
    var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
   188 
   189 
   190     /* buffered prover messages */
   191 
   192     object receiver
   193     {
   194       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   195 
   196       def flush(): Unit = synchronized {
   197         if (!buffer.isEmpty) {
   198           val msgs = buffer.toList
   199           this_actor ! Messages(msgs)
   200           buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   201         }
   202       }
   203       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
   204         buffer += msg
   205         msg match {
   206           case output: Isabelle_Process.Output =>
   207             if (output.is_syslog)
   208               syslog >> (queue =>
   209                 {
   210                   val queue1 = queue.enqueue(output.message)
   211                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   212                 })
   213             if (output.is_protocol) flush()
   214           case _ =>
   215         }
   216       }
   217 
   218       private val timer = new Timer("session_actor.receiver", true)
   219       timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   220 
   221       def cancel() { timer.cancel() }
   222     }
   223 
    // the prover process, if any: created by Start, reset to None by Stop in phase Ready
    var prover: Option[Isabelle_Process with Protocol] = None
   225 
   226 
   227     /* delayed command changes */
   228 
   229     object delay_commands_changed
   230     {
   231       private var changed_assignment: Boolean = false
   232       private var changed_nodes: Set[Document.Node.Name] = Set.empty
   233       private var changed_commands: Set[Command] = Set.empty
   234 
   235       private var flush_time: Option[Long] = None
   236 
   237       def flush_timeout: Long =
   238         flush_time match {
   239           case None => 5000L
   240           case Some(time) => (time - System.currentTimeMillis()) max 0
   241         }
   242 
   243       def flush()
   244       {
   245         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
   246           commands_changed_buffer !
   247             Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
   248         changed_assignment = false
   249         changed_nodes = Set.empty
   250         changed_commands = Set.empty
   251         flush_time = None
   252       }
   253 
   254       def invoke(assign: Boolean, commands: List[Command])
   255       {
   256         changed_assignment |= assign
   257         for (command <- commands) {
   258           changed_nodes += command.node_name
   259           changed_commands += command
   260         }
   261         val now = System.currentTimeMillis()
   262         flush_time match {
   263           case None => flush_time = Some(now + output_delay.ms)
   264           case Some(time) => if (now >= time) flush()
   265         }
   266       }
   267     }
   268 
   269 
   270     /* resulting changes */
   271 
   272     def handle_change(change: Change)
   273     //{{{
   274     {
   275       val previous = change.previous
   276       val version = change.version
   277       val doc_edits = change.doc_edits
   278 
   279       def id_command(command: Command)
   280       {
   281         if (!global_state().defined_command(command.id)) {
   282           global_state >> (_.define_command(command))
   283           prover.get.define_command(command)
   284         }
   285       }
   286       doc_edits foreach {
   287         case (_, edit) =>
   288           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   289       }
   290 
   291       val assignment = global_state().the_assignment(previous).check_finished
   292       global_state >> (_.define_version(version, assignment))
   293       prover.get.update(previous.id, version.id, doc_edits)
   294     }
   295     //}}}
   296 
   297 
   298     /* prover output */
   299 
   300     def handle_output(output: Isabelle_Process.Output)
   301     //{{{
   302     {
   303       def bad_output()
   304       {
   305         if (verbose)
   306           System.err.println("Ignoring prover output: " + output.message.toString)
   307       }
   308 
   309       def accumulate(state_id: Document.ID, message: XML.Elem)
   310       {
   311         try {
   312           val st = global_state >>> (_.accumulate(state_id, message))
   313           delay_commands_changed.invoke(false, List(st.command))
   314         }
   315         catch {
   316           case _: Document.State.Fail => bad_output()
   317         }
   318       }
   319 
   320       output.properties match {
   321 
   322         case Position.Id(state_id) if !output.is_protocol =>
   323           accumulate(state_id, output.message)
   324 
   325         case Markup.Command_Timing(state_id, timing) if output.is_protocol =>
   326           // FIXME XML.cache (!?)
   327           accumulate(state_id, XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))))
   328 
   329         case Markup.Assign_Execs if output.is_protocol =>
   330           XML.content(output.body) match {
   331             case Protocol.Assign(id, assign) =>
   332               try {
   333                 val cmds = global_state >>> (_.assign(id, assign))
   334                 delay_commands_changed.invoke(true, cmds)
   335               }
   336               catch { case _: Document.State.Fail => bad_output() }
   337             case _ => bad_output()
   338           }
   339           // FIXME separate timeout event/message!?
   340           if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   341             val old_versions = global_state >>> (_.prune_history(prune_size))
   342             if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   343             prune_next = System.currentTimeMillis() + prune_delay.ms
   344           }
   345 
   346         case Markup.Removed_Versions if output.is_protocol =>
   347           XML.content(output.body) match {
   348             case Protocol.Removed(removed) =>
   349               try {
   350                 global_state >> (_.removed_versions(removed))
   351               }
   352               catch { case _: Document.State.Fail => bad_output() }
   353             case _ => bad_output()
   354           }
   355 
   356         case Markup.Invoke_Scala(name, id) if output.is_protocol =>
   357           futures += (id ->
   358             default_thread_pool.submit(() =>
   359               {
   360                 val arg = XML.content(output.body)
   361                 val (tag, result) = Invoke_Scala.method(name, arg)
   362                 this_actor ! Finished_Scala(id, tag, result)
   363               }))
   364 
   365         case Markup.Cancel_Scala(id) if output.is_protocol =>
   366           futures.get(id) match {
   367             case Some(future) =>
   368               future.cancel(true)
   369               this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
   370             case None =>
   371           }
   372 
   373         case Markup.ML_Statistics(props) if output.is_protocol =>
   374           statistics.event(Session.Statistics(props))
   375 
   376         case Markup.Task_Statistics(props) if output.is_protocol =>
   377           // FIXME
   378 
   379         case _ if output.is_init =>
   380           phase = Session.Ready
   381 
   382         case Markup.Return_Code(rc) if output.is_exit =>
   383           if (rc == 0) phase = Session.Inactive
   384           else phase = Session.Failed
   385 
   386         case _ => bad_output()
   387       }
   388     }
   389     //}}}
   390 
   391 
   392     /* main loop */
   393 
   394     //{{{
   395     var finished = false
   396     while (!finished) {
   397       receiveWithin(delay_commands_changed.flush_timeout) {
   398         case TIMEOUT => delay_commands_changed.flush()
   399 
   400         case Start(args) if prover.isEmpty =>
   401           if (phase == Session.Inactive || phase == Session.Failed) {
   402             phase = Session.Startup
   403             prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
   404           }
   405 
   406         case Stop =>
   407           if (phase == Session.Ready) {
   408             global_state >> (_ => Document.State.init)  // FIXME event bus!?
   409             phase = Session.Shutdown
   410             prover.get.terminate
   411             prover = None
   412             phase = Session.Inactive
   413           }
   414           finished = true
   415           receiver.cancel()
   416           reply(())
   417 
   418         case Session.Global_Options(options) if prover.isDefined =>
   419           if (is_ready) prover.get.options(options)
   420 
   421         case Cancel_Execution if prover.isDefined =>
   422           prover.get.cancel_execution()
   423 
   424         case raw @ Session.Raw_Edits(edits) if prover.isDefined =>
   425           prover.get.discontinue_execution()
   426 
   427           val previous = global_state().history.tip.version
   428           val version = Future.promise[Document.Version]
   429           val change = global_state >>> (_.continue_history(previous, edits, version))
   430           raw_edits.event(raw)
   431           change_parser ! Text_Edits(previous, edits, version)
   432 
   433           reply(())
   434 
   435         case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   436           prover.get.dialog_result(serial, result)
   437           handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))
   438 
   439         case Messages(msgs) =>
   440           msgs foreach {
   441             case input: Isabelle_Process.Input =>
   442               all_messages.event(input)
   443 
   444             case output: Isabelle_Process.Output =>
   445               if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   446               else handle_output(output)
   447               if (output.is_syslog) syslog_messages.event(output)
   448               all_messages.event(output)
   449           }
   450 
   451         case change: Change
   452         if prover.isDefined && global_state().is_assigned(change.previous) =>
   453           handle_change(change)
   454 
   455         case Finished_Scala(id, tag, result) if prover.isDefined =>
   456           if (futures.isDefinedAt(id)) {
   457             prover.get.invoke_scala(id, tag, result)
   458             futures -= id
   459           }
   460 
   461         case bad if !bad.isInstanceOf[Change] =>
   462           System.err.println("session_actor: ignoring bad message " + bad)
   463       }
   464     }
   465     //}}}
   466   }
   467 
   468 
   469   /* actions */
   470 
   471   def start(args: List[String])
   472   {
   473     global_options += session_actor
   474     session_actor ! Start(args)
   475   }
   476 
   477   def stop()
   478   {
   479     global_options -= session_actor
   480     commands_changed_buffer !? Stop
   481     change_parser !? Stop
   482     session_actor !? Stop
   483   }
   484 
   485   def cancel_execution() { session_actor ! Cancel_Execution }
   486 
   487   def update(edits: List[Document.Edit_Text])
   488   { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(edits) }
   489 
   490   def dialog_result(id: Document.ID, serial: Long, result: String)
   491   { session_actor ! Session.Dialog_Result(id, serial, result) }
   492 }