src/Pure/System/session.scala
author wenzelm
Sun Feb 10 22:03:21 2013 +0100 (2013-02-10)
changeset 51083 10062c40ddaa
parent 50975 73ec6ad6700e
child 51662 3391a493f39a
permissions -rw-r--r--
avoid crash (NPE) when properties are changed during prover startup (e.g. by font scaling);
     1 /*  Title:      Pure/System/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Main Isabelle/Scala session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.lang.System
    12 import java.util.{Timer, TimerTask}
    13 
    14 import scala.collection.mutable
    15 import scala.collection.immutable.Queue
    16 import scala.actors.TIMEOUT
    17 import scala.actors.Actor._
    18 
    19 
    20 object Session
    21 {
    22   /* events */
    23 
    24   //{{{
    25   case class Statistics(props: Properties.T)
    26   case class Global_Options(options: Options)
    27   case object Caret_Focus
    28   case class Raw_Edits(edits: List[Document.Edit_Text])
    29   case class Dialog_Result(id: Document.ID, serial: Long, result: String)
    30   case class Commands_Changed(
    31     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    32 
    33   sealed abstract class Phase
    34   case object Inactive extends Phase
    35   case object Startup extends Phase  // transient
    36   case object Failed extends Phase
    37   case object Ready extends Phase
    38   case object Shutdown extends Phase  // transient
    39   //}}}
    40 }
    41 
    42 
    43 class Session(val thy_load: Thy_Load)
    44 {
    45   /* global flags */
    46 
    47   @volatile var timing: Boolean = false
    48   @volatile var verbose: Boolean = false
    49 
    50 
    51   /* tuning parameters */
    52 
    53   def output_delay: Time = Time.seconds(0.1)  // prover output (markup, common messages)
    54   def message_delay: Time = Time.seconds(0.01)  // incoming prover messages
    55   def prune_delay: Time = Time.seconds(60.0)  // prune history -- delete old versions
    56   def prune_size: Int = 0  // size of retained history
    57   def syslog_limit: Int = 100
    58   def reparse_limit: Int = 0
    59 
    60 
    61   /* pervasive event buses */
    62 
    63   val statistics = new Event_Bus[Session.Statistics]
    64   val global_options = new Event_Bus[Session.Global_Options]
    65   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
    66   val raw_edits = new Event_Bus[Session.Raw_Edits]
    67   val commands_changed = new Event_Bus[Session.Commands_Changed]
    68   val phase_changed = new Event_Bus[Session.Phase]
    69   val syslog_messages = new Event_Bus[Isabelle_Process.Output]
    70   val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
    71   val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
    72 
    73 
    74 
    75   /** buffered command changes (delay_first discipline) **/
    76 
    77   //{{{
    78   private case object Stop
    79 
    80   private val (_, commands_changed_buffer) =
    81     Simple_Thread.actor("commands_changed_buffer", daemon = true)
    82   {
    83     var finished = false
    84     while (!finished) {
    85       receive {
    86         case Stop => finished = true; reply(())
    87         case changed: Session.Commands_Changed => commands_changed.event(changed)
    88         case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
    89       }
    90     }
    91   }
    92   //}}}
    93 
    94 
    95   /** pipelined change parsing **/
    96 
    97   //{{{
    98   private case class Text_Edits(
    99     previous: Future[Document.Version],
   100     text_edits: List[Document.Edit_Text],
   101     version_result: Promise[Document.Version])
   102 
   103   private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
   104   {
   105     var finished = false
   106     while (!finished) {
   107       receive {
   108         case Stop => finished = true; reply(())
   109 
   110         case Text_Edits(previous, text_edits, version_result) =>
   111           val prev = previous.get_finished
   112           val (doc_edits, version) =
   113             Timing.timeit("Thy_Load.text_edits", timing) {
   114               thy_load.text_edits(reparse_limit, prev, text_edits)
   115             }
   116           version_result.fulfill(version)
   117           sender ! Change(doc_edits, prev, version)
   118 
   119         case bad => System.err.println("change_parser: ignoring bad message " + bad)
   120       }
   121     }
   122   }
   123   //}}}
   124 
   125 
   126 
   127   /** main protocol actor **/
   128 
   129   /* global state */
   130 
   131   private val syslog = Volatile(Queue.empty[XML.Elem])
   132   def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))
   133 
   134   @volatile private var _phase: Session.Phase = Session.Inactive
   135   private def phase_=(new_phase: Session.Phase)
   136   {
   137     _phase = new_phase
   138     phase_changed.event(new_phase)
   139   }
   140   def phase = _phase
   141   def is_ready: Boolean = phase == Session.Ready
   142 
   143   private val global_state = Volatile(Document.State.init)
   144   def current_state(): Document.State = global_state()
   145 
   146   def recent_syntax(): Outer_Syntax =
   147   {
   148     val version = current_state().recent_finished.version.get_finished
   149     if (version.is_init) thy_load.base_syntax
   150     else version.syntax
   151   }
   152 
   153   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   154       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   155     global_state().snapshot(name, pending_edits)
   156 
   157 
   158   /* theory files */
   159 
   160   def header_edit(name: Document.Node.Name, header: Document.Node.Header): Document.Edit_Text =
   161   {
   162     val header1 =
   163       if (thy_load.loaded_theories(name.theory))
   164         header.error("Attempt to update loaded theory " + quote(name.theory))
   165       else header
   166     (name, Document.Node.Deps(header1))
   167   }
   168 
   169 
   170   /* actor messages */
   171 
   172   private case class Start(args: List[String])
   173   private case object Cancel_Execution
   174   private case class Change(
   175     doc_edits: List[Document.Edit_Command],
   176     previous: Document.Version,
   177     version: Document.Version)
   178   private case class Messages(msgs: List[Isabelle_Process.Message])
   179   private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
   180 
   181   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   182   {
   183     val this_actor = self
   184 
   185     var prune_next = System.currentTimeMillis() + prune_delay.ms
   186 
   187     var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
   188 
   189 
   190     /* buffered prover messages */
   191 
   192     object receiver
   193     {
   194       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   195 
   196       def flush(): Unit = synchronized {
   197         if (!buffer.isEmpty) {
   198           val msgs = buffer.toList
   199           this_actor ! Messages(msgs)
   200           buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   201         }
   202       }
   203       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
   204         buffer += msg
   205         msg match {
   206           case output: Isabelle_Process.Output =>
   207             if (output.is_syslog)
   208               syslog >> (queue =>
   209                 {
   210                   val queue1 = queue.enqueue(output.message)
   211                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   212                 })
   213             if (output.is_protocol) flush()
   214           case _ =>
   215         }
   216       }
   217 
   218       private val timer = new Timer("session_actor.receiver", true)
   219       timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   220 
   221       def cancel() { timer.cancel() }
   222     }
   223 
   224     var prover: Option[Isabelle_Process with Protocol] = None
   225 
   226 
   227     /* delayed command changes */
   228 
   229     object delay_commands_changed
   230     {
   231       private var changed_assignment: Boolean = false
   232       private var changed_nodes: Set[Document.Node.Name] = Set.empty
   233       private var changed_commands: Set[Command] = Set.empty
   234 
   235       private var flush_time: Option[Long] = None
   236 
   237       def flush_timeout: Long =
   238         flush_time match {
   239           case None => 5000L
   240           case Some(time) => (time - System.currentTimeMillis()) max 0
   241         }
   242 
   243       def flush()
   244       {
   245         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
   246           commands_changed_buffer !
   247             Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
   248         changed_assignment = false
   249         changed_nodes = Set.empty
   250         changed_commands = Set.empty
   251         flush_time = None
   252       }
   253 
   254       def invoke(assign: Boolean, commands: List[Command])
   255       {
   256         changed_assignment |= assign
   257         for (command <- commands) {
   258           changed_nodes += command.node_name
   259           changed_commands += command
   260         }
   261         val now = System.currentTimeMillis()
   262         flush_time match {
   263           case None => flush_time = Some(now + output_delay.ms)
   264           case Some(time) => if (now >= time) flush()
   265         }
   266       }
   267     }
   268 
   269 
   270     /* resulting changes */
   271 
   272     def handle_change(change: Change)
   273     //{{{
   274     {
   275       val previous = change.previous
   276       val version = change.version
   277       val doc_edits = change.doc_edits
   278 
   279       def id_command(command: Command)
   280       {
   281         if (!global_state().defined_command(command.id)) {
   282           global_state >> (_.define_command(command))
   283           prover.get.define_command(command)
   284         }
   285       }
   286       doc_edits foreach {
   287         case (_, edit) =>
   288           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   289       }
   290 
   291       val assignment = global_state().the_assignment(previous).check_finished
   292       global_state >> (_.define_version(version, assignment))
   293       prover.get.update(previous.id, version.id, doc_edits)
   294     }
   295     //}}}
   296 
   297 
   298     /* prover output */
   299 
   300     def handle_output(output: Isabelle_Process.Output)
   301     //{{{
   302     {
   303       def bad_output(output: Isabelle_Process.Output)
   304       {
   305         if (verbose)
   306           System.err.println("Ignoring prover output: " + output.message.toString)
   307       }
   308 
   309       output.properties match {
   310 
   311         case Position.Id(state_id) if !output.is_protocol =>
   312           try {
   313             val st = global_state >>> (_.accumulate(state_id, output.message))
   314             delay_commands_changed.invoke(false, List(st.command))
   315           }
   316           catch {
   317             case _: Document.State.Fail => bad_output(output)
   318           }
   319 
   320         case Markup.Assign_Execs if output.is_protocol =>
   321           XML.content(output.body) match {
   322             case Protocol.Assign(id, assign) =>
   323               try {
   324                 val cmds = global_state >>> (_.assign(id, assign))
   325                 delay_commands_changed.invoke(true, cmds)
   326               }
   327               catch { case _: Document.State.Fail => bad_output(output) }
   328             case _ => bad_output(output)
   329           }
   330           // FIXME separate timeout event/message!?
   331           if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   332             val old_versions = global_state >>> (_.prune_history(prune_size))
   333             if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   334             prune_next = System.currentTimeMillis() + prune_delay.ms
   335           }
   336 
   337         case Markup.Removed_Versions if output.is_protocol =>
   338           XML.content(output.body) match {
   339             case Protocol.Removed(removed) =>
   340               try {
   341                 global_state >> (_.removed_versions(removed))
   342               }
   343               catch { case _: Document.State.Fail => bad_output(output) }
   344             case _ => bad_output(output)
   345           }
   346 
   347         case Markup.Invoke_Scala(name, id) if output.is_protocol =>
   348           futures += (id ->
   349             default_thread_pool.submit(() =>
   350               {
   351                 val arg = XML.content(output.body)
   352                 val (tag, result) = Invoke_Scala.method(name, arg)
   353                 this_actor ! Finished_Scala(id, tag, result)
   354               }))
   355 
   356         case Markup.Cancel_Scala(id) if output.is_protocol =>
   357           futures.get(id) match {
   358             case Some(future) =>
   359               future.cancel(true)
   360               this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
   361             case None =>
   362           }
   363 
   364         case Markup.ML_Statistics(props) if output.is_protocol =>
   365           statistics.event(Session.Statistics(props))
   366 
   367         case Markup.Task_Statistics(props) if output.is_protocol =>
   368           // FIXME
   369 
   370         case _ if output.is_init =>
   371           phase = Session.Ready
   372 
   373         case Markup.Return_Code(rc) if output.is_exit =>
   374           if (rc == 0) phase = Session.Inactive
   375           else phase = Session.Failed
   376 
   377         case _ => bad_output(output)
   378       }
   379     }
   380     //}}}
   381 
   382 
   383     /* main loop */
   384 
   385     //{{{
   386     var finished = false
   387     while (!finished) {
   388       receiveWithin(delay_commands_changed.flush_timeout) {
   389         case TIMEOUT => delay_commands_changed.flush()
   390 
   391         case Start(args) if prover.isEmpty =>
   392           if (phase == Session.Inactive || phase == Session.Failed) {
   393             phase = Session.Startup
   394             prover = Some(new Isabelle_Process(receiver.invoke _, args) with Protocol)
   395           }
   396 
   397         case Stop =>
   398           if (phase == Session.Ready) {
   399             global_state >> (_ => Document.State.init)  // FIXME event bus!?
   400             phase = Session.Shutdown
   401             prover.get.terminate
   402             prover = None
   403             phase = Session.Inactive
   404           }
   405           finished = true
   406           receiver.cancel()
   407           reply(())
   408 
   409         case Session.Global_Options(options) if prover.isDefined =>
   410           if (is_ready) prover.get.options(options)
   411 
   412         case Cancel_Execution if prover.isDefined =>
   413           prover.get.cancel_execution()
   414 
   415         case raw @ Session.Raw_Edits(edits) if prover.isDefined =>
   416           prover.get.discontinue_execution()
   417 
   418           val previous = global_state().history.tip.version
   419           val version = Future.promise[Document.Version]
   420           val change = global_state >>> (_.continue_history(previous, edits, version))
   421           raw_edits.event(raw)
   422           change_parser ! Text_Edits(previous, edits, version)
   423 
   424           reply(())
   425 
   426         case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
   427           prover.get.dialog_result(serial, result)
   428           handle_output(new Isabelle_Process.Output(Protocol.Dialog_Result(id, serial, result)))
   429 
   430         case Messages(msgs) =>
   431           msgs foreach {
   432             case input: Isabelle_Process.Input =>
   433               all_messages.event(input)
   434 
   435             case output: Isabelle_Process.Output =>
   436               if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   437               else handle_output(output)
   438               if (output.is_syslog) syslog_messages.event(output)
   439               all_messages.event(output)
   440           }
   441 
   442         case change: Change
   443         if prover.isDefined && global_state().is_assigned(change.previous) =>
   444           handle_change(change)
   445 
   446         case Finished_Scala(id, tag, result) if prover.isDefined =>
   447           if (futures.isDefinedAt(id)) {
   448             prover.get.invoke_scala(id, tag, result)
   449             futures -= id
   450           }
   451 
   452         case bad if !bad.isInstanceOf[Change] =>
   453           System.err.println("session_actor: ignoring bad message " + bad)
   454       }
   455     }
   456     //}}}
   457   }
   458 
   459 
   460   /* actions */
   461 
   462   def start(args: List[String])
   463   {
   464     global_options += session_actor
   465     session_actor ! Start(args)
   466   }
   467 
   468   def stop()
   469   {
   470     global_options -= session_actor
   471     commands_changed_buffer !? Stop
   472     change_parser !? Stop
   473     session_actor !? Stop
   474   }
   475 
   476   def cancel_execution() { session_actor ! Cancel_Execution }
   477 
   478   def update(edits: List[Document.Edit_Text])
   479   { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(edits) }
   480 
   481   def dialog_result(id: Document.ID, serial: Long, result: String)
   482   { session_actor ! Session.Dialog_Result(id, serial, result) }
   483 }