src/Pure/System/session.scala
author wenzelm
Thu Apr 05 14:14:51 2012 +0200 (2012-04-05)
changeset 47343 b8aeab386414
parent 47027 fc3bb6c02a3c
child 47346 cd3ab7625519
permissions -rw-r--r--
less aggressive discontinue_execution before document update, to avoid unstable execs that need to be re-assigned;
     1 /*  Title:      Pure/System/session.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Main Isabelle/Scala session, potentially with running prover process.
     6 */
     7 
     8 package isabelle
     9 
    10 
    11 import java.lang.System
    12 import java.util.{Timer, TimerTask}
    13 
    14 import scala.collection.mutable
    15 import scala.collection.immutable.Queue
    16 import scala.actors.TIMEOUT
    17 import scala.actors.Actor._
    18 
    19 
    20 object Session
    21 {
    22   /* events */
    23 
    24   //{{{
    25   case object Global_Settings
    26   case object Caret_Focus
    27   case class Commands_Changed(
    28     assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])
    29 
    30   sealed abstract class Phase
    31   case object Inactive extends Phase
    32   case object Startup extends Phase  // transient
    33   case object Failed extends Phase
    34   case object Ready extends Phase
    35   case object Shutdown extends Phase  // transient
    36   //}}}
    37 }
    38 
    39 
    40 class Session(thy_load: Thy_Load = new Thy_Load)
    41 {
    42   /* tuning parameters */  // FIXME properties or settings (!?)
    43 
    44   val message_delay = Time.seconds(0.01)  // prover messages
    45   val input_delay = Time.seconds(0.3)  // user input (e.g. text edits, cursor movement)
    46   val output_delay = Time.seconds(0.1)  // prover output (markup, common messages)
    47   val update_delay = Time.seconds(0.5)  // GUI layout updates
    48   val load_delay = Time.seconds(0.5)  // file load operations (new buffers etc.)
    49   val prune_delay = Time.seconds(60.0)  // prune history -- delete old versions
    50   val prune_size = 0  // size of retained history
    51   val syslog_limit = 100
    52 
    53 
    54   /* pervasive event buses */
    55 
    56   val global_settings = new Event_Bus[Session.Global_Settings.type]
    57   val caret_focus = new Event_Bus[Session.Caret_Focus.type]
    58   val commands_changed = new Event_Bus[Session.Commands_Changed]
    59   val phase_changed = new Event_Bus[Session.Phase]
    60   val syslog_messages = new Event_Bus[Isabelle_Process.Output]
    61   val raw_output_messages = new Event_Bus[Isabelle_Process.Output]
    62   val all_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
    63 
    64 
    65 
    66   /** buffered command changes (delay_first discipline) **/
    67 
    68   //{{{
    69   private case object Stop
    70 
    71   private val (_, commands_changed_buffer) =
    72     Simple_Thread.actor("commands_changed_buffer", daemon = true)
    73   {
    74     var finished = false
    75     while (!finished) {
    76       receive {
    77         case Stop => finished = true; reply(())
    78         case changed: Session.Commands_Changed => commands_changed.event(changed)
    79         case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
    80       }
    81     }
    82   }
    83   //}}}
    84 
    85 
    86   /** pipelined change parsing **/
    87 
    88   //{{{
    89   private case class Text_Edits(
    90     name: Document.Node.Name,
    91     previous: Future[Document.Version],
    92     text_edits: List[Document.Edit_Text],
    93     version_result: Promise[Document.Version])
    94 
    95   private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
    96   {
    97     var finished = false
    98     while (!finished) {
    99       receive {
   100         case Stop => finished = true; reply(())
   101 
   102         case Text_Edits(name, previous, text_edits, version_result) =>
   103           val prev = previous.get_finished
   104           val (doc_edits, version) = Thy_Syntax.text_edits(prover_syntax, prev, text_edits)
   105           version_result.fulfill(version)
   106           sender ! Change_Node(name, doc_edits, prev, version)
   107 
   108         case bad => System.err.println("change_parser: ignoring bad message " + bad)
   109       }
   110     }
   111   }
   112   //}}}
   113 
   114 
   115   /** main protocol actor **/
   116 
   117   /* global state */
   118 
   119   @volatile var verbose: Boolean = false
   120 
   121   @volatile private var prover_syntax =
   122     Outer_Syntax.init() +
   123       // FIXME avoid hardwired stuff!?
   124       ("hence", Keyword.PRF_ASM_GOAL, "then have") +
   125       ("thus", Keyword.PRF_ASM_GOAL, "then show")
   126 
   127   private val syslog = Volatile(Queue.empty[XML.Elem])
   128   def current_syslog(): String = cat_lines(syslog().iterator.map(msg => XML.content(msg).mkString))
   129 
   130   @volatile private var _phase: Session.Phase = Session.Inactive
   131   private def phase_=(new_phase: Session.Phase)
   132   {
   133     _phase = new_phase
   134     phase_changed.event(new_phase)
   135   }
   136   def phase = _phase
   137   def is_ready: Boolean = phase == Session.Ready
   138 
   139   private val global_state = Volatile(Document.State.init)
   140   def current_state(): Document.State = global_state()
   141 
   142   def recent_syntax(): Outer_Syntax =
   143   {
   144     val version = current_state().recent_finished.version.get_finished
   145     if (version.is_init) prover_syntax
   146     else version.syntax
   147   }
   148 
   149   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
   150       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
   151     global_state().snapshot(name, pending_edits)
   152 
   153 
   154   /* theory files */
   155 
   156   def header_edit(name: Document.Node.Name, header: Document.Node_Header): Document.Edit_Text =
   157   {
   158     val header1: Document.Node_Header =
   159       header match {
   160         case Exn.Res(_) if (thy_load.is_loaded(name.theory)) =>
   161           Exn.Exn(ERROR("Attempt to update loaded theory " + quote(name.theory)))
   162         case _ => header
   163       }
   164     (name, Document.Node.Header(header1))
   165   }
   166 
   167 
   168   /* actor messages */
   169 
   170   private case class Start(timeout: Time, args: List[String])
   171   private case object Cancel_Execution
   172   private case class Init_Node(name: Document.Node.Name,
   173     header: Document.Node_Header, perspective: Text.Perspective, text: String)
   174   private case class Edit_Node(name: Document.Node.Name,
   175     header: Document.Node_Header, perspective: Text.Perspective, edits: List[Text.Edit])
   176   private case class Change_Node(
   177     name: Document.Node.Name,
   178     doc_edits: List[Document.Edit_Command],
   179     previous: Document.Version,
   180     version: Document.Version)
   181   private case class Messages(msgs: List[Isabelle_Process.Message])
   182 
   183   private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
   184   {
   185     val this_actor = self
   186 
   187     var prune_next = System.currentTimeMillis() + prune_delay.ms
   188 
   189 
   190     /* buffered prover messages */
   191 
   192     object receiver
   193     {
   194       private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   195 
   196       def flush(): Unit = synchronized {
   197         if (!buffer.isEmpty) {
   198           val msgs = buffer.toList
   199           this_actor ! Messages(msgs)
   200           buffer = new mutable.ListBuffer[Isabelle_Process.Message]
   201         }
   202       }
   203       def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
   204         buffer += msg
   205         msg match {
   206           case output: Isabelle_Process.Output =>
   207             if (output.is_syslog)
   208               syslog >> (queue =>
   209                 {
   210                   val queue1 = queue.enqueue(output.message)
   211                   if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
   212                 })
   213             if (output.is_protocol) flush()
   214           case _ =>
   215         }
   216       }
   217 
   218       private val timer = new Timer("session_actor.receiver", true)
   219       timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
   220 
   221       def cancel() { timer.cancel() }
   222     }
   223 
   224     var prover: Option[Isabelle_Process with Protocol] = None
   225 
   226 
   227     /* delayed command changes */
   228 
   229     object delay_commands_changed
   230     {
   231       private var changed_assignment: Boolean = false
   232       private var changed_nodes: Set[Document.Node.Name] = Set.empty
   233       private var changed_commands: Set[Command] = Set.empty
   234 
   235       private var flush_time: Option[Long] = None
   236 
   237       def flush_timeout: Long =
   238         flush_time match {
   239           case None => 5000L
   240           case Some(time) => (time - System.currentTimeMillis()) max 0
   241         }
   242 
   243       def flush()
   244       {
   245         if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
   246           commands_changed_buffer !
   247             Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
   248         changed_assignment = false
   249         changed_nodes = Set.empty
   250         changed_commands = Set.empty
   251         flush_time = None
   252       }
   253 
   254       def invoke(assign: Boolean, commands: List[Command])
   255       {
   256         changed_assignment |= assign
   257         for (command <- commands) {
   258           changed_nodes += command.node_name
   259           changed_commands += command
   260         }
   261         val now = System.currentTimeMillis()
   262         flush_time match {
   263           case None => flush_time = Some(now + output_delay.ms)
   264           case Some(time) => if (now >= time) flush()
   265         }
   266       }
   267     }
   268 
   269 
   270     /* perspective */
   271 
   272     def update_perspective(name: Document.Node.Name, text_perspective: Text.Perspective)
   273     {
   274       val previous = global_state().tip_version
   275       val (perspective, version) = Thy_Syntax.edit_perspective(previous, name, text_perspective)
   276 
   277       val text_edits: List[Document.Edit_Text] =
   278         List((name, Document.Node.Perspective(text_perspective)))
   279       val change =
   280         global_state >>>
   281           (_.continue_history(Future.value(previous), text_edits, Future.value(version)))
   282 
   283       val assignment = global_state().the_assignment(previous).check_finished
   284       global_state >> (_.define_version(version, assignment))
   285       global_state >>> (_.assign(version.id))
   286 
   287       prover.get.update_perspective(previous.id, version.id, name, perspective)
   288     }
   289 
   290 
   291     /* incoming edits */
   292 
   293     def handle_edits(name: Document.Node.Name,
   294         header: Document.Node_Header, edits: List[Document.Node.Edit[Text.Edit, Text.Perspective]])
   295     //{{{
   296     {
   297       val previous = global_state().history.tip.version
   298 
   299       prover.get.discontinue_execution()
   300 
   301       val text_edits = header_edit(name, header) :: edits.map(edit => (name, edit))
   302       val version = Future.promise[Document.Version]
   303       val change = global_state >>> (_.continue_history(previous, text_edits, version))
   304 
   305       change_parser ! Text_Edits(name, previous, text_edits, version)
   306     }
   307     //}}}
   308 
   309 
   310     /* exec state assignment */
   311 
   312     def handle_assign(id: Document.Version_ID, assign: Document.Assign)
   313     {
   314       val cmds = global_state >>> (_.assign(id, assign))
   315       delay_commands_changed.invoke(true, cmds)
   316     }
   317 
   318 
   319     /* removed versions */
   320 
   321     def handle_removed(removed: List[Document.Version_ID]): Unit =
   322       global_state >> (_.removed_versions(removed))
   323 
   324 
   325     /* resulting changes */
   326 
   327     def handle_change(change: Change_Node)
   328     //{{{
   329     {
   330       val previous = change.previous
   331       val version = change.version
   332       val name = change.name
   333       val doc_edits = change.doc_edits
   334 
   335       def id_command(command: Command)
   336       {
   337         if (!global_state().defined_command(command.id)) {
   338           global_state >> (_.define_command(command))
   339           prover.get.define_command(command)
   340         }
   341       }
   342       doc_edits foreach {
   343         case (_, edit) =>
   344           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
   345       }
   346 
   347       val assignment = global_state().the_assignment(previous).check_finished
   348       global_state >> (_.define_version(version, assignment))
   349       prover.get.update(previous.id, version.id, doc_edits)
   350     }
   351     //}}}
   352 
   353 
   354     /* prover output */
   355 
   356     def handle_output(output: Isabelle_Process.Output)
   357     //{{{
   358     {
   359       def bad_output(output: Isabelle_Process.Output)
   360       {
   361         if (verbose)
   362           System.err.println("Ignoring prover output: " + output.message.toString)
   363       }
   364 
   365       output.properties match {
   366 
   367         case Position.Id(state_id) if !output.is_protocol =>
   368           try {
   369             val st = global_state >>> (_.accumulate(state_id, output.message))
   370             delay_commands_changed.invoke(false, List(st.command))
   371           }
   372           catch {
   373             case _: Document.State.Fail => bad_output(output)
   374           }
   375 
   376         case Isabelle_Markup.Assign_Execs if output.is_protocol =>
   377           XML.content(output.body).mkString match {
   378             case Protocol.Assign(id, assign) =>
   379               try { handle_assign(id, assign) }
   380               catch { case _: Document.State.Fail => bad_output(output) }
   381             case _ => bad_output(output)
   382           }
   383           // FIXME separate timeout event/message!?
   384           if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   385             val old_versions = global_state >>> (_.prune_history(prune_size))
   386             if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   387             prune_next = System.currentTimeMillis() + prune_delay.ms
   388           }
   389 
   390         case Isabelle_Markup.Removed_Versions if output.is_protocol =>
   391           XML.content(output.body).mkString match {
   392             case Protocol.Removed(removed) =>
   393               try { handle_removed(removed) }
   394               catch { case _: Document.State.Fail => bad_output(output) }
   395             case _ => bad_output(output)
   396           }
   397 
   398         case Isabelle_Markup.Invoke_Scala(name, id) if output.is_protocol =>
   399           Future.fork {
   400             val arg = XML.content(output.body).mkString
   401             val (tag, res) = Invoke_Scala.method(name, arg)
   402             prover.get.invoke_scala(id, tag, res)
   403           }
   404 
   405         case Isabelle_Markup.Cancel_Scala(id) if output.is_protocol =>
   406           System.err.println("cancel_scala " + id)  // FIXME actually cancel JVM task
   407 
   408         case Isabelle_Markup.Ready if output.is_protocol =>
   409             phase = Session.Ready
   410 
   411         case Isabelle_Markup.Loaded_Theory(name) if output.is_protocol =>
   412           thy_load.register_thy(name)
   413 
   414         case Isabelle_Markup.Command_Decl(name, kind) if output.is_protocol =>
   415           prover_syntax += (name, kind)
   416 
   417         case Isabelle_Markup.Keyword_Decl(name) if output.is_protocol =>
   418           prover_syntax += name
   419 
   420         case _ =>
   421           if (output.is_exit && phase == Session.Startup) phase = Session.Failed
   422           else if (output.is_exit) phase = Session.Inactive
   423           else if (output.is_stdout) { }
   424           else bad_output(output)
   425       }
   426     }
   427     //}}}
   428 
   429 
   430     /* main loop */
   431 
   432     //{{{
   433     var finished = false
   434     while (!finished) {
   435       receiveWithin(delay_commands_changed.flush_timeout) {
   436         case TIMEOUT => delay_commands_changed.flush()
   437 
   438         case Start(timeout, args) if prover.isEmpty =>
   439           if (phase == Session.Inactive || phase == Session.Failed) {
   440             phase = Session.Startup
   441             prover = Some(new Isabelle_Process(timeout, receiver.invoke _, args) with Protocol)
   442           }
   443 
   444         case Stop =>
   445           if (phase == Session.Ready) {
   446             global_state >> (_ => Document.State.init)  // FIXME event bus!?
   447             phase = Session.Shutdown
   448             prover.get.terminate
   449             prover = None
   450             phase = Session.Inactive
   451           }
   452           finished = true
   453           receiver.cancel()
   454           reply(())
   455 
   456         case Cancel_Execution if prover.isDefined =>
   457           prover.get.cancel_execution()
   458 
   459         case Init_Node(name, header, perspective, text) if prover.isDefined =>
   460           // FIXME compare with existing node
   461           handle_edits(name, header,
   462             List(Document.Node.Clear(),
   463               Document.Node.Edits(List(Text.Edit.insert(0, text))),
   464               Document.Node.Perspective(perspective)))
   465           reply(())
   466 
   467         case Edit_Node(name, header, perspective, text_edits) if prover.isDefined =>
   468           if (text_edits.isEmpty && global_state().tip_stable &&
   469               perspective.range.stop <= global_state().last_exec_offset(name))
   470             update_perspective(name, perspective)
   471           else
   472             handle_edits(name, header,
   473               List(Document.Node.Edits(text_edits), Document.Node.Perspective(perspective)))
   474           reply(())
   475 
   476         case Messages(msgs) =>
   477           msgs foreach {
   478             case input: Isabelle_Process.Input =>
   479               all_messages.event(input)
   480 
   481             case output: Isabelle_Process.Output =>
   482               handle_output(output)
   483               if (output.is_syslog) syslog_messages.event(output)
   484               if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
   485               all_messages.event(output)
   486           }
   487 
   488         case change: Change_Node
   489         if prover.isDefined && global_state().is_assigned(change.previous) =>
   490           handle_change(change)
   491 
   492         case bad if !bad.isInstanceOf[Change_Node] =>
   493           System.err.println("session_actor: ignoring bad message " + bad)
   494       }
   495     }
   496     //}}}
   497   }
   498 
   499 
   500   /* actions */
   501 
   502   def start(timeout: Time, args: List[String])
   503   { session_actor ! Start(timeout, args) }
   504 
   505   def start(args: List[String]) { start (Time.seconds(25), args) }
   506 
   507   def stop() { commands_changed_buffer !? Stop; change_parser !? Stop; session_actor !? Stop }
   508 
   509   def cancel_execution() { session_actor ! Cancel_Execution }
   510 
   511   def init_node(name: Document.Node.Name,
   512     header: Document.Node_Header, perspective: Text.Perspective, text: String)
   513   { session_actor !? Init_Node(name, header, perspective, text) }
   514 
   515   def edit_node(name: Document.Node.Name,
   516     header: Document.Node_Header, perspective: Text.Perspective, edits: List[Text.Edit])
   517   { session_actor !? Edit_Node(name, header, perspective, edits) }
   518 }