buffer prover messages to prevent overloading of session_actor input channel -- which is critical due to synchronous messages wrt. GUI thread;
authorwenzelm
Tue Sep 06 11:18:19 2011 +0200 (2011-09-06 ago)
changeset 44733329320fc88df
parent 44732 c58b69d888ac
child 44734 7313e2db3d39
buffer prover messages to prevent overloading of session_actor input channel -- which is critical due to synchronous messages wrt. GUI thread;
src/Pure/System/isabelle_process.scala
src/Pure/System/session.scala
     1.1 --- a/src/Pure/System/isabelle_process.scala	Tue Sep 06 10:27:04 2011 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.scala	Tue Sep 06 11:18:19 2011 +0200
     1.3 @@ -33,7 +33,7 @@
     1.4        ('H' : Int) -> Markup.RAW)
     1.5    }
     1.6  
     1.7 -  abstract class Message
     1.8 +  sealed abstract class Message
     1.9  
    1.10    class Input(name: String, args: List[String]) extends Message
    1.11    {
     2.1 --- a/src/Pure/System/session.scala	Tue Sep 06 10:27:04 2011 +0200
     2.2 +++ b/src/Pure/System/session.scala	Tue Sep 06 11:18:19 2011 +0200
     2.3 @@ -7,8 +7,11 @@
     2.4  
     2.5  package isabelle
     2.6  
     2.7 +
     2.8  import java.lang.System
     2.9 +import java.util.{Timer, TimerTask}
    2.10  
    2.11 +import scala.collection.mutable
    2.12  import scala.actors.TIMEOUT
    2.13  import scala.actors.Actor._
    2.14  
    2.15 @@ -37,6 +40,7 @@
    2.16  {
    2.17    /* real time parameters */  // FIXME properties or settings (!?)
    2.18  
    2.19 +  val message_delay = Time.seconds(0.01)  // prover messages
    2.20    val input_delay = Time.seconds(0.3)  // user input (e.g. text edits, cursor movement)
    2.21    val output_delay = Time.seconds(0.1)  // prover output (markup, common messages)
    2.22    val update_delay = Time.seconds(0.5)  // GUI layout updates
    2.23 @@ -52,7 +56,7 @@
    2.24    val assignments = new Event_Bus[Session.Assignment.type]
    2.25    val commands_changed = new Event_Bus[Session.Commands_Changed]
    2.26    val phase_changed = new Event_Bus[Session.Phase]
    2.27 -  val raw_messages = new Event_Bus[Isabelle_Process.Message]
    2.28 +  val raw_messages = new Event_Bus[Isabelle_Process.Message]  // potential bottle-neck
    2.29  
    2.30  
    2.31  
    2.32 @@ -141,16 +145,45 @@
    2.33      doc_edits: List[Document.Edit_Command],
    2.34      previous: Document.Version,
    2.35      version: Document.Version)
    2.36 +  private case class Messages(msgs: List[Isabelle_Process.Message])
    2.37  
    2.38    private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
    2.39    {
    2.40      val this_actor = self
    2.41 -    def receiver(msg: Isabelle_Process.Message) { this_actor ! msg }
    2.42 -    var prover: Option[Isabelle_Process with Isar_Document] = None
    2.43  
    2.44      var prune_next = System.currentTimeMillis() + prune_delay.ms
    2.45  
    2.46  
    2.47 +    /* buffered prover messages */
    2.48 +
    2.49 +    object receiver
    2.50 +    {
    2.51 +      private var buffer = new mutable.ListBuffer[Isabelle_Process.Message]
    2.52 +
    2.53 +      def flush(): Unit = synchronized {
    2.54 +        if (!buffer.isEmpty) {
    2.55 +          val msgs = buffer.toList
    2.56 +          this_actor ! Messages(msgs)
    2.57 +          buffer = new mutable.ListBuffer[Isabelle_Process.Message]
    2.58 +        }
    2.59 +      }
    2.60 +      def invoke(msg: Isabelle_Process.Message): Unit = synchronized {
    2.61 +        buffer += msg
    2.62 +        msg match {
    2.63 +          case result: Isabelle_Process.Result if result.is_raw => flush()
    2.64 +          case _ =>
    2.65 +        }
    2.66 +      }
    2.67 +
    2.68 +      private val timer = new Timer("session_actor.receiver", true)
    2.69 +      timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
    2.70 +
    2.71 +      def cancel() { timer.cancel() }
    2.72 +    }
    2.73 +
    2.74 +    var prover: Option[Isabelle_Process with Isar_Document] = None
    2.75 +
    2.76 +
    2.77      /* delayed command changes */
    2.78  
    2.79      object commands_changed_delay
    2.80 @@ -372,7 +405,8 @@
    2.81          case Start(timeout, args) if prover.isEmpty =>
    2.82            if (phase == Session.Inactive || phase == Session.Failed) {
    2.83              phase = Session.Startup
    2.84 -            prover = Some(new Isabelle_Process(timeout, receiver _, args:_*) with Isar_Document)
    2.85 +            prover =
    2.86 +              Some(new Isabelle_Process(timeout, receiver.invoke _, args:_*) with Isar_Document)
    2.87            }
    2.88  
    2.89          case Stop =>
    2.90 @@ -384,6 +418,7 @@
    2.91              phase = Session.Inactive
    2.92            }
    2.93            finished = true
    2.94 +          receiver.cancel()
    2.95            reply(())
    2.96  
    2.97          case Interrupt if prover.isDefined =>
    2.98 @@ -409,12 +444,15 @@
    2.99          case change: Change_Node if prover.isDefined =>
   2.100            handle_change(change)
   2.101  
   2.102 -        case input: Isabelle_Process.Input =>
   2.103 -          raw_messages.event(input)
   2.104 +        case Messages(msgs) =>
   2.105 +          msgs foreach {
   2.106 +            case input: Isabelle_Process.Input =>
   2.107 +              raw_messages.event(input)
   2.108  
   2.109 -        case result: Isabelle_Process.Result =>
   2.110 -          handle_result(result)
   2.111 -          raw_messages.event(result)
   2.112 +            case result: Isabelle_Process.Result =>
   2.113 +              handle_result(result)
   2.114 +              raw_messages.event(result)
   2.115 +          }
   2.116  
   2.117          case bad => System.err.println("session_actor: ignoring bad message " + bad)
   2.118        }