merged
authorwenzelm
Fri, 25 Apr 2014 14:39:11 +0200
changeset 56721 f2ffead641d4
parent 56684 d8f32f55e463 (current diff)
parent 56720 e1317a26f8c0 (diff)
child 56722 ba1ac087b3a7
merged
src/Pure/Concurrent/volatile.scala
src/Pure/System/event_bus.scala
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -0,0 +1,84 @@
+/*  Title:      Pure/Concurrent/consumer_thread.scala
+    Module:     PIDE
+    Author:     Makarius
+
+Consumer thread with unbounded queueing of requests, and optional
+acknowledgment.
+*/
+
+package isabelle
+
+
+import scala.annotation.tailrec
+
+
+object Consumer_Thread
+{
+  def fork[A](name: String = "", daemon: Boolean = false)(
+      consume: A => Boolean,
+      finish: () => Unit = () => ()): Consumer_Thread[A] =
+    new Consumer_Thread[A](name, daemon, consume, finish)
+
+
+  /* internal messages */
+
+  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
+  private type Request[A] = (A, Option[Ack])
+}
+
+final class Consumer_Thread[A] private(
+  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
+{
+  private var active = true
+  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+
+  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+  def is_active: Boolean = active && thread.isAlive
+
+  private def failure(exn: Throwable): Unit =
+    System.err.println(
+      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
+
+  private def robust_finish(): Unit =
+    try { finish() } catch { case exn: Throwable => failure(exn) }
+
+  @tailrec private def main_loop(): Unit =
+    mbox.receive match {
+      case Some((arg, ack)) =>
+        val result = Exn.capture { consume(arg) }
+        val continue =
+          result match {
+            case Exn.Res(cont) => cont
+            case Exn.Exn(exn) =>
+              if (!ack.isDefined) failure(exn)
+              true
+          }
+        ack.foreach(a => a.change(_ => Some(result)))
+        if (continue) main_loop() else robust_finish()
+      case None => robust_finish()
+    }
+
+  assert(is_active)
+
+
+  /* main methods */
+
+  private def request(x: A, ack: Option[Consumer_Thread.Ack])
+  {
+    synchronized {
+      if (is_active) mbox.send(Some((x, ack)))
+      else error("Consumer thread not active: " + quote(thread.getName))
+    }
+    ack.foreach(a =>
+      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
+  }
+
+  def send(arg: A) { request(arg, None) }
+  def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
+
+  def shutdown(): Unit =
+  {
+    synchronized { if (is_active) { active = false; mbox.send(None) } }
+    thread.join
+  }
+}
--- a/src/Pure/Concurrent/counter.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/Concurrent/counter.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -25,5 +25,7 @@
     count -= 1
     count
   }
+
+  override def toString: String = count.toString
 }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/mailbox.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -0,0 +1,37 @@
+/*  Title:      Pure/Concurrent/mailbox.scala
+    Module:     PIDE
+    Author:     Makarius
+
+Message exchange via mailbox, with non-blocking send (due to unbounded
+queueing) and potentially blocking receive.
+*/
+
+package isabelle
+
+
+import scala.collection.immutable.Queue
+
+
+object Mailbox
+{
+  def apply[A]: Mailbox[A] = new Mailbox[A]()
+}
+
+
+class Mailbox[A] private()
+{
+  private val mailbox = Synchronized(Queue.empty[A])
+  override def toString: String = mailbox.value.mkString("Mailbox(", ",", ")")
+
+  def send(msg: A): Unit =
+    mailbox.change(_.enqueue(msg))
+
+  def receive: A =
+    mailbox.guarded_access(_.dequeueOption)
+
+  def receive_timeout(timeout: Time): Option[A] =
+    mailbox.timed_access(_ => Some(Time.now() + timeout), _.dequeueOption)
+
+  def await_empty: Unit =
+    mailbox.guarded_access(queue => if (queue.isEmpty) Some(((), queue)) else None)
+}
--- a/src/Pure/Concurrent/simple_thread.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/Concurrent/simple_thread.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -10,8 +10,6 @@
 
 import java.lang.Thread
 
-import scala.actors.Actor
-
 
 object Simple_Thread
 {
@@ -42,15 +40,5 @@
     val thread = fork(name, daemon) { result.fulfill_result(Exn.capture(body)) }
     (thread, result)
   }
-
-
-  /* thread as actor */
-
-  def actor(name: String, daemon: Boolean = false)(body: => Unit): (Thread, Actor) =
-  {
-    val actor = Future.promise[Actor]
-    val thread = fork(name, daemon) { actor.fulfill(Actor.self); body }
-    (thread, actor.join)
-  }
 }
 
--- a/src/Pure/Concurrent/synchronized.ML	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/Concurrent/synchronized.ML	Fri Apr 25 14:39:11 2014 +0200
@@ -1,7 +1,7 @@
 (*  Title:      Pure/Concurrent/synchronized.ML
     Author:     Fabian Immler and Makarius
 
-State variables with synchronized access.
+Synchronized variables.
 *)
 
 signature SYNCHRONIZED =
@@ -18,7 +18,7 @@
 structure Synchronized: SYNCHRONIZED =
 struct
 
-(* state variables *)
+(* state variable *)
 
 abstype 'a var = Var of
  {name: string,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/synchronized.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -0,0 +1,79 @@
+/*  Title:      Pure/Concurrent/synchronized.scala
+    Module:     PIDE
+    Author:     Makarius
+
+Synchronized variables.
+*/
+
+package isabelle
+
+
+import scala.annotation.tailrec
+
+
+object Synchronized
+{
+  def apply[A](init: A): Synchronized[A] = new Synchronized(init)
+}
+
+
+final class Synchronized[A] private(init: A)
+{
+  /* state variable */
+
+  private var state: A = init
+
+  def value: A = synchronized { state }
+  override def toString: String = value.toString
+
+
+  /* synchronized access */
+
+  def timed_access[B](time_limit: A => Option[Time], f: A => Option[(B, A)]): Option[B] =
+    synchronized {
+      def check(x: A): Option[B] =
+        f(x) match {
+          case None => None
+          case Some((y, x1)) =>
+            state = x1
+            notifyAll()
+            Some(y)
+        }
+      @tailrec def try_change(): Option[B] =
+      {
+        val x = state
+        check(x) match {
+          case None =>
+            time_limit(x) match {
+              case Some(t) =>
+                val timeout = (t - Time.now()).ms
+                if (timeout > 0L) {
+                  wait(timeout)
+                  check(state)
+                }
+                else None
+              case None =>
+                wait()
+                try_change()
+            }
+          case some => some
+        }
+      }
+      try_change()
+    }
+
+  def guarded_access[B](f: A => Option[(B, A)]): B =
+    timed_access(_ => None, f).get
+
+
+  /* unconditional change */
+
+  def change(f: A => A): Unit = synchronized { state = f(state); notifyAll() }
+
+  def change_result[B](f: A => (B, A)): B = synchronized {
+    val (result, new_state) = f(state)
+    state = new_state
+    notifyAll()
+    result
+  }
+}
--- a/src/Pure/Concurrent/volatile.scala	Fri Apr 25 12:09:15 2014 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*  Title:      Pure/Concurrent/volatile.scala
-    Module:     PIDE
-    Author:     Makarius
-
-Volatile variables.
-*/
-
-package isabelle
-
-
-object Volatile
-{
-  def apply[A](init: A): Volatile[A] = new Volatile(init)
-}
-
-
-final class Volatile[A] private(init: A)
-{
-  @volatile private var state: A = init
-  def apply(): A = state
-  def >> (f: A => A) { state = f(state) }
-  def >>>[B] (f: A => (B, A)): B =
-  {
-    val (result, new_state) = f(state)
-    state = new_state
-    result
-  }
-}
-
--- a/src/Pure/General/time.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/General/time.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -16,6 +16,7 @@
   def seconds(s: Double): Time = new Time((s * 1000.0).round)
   def ms(m: Long): Time = new Time(m)
   val zero: Time = ms(0)
+  def now(): Time = ms(System.currentTimeMillis())
 
   def print_seconds(s: Double): String =
     String.format(Locale.ROOT, "%.3f", s.asInstanceOf[AnyRef])
@@ -23,12 +24,18 @@
 
 final class Time private(val ms: Long) extends AnyVal
 {
-  def + (t: Time): Time = new Time(ms + t.ms)
-
   def seconds: Double = ms / 1000.0
 
-  def min(t: Time): Time = if (ms < t.ms) this else t
-  def max(t: Time): Time = if (ms > t.ms) this else t
+  def + (t: Time): Time = new Time(ms + t.ms)
+  def - (t: Time): Time = new Time(ms - t.ms)
+
+  def < (t: Time): Boolean = ms < t.ms
+  def <= (t: Time): Boolean = ms <= t.ms
+  def > (t: Time): Boolean = ms > t.ms
+  def >= (t: Time): Boolean = ms >= t.ms
+
+  def min(t: Time): Time = if (this < t) this else t
+  def max(t: Time): Time = if (this > t) this else t
 
   def is_zero: Boolean = ms == 0
   def is_relevant: Boolean = ms >= 1
--- a/src/Pure/General/timing.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/General/timing.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -14,11 +14,11 @@
 
   def timeit[A](message: String, enabled: Boolean = true)(e: => A) =
     if (enabled) {
-      val start = System.currentTimeMillis()
+      val start = Time.now()
       val result = Exn.capture(e)
-      val stop = System.currentTimeMillis()
+      val stop = Time.now()
 
-      val timing = Time.ms(stop - start)
+      val timing = stop - start
       if (timing.is_relevant)
         System.err.println(
           (if (message == null || message.isEmpty) "" else message + ": ") +
--- a/src/Pure/PIDE/document.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/PIDE/document.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -608,12 +608,12 @@
     def tip_version: Version = history.tip.version.get_finished
 
     def continue_history(
-        previous: Future[Version],
-        edits: List[Edit_Text],
-        version: Future[Version]): (Change, State) =
+      previous: Future[Version],
+      edits: List[Edit_Text],
+      version: Future[Version]): State =
     {
       val change = Change.make(previous, edits, version)
-      (change, copy(history = history + change))
+      copy(history = history + change)
     }
 
     def prune_history(retain: Int = 0): (List[Version], State) =
--- a/src/Pure/PIDE/query_operation.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/PIDE/query_operation.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -8,9 +8,6 @@
 package isabelle
 
 
-import scala.actors.Actor._
-
-
 object Query_Operation
 {
   object Status extends Enumeration
@@ -33,12 +30,12 @@
 
   /* implicit state -- owned by Swing thread */
 
-  private var current_location: Option[Command] = None
-  private var current_query: List[String] = Nil
-  private var current_update_pending = false
-  private var current_output: List[XML.Tree] = Nil
-  private var current_status = Query_Operation.Status.FINISHED
-  private var current_exec_id = Document_ID.none
+  @volatile private var current_location: Option[Command] = None
+  @volatile private var current_query: List[String] = Nil
+  @volatile private var current_update_pending = false
+  @volatile private var current_output: List[XML.Tree] = Nil
+  @volatile private var current_status = Query_Operation.Status.FINISHED
+  @volatile private var current_exec_id = Document_ID.none
 
   private def reset_state()
   {
@@ -209,32 +206,27 @@
   }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case changed: Session.Commands_Changed =>
-          current_location match {
-            case Some(command)
-            if current_update_pending ||
-              (current_status != Query_Operation.Status.FINISHED &&
-                changed.commands.contains(command)) =>
-              Swing_Thread.later { content_update() }
-            case _ =>
-          }
-        case bad =>
-          System.err.println("Query_Operation: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Session.Commands_Changed](getClass.getName) {
+      case changed =>
+        current_location match {
+          case Some(command)
+          if current_update_pending ||
+            (current_status != Query_Operation.Status.FINISHED &&
+              changed.commands.contains(command)) =>
+            Swing_Thread.later { content_update() }
+          case _ =>
+        }
     }
-  }
 
   def activate() {
-    editor.session.commands_changed += main_actor
+    editor.session.commands_changed += main
   }
 
   def deactivate() {
-    editor.session.commands_changed -= main_actor
+    editor.session.commands_changed -= main
     remove_overlay()
     reset_state()
     consume_output(Document.Snapshot.init, Command.Results.empty, Nil)
--- a/src/Pure/PIDE/session.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/PIDE/session.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -12,12 +12,40 @@
 
 import scala.collection.mutable
 import scala.collection.immutable.Queue
-import scala.actors.TIMEOUT
-import scala.actors.Actor._
 
 
 object Session
 {
+  /* outlets */
+
+  object Consumer
+  {
+    def apply[A](name: String)(consume: A => Unit): Consumer[A] =
+      new Consumer[A](name, consume)
+  }
+  final class Consumer[-A] private(val name: String, val consume: A => Unit)
+
+  class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
+  {
+    private val consumers = Synchronized(List.empty[Consumer[A]])
+
+    def += (c: Consumer[A]) { consumers.change(Library.update(c)) }
+    def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) }
+
+    def post(a: A)
+    {
+      for (c <- consumers.value.iterator) {
+        dispatcher.send(() =>
+          try { c.consume(a) }
+          catch {
+            case exn: Throwable =>
+              System.err.println("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
+          })
+      }
+    }
+  }
+
+
   /* change */
 
   sealed case class Change(
@@ -136,91 +164,65 @@
   def reparse_limit: Int = 0
 
 
-  /* pervasive event buses */
+  /* outlets */
 
-  val statistics = new Event_Bus[Session.Statistics]
-  val global_options = new Event_Bus[Session.Global_Options]
-  val caret_focus = new Event_Bus[Session.Caret_Focus.type]
-  val raw_edits = new Event_Bus[Session.Raw_Edits]
-  val commands_changed = new Event_Bus[Session.Commands_Changed]
-  val phase_changed = new Event_Bus[Session.Phase]
-  val syslog_messages = new Event_Bus[Prover.Output]
-  val raw_output_messages = new Event_Bus[Prover.Output]
-  val all_messages = new Event_Bus[Prover.Message]  // potential bottle-neck
-  val trace_events = new Event_Bus[Simplifier_Trace.Event.type]
-
-
-  /** buffered command changes (delay_first discipline) **/
+  private val dispatcher =
+    Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }
 
-  //{{{
-  private case object Stop
+  val statistics = new Session.Outlet[Session.Statistics](dispatcher)
+  val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
+  val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
+  val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
+  val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
+  val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
+  val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
+  val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
+  val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck
+  val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
 
-  private val (_, commands_changed_buffer) =
-    Simple_Thread.actor("commands_changed_buffer", daemon = true)
-  {
-    var finished = false
-    while (!finished) {
-      receive {
-        case Stop => finished = true; reply(())
-        case changed: Session.Commands_Changed => commands_changed.event(changed)
-        case bad => System.err.println("commands_changed_buffer: ignoring bad message " + bad)
-      }
-    }
-  }
-  //}}}
 
 
   /** pipelined change parsing **/
 
-  //{{{
   private case class Text_Edits(
     previous: Future[Document.Version],
     doc_blobs: Document.Blobs,
     text_edits: List[Document.Edit_Text],
     version_result: Promise[Document.Version])
 
-  private val (_, change_parser) = Simple_Thread.actor("change_parser", daemon = true)
+  private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
   {
-    var finished = false
-    while (!finished) {
-      receive {
-        case Stop => finished = true; reply(())
-
-        case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
-          val prev = previous.get_finished
-          val change =
-            Timing.timeit("parse_change", timing) {
-              resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
-            }
-          version_result.fulfill(change.version)
-          sender ! change
-
-        case bad => System.err.println("change_parser: ignoring bad message " + bad)
-      }
-    }
+    case Text_Edits(previous, doc_blobs, text_edits, version_result) =>
+      val prev = previous.get_finished
+      val change =
+        Timing.timeit("parse_change", timing) {
+          resources.parse_change(reparse_limit, prev, doc_blobs, text_edits)
+        }
+      version_result.fulfill(change.version)
+      manager.send(change)
+      true
   }
-  //}}}
 
 
 
-  /** main protocol actor **/
+  /** main protocol manager **/
 
   /* global state */
 
-  private val syslog = Volatile(Queue.empty[XML.Elem])
-  def current_syslog(): String = cat_lines(syslog().iterator.map(XML.content))
+  private val syslog = Synchronized(Queue.empty[XML.Elem])
+  def current_syslog(): String = cat_lines(syslog.value.iterator.map(XML.content))
 
   @volatile private var _phase: Session.Phase = Session.Inactive
   private def phase_=(new_phase: Session.Phase)
   {
     _phase = new_phase
-    phase_changed.event(new_phase)
+    phase_changed.post(new_phase)
   }
   def phase = _phase
   def is_ready: Boolean = phase == Session.Ready
 
-  private val global_state = Volatile(Document.State.init)
-  def current_state(): Document.State = global_state()
+  private val global_state = Synchronized(Document.State.init)
+  def current_state(): Document.State = global_state.value
 
   def recent_syntax(): Prover.Syntax =
   {
@@ -230,7 +232,7 @@
 
   def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
       pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
-    global_state().snapshot(name, pending_edits)
+    global_state.value.snapshot(name, pending_edits)
 
 
   /* protocol handlers */
@@ -253,116 +255,130 @@
   }
 
 
-  /* actor messages */
+  /* internal messages */
 
   private case class Start(name: String, args: List[String])
+  private case object Stop
   private case class Cancel_Exec(exec_id: Document_ID.Exec)
   private case class Protocol_Command(name: String, args: List[String])
   private case class Messages(msgs: List[Prover.Message])
   private case class Update_Options(options: Options)
 
-  private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
+
+  /* buffered changes */
+
+  private object change_buffer
   {
-    val this_actor = self
+    private var assignment: Boolean = false
+    private var nodes: Set[Document.Node.Name] = Set.empty
+    private var commands: Set[Command] = Set.empty
+
+    def flush(): Unit = synchronized {
+      if (assignment || !nodes.isEmpty || !commands.isEmpty)
+        commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
+      assignment = false
+      nodes = Set.empty
+      commands = Set.empty
+    }
 
-    var prune_next = System.currentTimeMillis() + prune_delay.ms
+    def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized {
+      assignment |= assign
+      for (command <- cmds) {
+        nodes += command.node_name
+        commands += command
+      }
+    }
+
+    private val timer = new Timer("change_buffer", true)
+    timer.schedule(new TimerTask { def run = flush() }, output_delay.ms, output_delay.ms)
+
+    def shutdown()
+    {
+      timer.cancel()
+      flush()
+    }
+  }
 
 
-    /* buffered prover messages */
+  /* buffered prover messages */
+
+  private object receiver
+  {
+    private var buffer = new mutable.ListBuffer[Prover.Message]
 
-    object receiver
-    {
-      private var buffer = new mutable.ListBuffer[Prover.Message]
+    private def flush(): Unit = synchronized {
+      if (!buffer.isEmpty) {
+        val msgs = buffer.toList
+        manager.send(Messages(msgs))
+        buffer = new mutable.ListBuffer[Prover.Message]
+      }
+    }
 
-      private def flush(): Unit = synchronized {
-        if (!buffer.isEmpty) {
-          val msgs = buffer.toList
-          this_actor ! Messages(msgs)
-          buffer = new mutable.ListBuffer[Prover.Message]
-        }
+    def invoke(msg: Prover.Message): Unit = synchronized {
+      msg match {
+        case _: Prover.Input =>
+          buffer += msg
+        case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
+          flush()
+        case output: Prover.Output =>
+          buffer += msg
+          if (output.is_syslog)
+            syslog.change(queue =>
+              {
+                val queue1 = queue.enqueue(output.message)
+                if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
+              })
       }
-      def invoke(msg: Prover.Message): Unit = synchronized {
-        msg match {
-          case _: Prover.Input =>
-            buffer += msg
-          case output: Prover.Protocol_Output if output.properties == Markup.Flush =>
-            flush()
-          case output: Prover.Output =>
-            buffer += msg
-            if (output.is_syslog)
-              syslog >> (queue =>
-                {
-                  val queue1 = queue.enqueue(output.message)
-                  if (queue1.length > syslog_limit) queue1.dequeue._2 else queue1
-                })
-        }
-      }
+    }
+
+    private val timer = new Timer("receiver", true)
+    timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+
+    def shutdown() { timer.cancel(); flush() }
+  }
+
+
+  /* postponed changes */
+
+  private object postponed_changes
+  {
+    private var postponed: List[Session.Change] = Nil
 
-      private val timer = new Timer("session_actor.receiver", true)
-      timer.schedule(new TimerTask { def run = flush }, message_delay.ms, message_delay.ms)
+    def store(change: Session.Change): Unit = synchronized { postponed ::= change }
 
-      def cancel() { timer.cancel() }
+    def flush(): Unit = synchronized {
+      val state = global_state.value
+      val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
+      postponed = unassigned
+      assigned.reverseIterator.foreach(change => manager.send(change))
     }
+  }
+
+
+  /* manager thread */
+
+  private val manager: Consumer_Thread[Any] =
+  {
+    var prune_next = Time.now() + prune_delay
 
     var prover: Option[Prover] = None
 
 
-    /* delayed command changes */
-
-    object delay_commands_changed
-    {
-      private var changed_assignment: Boolean = false
-      private var changed_nodes: Set[Document.Node.Name] = Set.empty
-      private var changed_commands: Set[Command] = Set.empty
-
-      private var flush_time: Option[Long] = None
-
-      def flush_timeout: Long =
-        flush_time match {
-          case None => 5000L
-          case Some(time) => (time - System.currentTimeMillis()) max 0
-        }
-
-      def flush()
-      {
-        if (changed_assignment || !changed_nodes.isEmpty || !changed_commands.isEmpty)
-          commands_changed_buffer !
-            Session.Commands_Changed(changed_assignment, changed_nodes, changed_commands)
-        changed_assignment = false
-        changed_nodes = Set.empty
-        changed_commands = Set.empty
-        flush_time = None
-      }
-
-      def invoke(assign: Boolean, commands: List[Command])
-      {
-        changed_assignment |= assign
-        for (command <- commands) {
-          changed_nodes += command.node_name
-          changed_commands += command
-        }
-        val now = System.currentTimeMillis()
-        flush_time match {
-          case None => flush_time = Some(now + output_delay.ms)
-          case Some(time) => if (now >= time) flush()
-        }
-      }
-    }
-
-
     /* raw edits */
 
     def handle_raw_edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
     //{{{
     {
+      require(prover.isDefined)
+
       prover.get.discontinue_execution()
 
-      val previous = global_state().history.tip.version
+      val previous = global_state.value.history.tip.version
       val version = Future.promise[Document.Version]
-      val change = global_state >>> (_.continue_history(previous, edits, version))
+      global_state.change(_.continue_history(previous, edits, version))
 
-      raw_edits.event(Session.Raw_Edits(doc_blobs, edits))
-      change_parser ! Text_Edits(previous, doc_blobs, edits, version)
+      raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
+      change_parser.send(Text_Edits(previous, doc_blobs, edits, version))
     }
     //}}}
 
@@ -372,23 +388,25 @@
     def handle_change(change: Session.Change)
     //{{{
     {
+      require(prover.isDefined)
+
       def id_command(command: Command)
       {
         for {
           digest <- command.blobs_digests
-          if !global_state().defined_blob(digest)
+          if !global_state.value.defined_blob(digest)
         } {
           change.doc_blobs.get(digest) match {
             case Some(blob) =>
-              global_state >> (_.define_blob(digest))
+              global_state.change(_.define_blob(digest))
               prover.get.define_blob(digest, blob.bytes)
             case None =>
               System.err.println("Missing blob for SHA1 digest " + digest)
           }
         }
 
-        if (!global_state().defined_command(command.id)) {
-          global_state >> (_.define_command(command))
+        if (!global_state.value.defined_command(command.id)) {
+          global_state.change(_.define_command(command))
           prover.get.define_command(command)
         }
       }
@@ -397,8 +415,8 @@
           edit foreach { case (c1, c2) => c1 foreach id_command; c2 foreach id_command }
       }
 
-      val assignment = global_state().the_assignment(change.previous).check_finished
-      global_state >> (_.define_version(change.version, assignment))
+      val assignment = global_state.value.the_assignment(change.previous).check_finished
+      global_state.change(_.define_version(change.version, assignment))
       prover.get.update(change.previous.id, change.version.id, change.doc_edits)
       resources.commit(change)
     }
@@ -413,14 +431,14 @@
       def bad_output()
       {
         if (verbose)
-          System.err.println("Ignoring prover output: " + output.message.toString)
+          System.err.println("Ignoring bad prover output: " + output.message.toString)
       }
 
       def accumulate(state_id: Document_ID.Generic, message: XML.Elem)
       {
         try {
-          val st = global_state >>> (_.accumulate(state_id, message))
-          delay_commands_changed.invoke(false, List(st.command))
+          val st = global_state.change_result(_.accumulate(state_id, message))
+          change_buffer.invoke(false, List(st.command))
         }
         catch {
           case _: Document.State.Fail => bad_output()
@@ -432,10 +450,10 @@
           val handled = _protocol_handlers.invoke(msg)
           if (!handled) {
             msg.properties match {
-              case Markup.Protocol_Handler(name) =>
+              case Markup.Protocol_Handler(name) if prover.isDefined =>
                 _protocol_handlers = _protocol_handlers.add(prover.get, name)
 
-              case Protocol.Command_Timing(state_id, timing) =>
+              case Protocol.Command_Timing(state_id, timing) if prover.isDefined =>
                 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
                 accumulate(state_id, prover.get.xml_cache.elem(message))
 
@@ -443,31 +461,32 @@
                 msg.text match {
                   case Protocol.Assign_Update(id, update) =>
                     try {
-                      val cmds = global_state >>> (_.assign(id, update))
-                      delay_commands_changed.invoke(true, cmds)
+                      val cmds = global_state.change_result(_.assign(id, update))
+                      change_buffer.invoke(true, cmds)
                     }
                     catch { case _: Document.State.Fail => bad_output() }
+                    postponed_changes.flush()
                   case _ => bad_output()
                 }
                 // FIXME separate timeout event/message!?
-                if (prover.isDefined && System.currentTimeMillis() > prune_next) {
-                  val old_versions = global_state >>> (_.prune_history(prune_size))
+                if (prover.isDefined && Time.now() > prune_next) {
+                  val old_versions = global_state.change_result(_.prune_history(prune_size))
                   if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
-                  prune_next = System.currentTimeMillis() + prune_delay.ms
+                  prune_next = Time.now() + prune_delay
                 }
 
               case Markup.Removed_Versions =>
                 msg.text match {
                   case Protocol.Removed(removed) =>
                     try {
-                      global_state >> (_.removed_versions(removed))
+                      global_state.change(_.removed_versions(removed))
                     }
                     catch { case _: Document.State.Fail => bad_output() }
                   case _ => bad_output()
                 }
 
               case Markup.ML_Statistics(props) =>
-                statistics.event(Session.Statistics(props))
+                statistics.post(Session.Statistics(props))
 
               case Markup.Task_Statistics(props) =>
                 // FIXME
@@ -484,114 +503,108 @@
               phase = Session.Ready
 
             case Markup.Return_Code(rc) if output.is_exit =>
+              prover = None
               if (rc == 0) phase = Session.Inactive
               else phase = Session.Failed
 
-            case _ => raw_output_messages.event(output)
+            case _ => raw_output_messages.post(output)
           }
         }
     }
     //}}}
 
 
-    /* main loop */
+    /* main thread */
 
-    //{{{
-    var finished = false
-    while (!finished) {
-      receiveWithin(delay_commands_changed.flush_timeout) {
-        case TIMEOUT => delay_commands_changed.flush()
-
-        case Start(name, args) if prover.isEmpty =>
-          if (phase == Session.Inactive || phase == Session.Failed) {
-            phase = Session.Startup
-            prover = Some(resources.start_prover(receiver.invoke _, name, args))
-          }
+    Consumer_Thread.fork[Any]("Session.manager", daemon = true)
+    {
+      case arg: Any =>
+        //{{{
+        arg match {
+          case Start(name, args) if prover.isEmpty =>
+            if (phase == Session.Inactive || phase == Session.Failed) {
+              phase = Session.Startup
+              prover = Some(resources.start_prover(receiver.invoke _, name, args))
+            }
 
-        case Stop =>
-          if (phase == Session.Ready) {
-            _protocol_handlers = _protocol_handlers.stop(prover.get)
-            global_state >> (_ => Document.State.init)  // FIXME event bus!?
-            phase = Session.Shutdown
-            prover.get.terminate
-            prover = None
-            phase = Session.Inactive
-          }
-          finished = true
-          receiver.cancel()
-          reply(())
+          case Stop =>
+            if (prover.isDefined && is_ready) {
+              _protocol_handlers = _protocol_handlers.stop(prover.get)
+              global_state.change(_ => Document.State.init)  // FIXME event bus!?
+              phase = Session.Shutdown
+              prover.get.terminate
+            }
 
-        case Update_Options(options) =>
-          if (prover.isDefined && is_ready) {
-            prover.get.options(options)
-            handle_raw_edits(Document.Blobs.empty, Nil)
-          }
-          global_options.event(Session.Global_Options(options))
-          reply(())
+          case Update_Options(options) =>
+            if (prover.isDefined && is_ready) {
+              prover.get.options(options)
+              handle_raw_edits(Document.Blobs.empty, Nil)
+            }
+            global_options.post(Session.Global_Options(options))
+
+          case Cancel_Exec(exec_id) if prover.isDefined =>
+            prover.get.cancel_exec(exec_id)
 
-        case Cancel_Exec(exec_id) if prover.isDefined =>
-          prover.get.cancel_exec(exec_id)
+          case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
+            handle_raw_edits(doc_blobs, edits)
 
-        case Session.Raw_Edits(doc_blobs, edits) if prover.isDefined =>
-          handle_raw_edits(doc_blobs, edits)
-          reply(())
+          case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
+            prover.get.dialog_result(serial, result)
+            handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
 
-        case Session.Dialog_Result(id, serial, result) if prover.isDefined =>
-          prover.get.dialog_result(serial, result)
-          handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))
+          case Protocol_Command(name, args) if prover.isDefined =>
+            prover.get.protocol_command(name, args:_*)
 
-        case Protocol_Command(name, args) if prover.isDefined =>
-          prover.get.protocol_command(name, args:_*)
+          case Messages(msgs) =>
+            msgs foreach {
+              case input: Prover.Input =>
+                all_messages.post(input)
 
-        case Messages(msgs) =>
-          msgs foreach {
-            case input: Prover.Input =>
-              all_messages.event(input)
+              case output: Prover.Output =>
+                if (output.is_stdout || output.is_stderr) raw_output_messages.post(output)
+                else handle_output(output)
+                if (output.is_syslog) syslog_messages.post(output)
+                all_messages.post(output)
+            }
 
-            case output: Prover.Output =>
-              if (output.is_stdout || output.is_stderr) raw_output_messages.event(output)
-              else handle_output(output)
-              if (output.is_syslog) syslog_messages.event(output)
-              all_messages.event(output)
-          }
-
-        case change: Session.Change
-        if prover.isDefined && global_state().is_assigned(change.previous) =>
-          handle_change(change)
-
-        case bad if !bad.isInstanceOf[Session.Change] =>
-          System.err.println("session_actor: ignoring bad message " + bad)
-      }
+          case change: Session.Change if prover.isDefined =>
+            if (global_state.value.is_assigned(change.previous))
+              handle_change(change)
+            else postponed_changes.store(change)
+        }
+        true
+        //}}}
     }
-    //}}}
   }
 
 
   /* actions */
 
   def start(name: String, args: List[String])
-  {
-    session_actor ! Start(name, args)
-  }
+  { manager.send(Start(name, args)) }
 
   def stop()
   {
-    commands_changed_buffer !? Stop
-    change_parser !? Stop
-    session_actor !? Stop
+    manager.send_wait(Stop)
+    receiver.shutdown()
+    change_parser.shutdown()
+    change_buffer.shutdown()
+    manager.shutdown()
+    dispatcher.shutdown()
   }
 
   def protocol_command(name: String, args: String*)
-  { session_actor ! Protocol_Command(name, args.toList) }
+  { manager.send(Protocol_Command(name, args.toList)) }
 
-  def cancel_exec(exec_id: Document_ID.Exec) { session_actor ! Cancel_Exec(exec_id) }
+  def cancel_exec(exec_id: Document_ID.Exec)
+  { manager.send(Cancel_Exec(exec_id)) }
 
   def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
-  { if (!edits.isEmpty) session_actor !? Session.Raw_Edits(doc_blobs, edits) }
+  { if (!edits.isEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) }
 
   def update_options(options: Options)
-  { session_actor !? Update_Options(options) }
+  { manager.send_wait(Update_Options(options)) }
 
   def dialog_result(id: Document_ID.Generic, serial: Long, result: String)
-  { session_actor ! Session.Dialog_Result(id, serial, result) }
+  { manager.send(Session.Dialog_Result(id, serial, result)) }
 }
--- a/src/Pure/System/event_bus.scala	Fri Apr 25 12:09:15 2014 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,37 +0,0 @@
-/*  Title:      Pure/System/event_bus.scala
-    Module:     PIDE
-    Author:     Makarius
-
-Generic event bus with multiple receiving actors.
-*/
-
-package isabelle
-
-
-import scala.actors.Actor, Actor._
-import scala.collection.mutable.ListBuffer
-
-
-class Event_Bus[Event]
-{
-  /* receivers */
-
-  private val receivers = new ListBuffer[Actor]
-
-  def += (r: Actor) { synchronized { receivers += r } }
-  def + (r: Actor): Event_Bus[Event] = { this += r; this }
-
-  def += (f: Event => Unit) {
-    this += actor { loop { react { case x => f(x.asInstanceOf[Event]) } } }
-  }
-
-  def + (f: Event => Unit): Event_Bus[Event] = { this += f; this }
-
-  def -= (r: Actor) { synchronized { receivers -= r } }
-  def - (r: Actor) = { this -= r; this }
-
-
-  /* event invocation */
-
-  def event(x: Event) { synchronized { receivers.foreach(_ ! x) } }
-}
--- a/src/Pure/System/isabelle_process.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/System/isabelle_process.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -8,12 +8,7 @@
 package isabelle
 
 
-import java.util.concurrent.LinkedBlockingQueue
-import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
-  InputStream, OutputStream, BufferedOutputStream, IOException}
-
-import scala.actors.Actor
-import Actor._
+import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException}
 
 
 class Isabelle_Process(
@@ -55,22 +50,6 @@
   }
 
 
-  /* command input actor */
-
-  private case class Input_Chunks(chunks: List[Bytes])
-
-  private case object Close
-  private def close(p: (Thread, Actor))
-  {
-    if (p != null && p._1.isAlive) {
-      p._2 ! Close
-      p._1.join
-    }
-  }
-
-  @volatile private var command_input: (Thread, Actor) = null
-
-
 
   /** process manager **/
 
@@ -126,16 +105,15 @@
     else {
       val (command_stream, message_stream) = system_channel.rendezvous()
 
-      val stdout = physical_output_actor(false)
-      val stderr = physical_output_actor(true)
-      command_input = input_actor(command_stream)
-      val message = message_actor(message_stream)
+      command_input_init(command_stream)
+      val stdout = physical_output(false)
+      val stderr = physical_output(true)
+      val message = message_output(message_stream)
 
       val rc = process_result.join
       system_output("process terminated")
-      close(command_input)
-      for ((thread, _) <- List(stdout, stderr, command_input, message))
-        thread.join
+      command_input_close()
+      for (thread <- List(stdout, stderr, message)) thread.join
       system_output("process_manager terminated")
       exit_message(rc)
     }
@@ -155,24 +133,54 @@
 
   def terminate()
   {
-    close(command_input)
+    command_input_close()
     system_output("Terminating Isabelle process")
     terminate_process()
   }
 
 
 
-  /** stream actors **/
+  /** process streams **/
+
+  /* command input */
+
+  private var command_input: Option[Consumer_Thread[List[Bytes]]] = None
+
+  private def command_input_close(): Unit = command_input.foreach(_.shutdown)
+
+  private def command_input_init(raw_stream: OutputStream)
+  {
+    val name = "command_input"
+    val stream = new BufferedOutputStream(raw_stream)
+    command_input =
+      Some(
+        Consumer_Thread.fork(name)(
+          consume =
+            {
+              case chunks =>
+                try {
+                  Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
+                  chunks.foreach(_.write(stream))
+                  stream.flush
+                  true
+                }
+                catch { case e: IOException => system_output(name + ": " + e.getMessage); false }
+            },
+          finish = { case () => stream.close; system_output(name + " terminated") }
+        )
+      )
+  }
+
 
   /* physical output */
 
-  private def physical_output_actor(err: Boolean): (Thread, Actor) =
+  private def physical_output(err: Boolean): Thread =
   {
     val (name, reader, markup) =
       if (err) ("standard_error", process.stderr, Markup.STDERR)
       else ("standard_output", process.stdout, Markup.STDOUT)
 
-    Simple_Thread.actor(name) {
+    Simple_Thread.fork(name) {
       try {
         var result = new StringBuilder(100)
         var finished = false
@@ -202,45 +210,15 @@
   }
 
 
-  /* command input */
-
-  private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
-  {
-    val name = "command_input"
-    Simple_Thread.actor(name) {
-      try {
-        val stream = new BufferedOutputStream(raw_stream)
-        var finished = false
-        while (!finished) {
-          //{{{
-          receive {
-            case Input_Chunks(chunks) =>
-              Bytes(chunks.map(_.length).mkString("", ",", "\n")).write(stream)
-              chunks.foreach(_.write(stream))
-              stream.flush
-            case Close =>
-              stream.close
-              finished = true
-            case bad => System.err.println(name + ": ignoring bad message " + bad)
-          }
-          //}}}
-        }
-      }
-      catch { case e: IOException => system_output(name + ": " + e.getMessage) }
-      system_output(name + " terminated")
-    }
-  }
-
-
   /* message output */
 
-  private def message_actor(stream: InputStream): (Thread, Actor) =
+  private def message_output(stream: InputStream): Thread =
   {
     class EOF extends Exception
     class Protocol_Error(msg: String) extends Exception(msg)
 
     val name = "message_output"
-    Simple_Thread.actor(name) {
+    Simple_Thread.fork(name) {
       val default_buffer = new Array[Byte](65536)
       var c = -1
 
@@ -328,7 +306,10 @@
   /** protocol commands **/
 
   def protocol_command_bytes(name: String, args: Bytes*): Unit =
-    command_input._2 ! Input_Chunks(Bytes(name) :: args.toList)
+    command_input match {
+      case Some(thread) => thread.send(Bytes(name) :: args.toList)
+      case None => error("Uninitialized command input thread")
+    }
 
   def protocol_command(name: String, args: String*)
   {
--- a/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/Tools/simplifier_trace.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -7,7 +7,6 @@
 package isabelle
 
 
-import scala.actors.Actor._
 import scala.annotation.tailrec
 import scala.collection.immutable.SortedMap
 
@@ -102,23 +101,21 @@
   case object Event
 
 
-  /* manager actor */
+  /* manager thread */
 
-  private case class Handle_Results(session: Session, id: Document_ID.Command, results: Command.Results)
-  private case class Generate_Trace(results: Command.Results)
+  private case class Handle_Results(
+    session: Session, id: Document_ID.Command, results: Command.Results, slot: Promise[Context])
+  private case class Generate_Trace(results: Command.Results, slot: Promise[Trace])
   private case class Cancel(serial: Long)
   private object Clear_Memory
-  private object Stop
   case class Reply(session: Session, serial: Long, answer: Answer)
 
   case class Question(data: Item.Data, answers: List[Answer], default_answer: Answer)
 
   case class Context(
     last_serial: Long = 0L,
-    questions: SortedMap[Long, Question] = SortedMap.empty
-  )
+    questions: SortedMap[Long, Question] = SortedMap.empty)
   {
-
     def +(q: Question): Context =
       copy(questions = questions + ((q.data.serial, q)))
 
@@ -127,7 +124,6 @@
 
     def with_serial(s: Long): Context =
       copy(last_serial = Math.max(last_serial, s))
-
   }
 
   case class Trace(entries: List[Item.Data])
@@ -141,18 +137,27 @@
   }
 
   def handle_results(session: Session, id: Document_ID.Command, results: Command.Results): Context =
-    (manager !? Handle_Results(session, id, results)).asInstanceOf[Context]
+  {
+    val slot = Future.promise[Context]
+    manager.send(Handle_Results(session, id, results, slot))
+    slot.join
+  }
 
   def generate_trace(results: Command.Results): Trace =
-    (manager !? Generate_Trace(results)).asInstanceOf[Trace]
+  {
+    val slot = Future.promise[Trace]
+    manager.send(Generate_Trace(results, slot))
+    slot.join
+  }
 
   def clear_memory() =
-    manager ! Clear_Memory
+    manager.send(Clear_Memory)
 
   def send_reply(session: Session, serial: Long, answer: Answer) =
-    manager ! Reply(session, serial, answer)
+    manager.send(Reply(session, serial, answer))
 
-  private val manager = actor {
+  private lazy val manager: Consumer_Thread[Any] =
+  {
     var contexts = Map.empty[Document_ID.Command, Context]
 
     var memory_children = Map.empty[Long, Set[Long]]
@@ -175,124 +180,125 @@
 
     def do_reply(session: Session, serial: Long, answer: Answer)
     {
-      session.protocol_command("Simplifier_Trace.reply", Properties.Value.Long(serial), answer.name)
+      session.protocol_command(
+        "Simplifier_Trace.reply", Properties.Value.Long(serial), answer.name)
     }
 
-    loop {
-      react {
-        case Handle_Results(session, id, results) =>
-          var new_context = contexts.getOrElse(id, Context())
-          var new_serial = new_context.last_serial
+    Consumer_Thread.fork[Any]("Simplifier_Trace.manager", daemon = true)(
+      consume = (arg: Any) =>
+      {
+        arg match {
+          case Handle_Results(session, id, results, slot) =>
+            var new_context = contexts.getOrElse(id, Context())
+            var new_serial = new_context.last_serial
 
-          for ((serial, result) <- results.iterator if serial > new_context.last_serial)
-          {
-            result match {
-              case Item(markup, data) =>
-                memory_children += (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
-
-                markup match {
+            for ((serial, result) <- results.iterator if serial > new_context.last_serial)
+            {
+              result match {
+                case Item(markup, data) =>
+                  memory_children +=
+                    (data.parent -> (memory_children.getOrElse(data.parent, Set.empty) + serial))
 
-                  case Markup.SIMP_TRACE_STEP =>
-                    val index = Index.of_data(data)
-                    memory.get(index) match {
-                      case Some(answer) if data.memory =>
-                        do_reply(session, serial, answer)
-                      case _ =>
-                        new_context += Question(data, Answer.step.all, Answer.step.default)
-                    }
+                  markup match {
 
-                  case Markup.SIMP_TRACE_HINT =>
-                    data.props match {
-                      case Success(false) =>
-                        results.get(data.parent) match {
-                          case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
-                            new_context += Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
-                          case _ =>
-                            // unknown, better send a default reply
-                            do_reply(session, data.serial, Answer.hint_fail.default)
-                        }
-                      case _ =>
-                    }
+                    case Markup.SIMP_TRACE_STEP =>
+                      val index = Index.of_data(data)
+                      memory.get(index) match {
+                        case Some(answer) if data.memory =>
+                          do_reply(session, serial, answer)
+                        case _ =>
+                          new_context += Question(data, Answer.step.all, Answer.step.default)
+                      }
 
-                  case Markup.SIMP_TRACE_IGNORE =>
-                    // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
-                    // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
-                    // to selectively purge the replies which have been memorized, going down from
-                    // the parent to all leaves.
-
-                    @tailrec
-                    def purge(queue: Vector[Long]): Unit =
-                      queue match {
-                        case s +: rest =>
-                          for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
-                            memory -= Index.of_data(data)
-                          val children = memory_children.getOrElse(s, Set.empty)
-                          memory_children -= s
-                          purge(rest ++ children.toVector)
+                    case Markup.SIMP_TRACE_HINT =>
+                      data.props match {
+                        case Success(false) =>
+                          results.get(data.parent) match {
+                            case Some(Item(Markup.SIMP_TRACE_STEP, _)) =>
+                              new_context +=
+                                Question(data, Answer.hint_fail.all, Answer.hint_fail.default)
+                            case _ =>
+                              // unknown, better send a default reply
+                              do_reply(session, data.serial, Answer.hint_fail.default)
+                          }
                         case _ =>
                       }
 
-                    purge(Vector(data.parent))
+                    case Markup.SIMP_TRACE_IGNORE =>
+                      // At this point, we know that the parent of this 'IGNORE' entry is a 'STEP'
+                      // entry, and that that 'STEP' entry is about to be replayed. Hence, we need
+                      // to selectively purge the replies which have been memorized, going down from
+                      // the parent to all leaves.
 
-                  case _ =>
-                }
+                      @tailrec
+                      def purge(queue: Vector[Long]): Unit =
+                        queue match {
+                          case s +: rest =>
+                            for (Item(Markup.SIMP_TRACE_STEP, data) <- results.get(s))
+                              memory -= Index.of_data(data)
+                            val children = memory_children.getOrElse(s, Set.empty)
+                            memory_children -= s
+                            purge(rest ++ children.toVector)
+                          case _ =>
+                        }
 
-              case _ =>
+                      purge(Vector(data.parent))
+
+                    case _ =>
+                  }
+
+                case _ =>
+              }
+
+              new_serial = serial
             }
 
-            new_serial = serial
-          }
+            new_context = new_context.with_serial(new_serial)
+            contexts += (id -> new_context)
+            slot.fulfill(new_context)
 
-          new_context = new_context.with_serial(new_serial)
-          contexts += (id -> new_context)
-          reply(new_context)
-
-        case Generate_Trace(results) =>
-          // Since there are potentially lots of trace messages, we do not cache them here again.
-          // Instead, everytime the trace is being requested, we re-assemble it based on the
-          // current results.
+          case Generate_Trace(results, slot) =>
+            // Since there are potentially lots of trace messages, we do not cache them here again.
+            // Instead, everytime the trace is being requested, we re-assemble it based on the
+            // current results.
 
-          val items =
-            (for { (_, Item(_, data)) <- results.iterator }
-              yield data).toList
+            val items =
+              (for { (_, Item(_, data)) <- results.iterator }
+                yield data).toList
 
-          reply(Trace(items))
+            slot.fulfill(Trace(items))
 
-        case Cancel(serial) =>
-          find_question(serial) match {
-            case Some((id, _)) =>
-              do_cancel(serial, id)
-            case None =>
-          }
+          case Cancel(serial) =>
+            find_question(serial) match {
+              case Some((id, _)) =>
+                do_cancel(serial, id)
+              case None =>
+            }
 
-        case Clear_Memory =>
-          memory_children = Map.empty
-          memory = Map.empty
-
-        case Stop =>
-          contexts = Map.empty
-          exit("Simplifier_Trace: manager actor stopped")
+          case Clear_Memory =>
+            memory_children = Map.empty
+            memory = Map.empty
 
-        case Reply(session, serial, answer) =>
-          find_question(serial) match {
-            case Some((id, Question(data, _, _))) =>
-              if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
-              {
-                val index = Index.of_data(data)
-                memory += (index -> answer)
-              }
-              do_cancel(serial, id)
-            case None =>
-              System.err.println("send_reply: unknown serial " + serial)
-          }
+          case Reply(session, serial, answer) =>
+            find_question(serial) match {
+              case Some((id, Question(data, _, _))) =>
+                if (data.markup == Markup.SIMP_TRACE_STEP && data.memory)
+                {
+                  val index = Index.of_data(data)
+                  memory += (index -> answer)
+                }
+                do_cancel(serial, id)
+              case None =>
+                System.err.println("send_reply: unknown serial " + serial)
+            }
 
-          do_reply(session, serial, answer)
-          session.trace_events.event(Event)
-
-        case bad =>
-          System.err.println("context_manager: bad message " + bad)
-      }
-    }
+            do_reply(session, serial, answer)
+            session.trace_events.post(Event)
+        }
+        true
+      },
+      finish = () => contexts = Map.empty
+    )
   }
 
 
@@ -300,10 +306,12 @@
 
   class Handler extends Session.Protocol_Handler
   {
+    assert(manager.is_active)
+
     private def cancel(prover: Prover, msg: Prover.Protocol_Output): Boolean =
       msg.properties match {
         case Markup.Simp_Trace_Cancel(serial) =>
-          manager ! Cancel(serial)
+          manager.send(Cancel(serial))
           true
         case _ =>
           false
@@ -311,8 +319,8 @@
 
     override def stop(prover: Prover) =
     {
-      manager ! Clear_Memory
-      manager ! Stop
+      manager.send(Clear_Memory)
+      manager.shutdown()
     }
 
     val functions = Map(Markup.SIMP_TRACE_CANCEL -> cancel _)
--- a/src/Pure/build-jars	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/build-jars	Fri Apr 25 14:39:11 2014 +0200
@@ -9,10 +9,12 @@
 ## sources
 
 declare -a SOURCES=(
+  Concurrent/consumer_thread.scala
   Concurrent/counter.scala
   Concurrent/future.scala
+  Concurrent/mailbox.scala
   Concurrent/simple_thread.scala
-  Concurrent/volatile.scala
+  Concurrent/synchronized.scala
   General/antiquote.scala
   General/bytes.scala
   General/completion.scala
@@ -62,7 +64,6 @@
   PIDE/xml.scala
   PIDE/yxml.scala
   System/command_line.scala
-  System/event_bus.scala
   System/interrupt.scala
   System/invoke_scala.scala
   System/isabelle_charset.scala
--- a/src/Pure/library.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Pure/library.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -153,6 +153,14 @@
   }
 
 
+  /* canonical list operations */
+
+  def member[A, B](xs: List[A])(x: B): Boolean = xs.exists(_ == x)
+  def insert[A](x: A)(xs: List[A]): List[A] = if (xs.contains(x)) xs else x :: xs
+  def remove[A, B](x: B)(xs: List[A]): List[A] = if (member(xs)(x)) xs.filterNot(_ == x) else xs
+  def update[A](x: A)(xs: List[A]): List[A] = x :: remove(x)(xs)
+
+
   /* Java futures */
 
   def future_value[A](x: A) = new JFuture[A]
--- a/src/Tools/jEdit/etc/settings	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/etc/settings	Fri Apr 25 14:39:11 2014 +0200
@@ -4,9 +4,9 @@
 JEDIT_SETTINGS="$ISABELLE_HOME_USER/jedit"
 
 JEDIT_OPTIONS="-reuseview -noserver -nobackground -log=9"
-#JEDIT_JAVA_OPTIONS="-Xms128m -Xmx512m -Xss1m -Dactors.corePoolSize=4 -Dactors.enableForkJoin=false"
-JEDIT_JAVA_OPTIONS="-Xms128m -Xmx1024m -Xss2m -Dactors.corePoolSize=4 -Dactors.enableForkJoin=false"
-#JEDIT_JAVA_OPTIONS="-Xms512m -Xmx4096m -Xss8m -Dactors.corePoolSize=4 -Dactors.enableForkJoin=false"
+#JEDIT_JAVA_OPTIONS="-Xms128m -Xmx512m -Xss1m -Dscala.concurrent.context.minThreads=1 -Dscala.concurrent.context.numThreads=x0.5 -Dscala.concurrent.context.maxThreads=8"
+JEDIT_JAVA_OPTIONS="-Xms128m -Xmx1024m -Xss2m -Dscala.concurrent.context.minThreads=1 -Dscala.concurrent.context.numThreads=x0.5 -Dscala.concurrent.context.maxThreads=8"
+#JEDIT_JAVA_OPTIONS="-Xms512m -Xmx4096m -Xss8m -Dscala.concurrent.context.minThreads=1 -Dscala.concurrent.context.numThreads=x0.5 -Dscala.concurrent.context.maxThreads=8"
 JEDIT_SYSTEM_OPTIONS="-Dawt.useSystemAAFontSettings=on -Dswing.aatext=true -Dapple.laf.useScreenMenuBar=true -Dapple.awt.application.name=Isabelle -Dscala.repl.no-threads=true"
 
 ISABELLE_JEDIT_OPTIONS=""
--- a/src/Tools/jEdit/src/document_view.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/document_view.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -10,8 +10,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
-
 import java.awt.Graphics2D
 import java.awt.event.KeyEvent
 import javax.swing.event.{CaretListener, CaretEvent}
@@ -176,7 +174,7 @@
 
   private val delay_caret_update =
     Swing_Thread.delay_last(PIDE.options.seconds("editor_input_delay")) {
-      session.caret_focus.event(Session.Caret_Focus)
+      session.caret_focus.post(Session.Caret_Focus)
     }
 
   private val caret_listener = new CaretListener {
@@ -193,60 +191,54 @@
   }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Raw_Edits =>
-          Swing_Thread.later {
-            overview.delay_repaint.postpone(PIDE.options.seconds("editor_input_delay"))
-          }
+  private val main =
+    Session.Consumer[Any](getClass.getName) {
+      case _: Session.Raw_Edits =>
+        Swing_Thread.later {
+          overview.delay_repaint.postpone(PIDE.options.seconds("editor_input_delay"))
+        }
 
-        case changed: Session.Commands_Changed =>
-          val buffer = model.buffer
-          Swing_Thread.later {
-            JEdit_Lib.buffer_lock(buffer) {
-              if (model.buffer == text_area.getBuffer) {
-                val snapshot = model.snapshot()
+      case changed: Session.Commands_Changed =>
+        val buffer = model.buffer
+        Swing_Thread.later {
+          JEdit_Lib.buffer_lock(buffer) {
+            if (model.buffer == text_area.getBuffer) {
+              val snapshot = model.snapshot()
 
-                val load_changed =
-                  snapshot.load_commands.exists(changed.commands.contains)
+              val load_changed =
+                snapshot.load_commands.exists(changed.commands.contains)
 
-                if (changed.assignment || load_changed ||
-                    (changed.nodes.contains(model.node_name) &&
-                     changed.commands.exists(snapshot.node.commands.contains)))
-                  Swing_Thread.later { overview.delay_repaint.invoke() }
+              if (changed.assignment || load_changed ||
+                  (changed.nodes.contains(model.node_name) &&
+                   changed.commands.exists(snapshot.node.commands.contains)))
+                Swing_Thread.later { overview.delay_repaint.invoke() }
 
-                val visible_lines = text_area.getVisibleLines
-                if (visible_lines > 0) {
-                  if (changed.assignment || load_changed)
-                    text_area.invalidateScreenLineRange(0, visible_lines)
-                  else {
-                    val visible_range = JEdit_Lib.visible_range(text_area).get
-                    val visible_iterator =
-                      snapshot.node.command_iterator(snapshot.revert(visible_range)).map(_._1)
-                    if (visible_iterator.exists(changed.commands)) {
-                      for {
-                        line <- (0 until visible_lines).iterator
-                        start = text_area.getScreenLineStartOffset(line) if start >= 0
-                        end = text_area.getScreenLineEndOffset(line) if end >= 0
-                        range = Text.Range(start, end)
-                        line_cmds = snapshot.node.command_iterator(snapshot.revert(range)).map(_._1)
-                        if line_cmds.exists(changed.commands)
-                      } text_area.invalidateScreenLineRange(line, line)
-                    }
+              val visible_lines = text_area.getVisibleLines
+              if (visible_lines > 0) {
+                if (changed.assignment || load_changed)
+                  text_area.invalidateScreenLineRange(0, visible_lines)
+                else {
+                  val visible_range = JEdit_Lib.visible_range(text_area).get
+                  val visible_iterator =
+                    snapshot.node.command_iterator(snapshot.revert(visible_range)).map(_._1)
+                  if (visible_iterator.exists(changed.commands)) {
+                    for {
+                      line <- (0 until visible_lines).iterator
+                      start = text_area.getScreenLineStartOffset(line) if start >= 0
+                      end = text_area.getScreenLineEndOffset(line) if end >= 0
+                      range = Text.Range(start, end)
+                      line_cmds = snapshot.node.command_iterator(snapshot.revert(range)).map(_._1)
+                      if line_cmds.exists(changed.commands)
+                    } text_area.invalidateScreenLineRange(line, line)
                   }
                 }
               }
             }
           }
-
-        case bad =>
-          System.err.println("command_change_actor: ignoring bad message " + bad)
-      }
+        }
     }
-  }
 
 
   /* activation */
@@ -261,16 +253,16 @@
     text_area.addKeyListener(key_listener)
     text_area.addCaretListener(caret_listener)
     text_area.addLeftOfScrollBar(overview)
-    session.raw_edits += main_actor
-    session.commands_changed += main_actor
+    session.raw_edits += main
+    session.commands_changed += main
   }
 
   private def deactivate()
   {
     val painter = text_area.getPainter
 
-    session.raw_edits -= main_actor
-    session.commands_changed -= main_actor
+    session.raw_edits -= main
+    session.commands_changed -= main
     text_area.removeLeftOfScrollBar(overview); overview.delay_repaint.revoke()
     text_area.removeCaretListener(caret_listener); delay_caret_update.revoke()
     text_area.removeKeyListener(key_listener)
--- a/src/Tools/jEdit/src/find_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/find_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,8 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
-
 import scala.swing.{Button, Component, TextField, CheckBox, Label, ComboBox}
 import scala.swing.event.ButtonClicked
 
@@ -68,23 +66,16 @@
   })
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Global_Options =>
-          Swing_Thread.later { handle_resize() }
-
-        case bad =>
-          System.err.println("Find_Dockable: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Session.Global_Options](getClass.getName) {
+      case _: Session.Global_Options => Swing_Thread.later { handle_resize() }
     }
-  }
 
   override def init()
   {
-    PIDE.session.global_options += main_actor
+    PIDE.session.global_options += main
     handle_resize()
     find_theorems.activate()
   }
@@ -92,7 +83,7 @@
   override def exit()
   {
     find_theorems.deactivate()
-    PIDE.session.global_options -= main_actor
+    PIDE.session.global_options -= main
     delay_resize.revoke()
   }
 
--- a/src/Tools/jEdit/src/info_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/info_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,8 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
-
 import scala.swing.Button
 import scala.swing.event.ButtonClicked
 
@@ -97,30 +95,24 @@
   add(controls.peer, BorderLayout.NORTH)
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Global_Options =>
-          Swing_Thread.later { handle_resize() }
-
-        case bad => System.err.println("Info_Dockable: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Session.Global_Options](getClass.getName) {
+      case _: Session.Global_Options => Swing_Thread.later { handle_resize() }
     }
-  }
 
   override def init()
   {
     GUI.parent_window(this).map(_.addWindowFocusListener(window_focus_listener))
-    PIDE.session.global_options += main_actor
+    PIDE.session.global_options += main
     handle_resize()
   }
 
   override def exit()
   {
     GUI.parent_window(this).map(_.removeWindowFocusListener(window_focus_listener))
-    PIDE.session.global_options -= main_actor
+    PIDE.session.global_options -= main
     delay_resize.revoke()
   }
 }
--- a/src/Tools/jEdit/src/jedit_lib.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/jedit_lib.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -215,9 +215,12 @@
     val buffer = text_area.getBuffer
     buffer_range(buffer).try_restrict(range) match {
       case Some(range1) if !range1.is_singularity =>
-        text_area.invalidateLineRange(
-          buffer.getLineOfOffset(range1.start),
-          buffer.getLineOfOffset(range1.stop))
+        try {
+          text_area.invalidateLineRange(
+            buffer.getLineOfOffset(range1.start),
+            buffer.getLineOfOffset(range1.stop))
+        }
+        catch { case _: ArrayIndexOutOfBoundsException => }
       case _ =>
     }
   }
--- a/src/Tools/jEdit/src/monitor_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/monitor_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{TextArea, ScrollPane, Component}
 
 import org.jfree.chart.ChartPanel
@@ -35,23 +34,18 @@
   set_content(new ChartPanel(chart))
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case Session.Statistics(props) =>
-          Swing_Thread.later {
-            rev_stats ::= props
-            delay_update.invoke()
-          }
+  private val main =
+    Session.Consumer[Session.Statistics](getClass.getName) {
+      case stats =>
+        Swing_Thread.later {
+          rev_stats ::= stats.props
+          delay_update.invoke()
+        }
+    }
 
-        case bad => System.err.println("Monitor_Dockable: ignoring bad message " + bad)
-      }
-    }
-  }
-
-  override def init() { PIDE.session.statistics += main_actor }
-  override def exit() { PIDE.session.statistics -= main_actor }
+  override def init() { PIDE.session.statistics += main }
+  override def exit() { PIDE.session.statistics -= main }
 }
 
--- a/src/Tools/jEdit/src/output_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/output_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,8 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
-
 import scala.swing.{Button, CheckBox}
 import scala.swing.event.ButtonClicked
 
@@ -82,39 +80,34 @@
   }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Global_Options =>
-          Swing_Thread.later { handle_resize() }
+  private val main =
+    Session.Consumer[Any](getClass.getName) {
+      case _: Session.Global_Options =>
+        Swing_Thread.later { handle_resize() }
 
-        case changed: Session.Commands_Changed =>
-          val restriction = if (changed.assignment) None else Some(changed.commands)
-          Swing_Thread.later { handle_update(do_update, restriction) }
+      case changed: Session.Commands_Changed =>
+        val restriction = if (changed.assignment) None else Some(changed.commands)
+        Swing_Thread.later { handle_update(do_update, restriction) }
 
-        case Session.Caret_Focus =>
-          Swing_Thread.later { handle_update(do_update, None) }
-
-        case bad => System.err.println("Output_Dockable: ignoring bad message " + bad)
-      }
+      case Session.Caret_Focus =>
+        Swing_Thread.later { handle_update(do_update, None) }
     }
-  }
 
   override def init()
   {
-    PIDE.session.global_options += main_actor
-    PIDE.session.commands_changed += main_actor
-    PIDE.session.caret_focus += main_actor
+    PIDE.session.global_options += main
+    PIDE.session.commands_changed += main
+    PIDE.session.caret_focus += main
     handle_update(true, None)
   }
 
   override def exit()
   {
-    PIDE.session.global_options -= main_actor
-    PIDE.session.commands_changed -= main_actor
-    PIDE.session.caret_focus -= main_actor
+    PIDE.session.global_options -= main
+    PIDE.session.commands_changed -= main
+    PIDE.session.caret_focus -= main
     delay_resize.revoke()
   }
 
--- a/src/Tools/jEdit/src/plugin.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/plugin.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -22,8 +22,6 @@
 
 import org.gjt.sp.util.SyntaxUtilities
 
-import scala.actors.Actor._
-
 
 object PIDE
 {
@@ -174,7 +172,7 @@
 
   def options_changed()
   {
-    PIDE.session.global_options.event(Session.Global_Options(PIDE.options.value))
+    PIDE.session.global_options.post(Session.Global_Options(PIDE.options.value))
     Swing_Thread.later { delay_load.invoke() }
   }
 
@@ -244,34 +242,27 @@
     }
 
 
-  /* session manager */
+  /* session phase */
 
-  private val session_manager = actor {
-    loop {
-      react {
-        case phase: Session.Phase =>
-          phase match {
-            case Session.Inactive | Session.Failed =>
-              Swing_Thread.later {
-                GUI.error_dialog(jEdit.getActiveView, "Prover process terminated",
-                    "Isabelle Syslog", GUI.scrollable_text(PIDE.session.current_syslog()))
-              }
+  private val session_phase =
+    Session.Consumer[Session.Phase](getClass.getName) {
+      case Session.Inactive | Session.Failed =>
+        Swing_Thread.later {
+          GUI.error_dialog(jEdit.getActiveView, "Prover process terminated",
+              "Isabelle Syslog", GUI.scrollable_text(PIDE.session.current_syslog()))
+        }
 
-            case Session.Ready =>
-              PIDE.session.update_options(PIDE.options.value)
-              PIDE.init_models()
-              Swing_Thread.later { delay_load.invoke() }
+      case Session.Ready =>
+        PIDE.session.update_options(PIDE.options.value)
+        PIDE.init_models()
+        Swing_Thread.later { delay_load.invoke() }
 
-            case Session.Shutdown =>
-              PIDE.exit_models(JEdit_Lib.jedit_buffers().toList)
-              Swing_Thread.later { delay_load.revoke() }
+      case Session.Shutdown =>
+        PIDE.exit_models(JEdit_Lib.jedit_buffers().toList)
+        Swing_Thread.later { delay_load.revoke() }
 
-            case _ =>
-          }
-        case bad => System.err.println("session_manager: ignoring bad message " + bad)
-      }
+      case _ =>
     }
-  }
 
 
   /* main plugin plumbing */
@@ -366,7 +357,7 @@
         override def reparse_limit = PIDE.options.int("editor_reparse_limit")
       }
 
-      PIDE.session.phase_changed += session_manager
+      PIDE.session.phase_changed += session_phase
       PIDE.startup_failure = None
     }
     catch {
@@ -385,7 +376,7 @@
       PIDE.completion_history.value.save()
     }
 
-    PIDE.session.phase_changed -= session_manager
+    PIDE.session.phase_changed -= session_phase
     PIDE.exit_models(JEdit_Lib.jedit_buffers().toList)
     PIDE.session.stop()
   }
--- a/src/Tools/jEdit/src/pretty_text_area.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/pretty_text_area.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -36,7 +36,7 @@
     val nodes1 = nodes0 + (node_name -> nodes0(node_name).update_commands(Linear_Set(command)))
     val version1 = Document.Version.make(version0.syntax, nodes1)
     val state1 =
-      state0.continue_history(Future.value(version0), edits, Future.value(version1))._2
+      state0.continue_history(Future.value(version0), edits, Future.value(version1))
         .define_version(version1, state0.the_assignment(version0))
         .assign(version1.id, List(command.id -> List(Document_ID.make())))._2
 
--- a/src/Tools/jEdit/src/protocol_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/protocol_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{TextArea, ScrollPane}
 
 import org.gjt.sp.jedit.View
@@ -21,22 +20,17 @@
   set_content(new ScrollPane(text_area))
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case input: Prover.Input =>
-          Swing_Thread.later { text_area.append(input.toString + "\n\n") }
+  private val main =
+    Session.Consumer[Prover.Message](getClass.getName) {
+      case input: Prover.Input =>
+        Swing_Thread.later { text_area.append(input.toString + "\n\n") }
 
-        case output: Prover.Output =>
-          Swing_Thread.later { text_area.append(output.message.toString + "\n\n") }
-
-        case bad => System.err.println("Protocol_Dockable: ignoring bad message " + bad)
-      }
+      case output: Prover.Output =>
+        Swing_Thread.later { text_area.append(output.message.toString + "\n\n") }
     }
-  }
 
-  override def init() { PIDE.session.all_messages += main_actor }
-  override def exit() { PIDE.session.all_messages -= main_actor }
+  override def init() { PIDE.session.all_messages += main }
+  override def exit() { PIDE.session.all_messages -= main }
 }
--- a/src/Tools/jEdit/src/raw_output_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/raw_output_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{TextArea, ScrollPane}
 
 import org.gjt.sp.jedit.View
@@ -21,22 +20,17 @@
   set_content(new ScrollPane(text_area))
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case output: Prover.Output =>
-          Swing_Thread.later {
-            text_area.append(XML.content(output.message))
-            if (!output.is_stdout && !output.is_stderr) text_area.append("\n")
-          }
+  private val main =
+    Session.Consumer[Prover.Output](getClass.getName) {
+      case output: Prover.Output =>
+        Swing_Thread.later {
+          text_area.append(XML.content(output.message))
+          if (!output.is_stdout && !output.is_stderr) text_area.append("\n")
+        }
+    }
 
-        case bad => System.err.println("Raw_Output_Dockable: ignoring bad message " + bad)
-      }
-    }
-  }
-
-  override def init() { PIDE.session.raw_output_messages += main_actor }
-  override def exit() { PIDE.session.raw_output_messages -= main_actor }
+  override def init() { PIDE.session.raw_output_messages += main }
+  override def exit() { PIDE.session.raw_output_messages -= main }
 }
--- a/src/Tools/jEdit/src/simplifier_trace_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/simplifier_trace_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{Button, CheckBox, Orientation, Separator}
 import scala.swing.event.ButtonClicked
 
@@ -127,32 +126,31 @@
   }
 
 
-  /* main actor */
+  /* main */
+
+  private val main =
+    Session.Consumer[Any](getClass.getName) {
+      case _: Session.Global_Options =>
+        Swing_Thread.later { handle_resize() }
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Global_Options =>
-          Swing_Thread.later { handle_resize() }
-        case changed: Session.Commands_Changed =>
-          Swing_Thread.later { handle_update(do_update) }
-        case Session.Caret_Focus =>
-          Swing_Thread.later { handle_update(do_update) }
-        case Simplifier_Trace.Event =>
-          Swing_Thread.later { handle_update(do_update) }
-        case bad => System.err.println("Simplifier_Trace_Dockable: ignoring bad message " + bad)
-      }
+      case changed: Session.Commands_Changed =>
+        Swing_Thread.later { handle_update(do_update) }
+
+      case Session.Caret_Focus =>
+        Swing_Thread.later { handle_update(do_update) }
+
+      case Simplifier_Trace.Event =>
+        Swing_Thread.later { handle_update(do_update) }
     }
-  }
 
   override def init()
   {
     Swing_Thread.require {}
 
-    PIDE.session.global_options += main_actor
-    PIDE.session.commands_changed += main_actor
-    PIDE.session.caret_focus += main_actor
-    PIDE.session.trace_events += main_actor
+    PIDE.session.global_options += main
+    PIDE.session.commands_changed += main
+    PIDE.session.caret_focus += main
+    PIDE.session.trace_events += main
     handle_update(true)
   }
 
@@ -160,10 +158,10 @@
   {
     Swing_Thread.require {}
 
-    PIDE.session.global_options -= main_actor
-    PIDE.session.commands_changed -= main_actor
-    PIDE.session.caret_focus -= main_actor
-    PIDE.session.trace_events -= main_actor
+    PIDE.session.global_options -= main
+    PIDE.session.commands_changed -= main
+    PIDE.session.caret_focus -= main
+    PIDE.session.trace_events -= main
     delay_resize.revoke()
   }
 
--- a/src/Tools/jEdit/src/sledgehammer_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/sledgehammer_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,8 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
-
 import scala.swing.{Button, Component, Label, TextField, CheckBox}
 import scala.swing.event.ButtonClicked
 
@@ -135,23 +133,16 @@
   override def focusOnDefaultComponent { provers.requestFocus }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case _: Session.Global_Options =>
-          Swing_Thread.later { update_provers(); handle_resize() }
-
-        case bad =>
-          System.err.println("Sledgehammer_Dockable: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Session.Global_Options](getClass.getName) {
+      case _: Session.Global_Options => Swing_Thread.later { update_provers(); handle_resize() }
     }
-  }
 
   override def init()
   {
-    PIDE.session.global_options += main_actor
+    PIDE.session.global_options += main
     update_provers()
     handle_resize()
     sledgehammer.activate()
@@ -160,7 +151,7 @@
   override def exit()
   {
     sledgehammer.deactivate()
-    PIDE.session.global_options -= main_actor
+    PIDE.session.global_options -= main
     delay_resize.revoke()
   }
 }
--- a/src/Tools/jEdit/src/syslog_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/syslog_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.TextArea
 
 import org.gjt.sp.jedit.View
@@ -32,27 +31,22 @@
   set_content(syslog)
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case output: Prover.Output =>
-          if (output.is_syslog) Swing_Thread.later { update_syslog() }
-
-        case bad => System.err.println("Syslog_Dockable: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Prover.Output](getClass.getName) {
+      case output =>
+        if (output.is_syslog) Swing_Thread.later { update_syslog() }
     }
-  }
 
   override def init()
   {
-    PIDE.session.syslog_messages += main_actor
+    PIDE.session.syslog_messages += main
     update_syslog()
   }
 
   override def exit()
   {
-    PIDE.session.syslog_messages -= main_actor
+    PIDE.session.syslog_messages -= main
   }
 }
--- a/src/Tools/jEdit/src/theories_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/theories_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{Button, TextArea, Label, ListView, Alignment,
   ScrollPane, Component, CheckBox, BorderPanel}
 import scala.swing.event.{MouseClicked, MouseMoved}
@@ -216,35 +215,30 @@
   }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case phase: Session.Phase =>
-          Swing_Thread.later { handle_phase(phase) }
+  private val main =
+    Session.Consumer[Any](getClass.getName) {
+      case phase: Session.Phase =>
+        Swing_Thread.later { handle_phase(phase) }
 
-        case _: Session.Global_Options =>
-          Swing_Thread.later {
-            continuous_checking.load()
-            logic.load ()
-            update_nodes_required()
-            status.repaint()
-          }
+      case _: Session.Global_Options =>
+        Swing_Thread.later {
+          continuous_checking.load()
+          logic.load ()
+          update_nodes_required()
+          status.repaint()
+        }
 
-        case changed: Session.Commands_Changed =>
-          Swing_Thread.later { handle_update(Some(changed.nodes)) }
-
-        case bad => System.err.println("Theories_Dockable: ignoring bad message " + bad)
-      }
+      case changed: Session.Commands_Changed =>
+        Swing_Thread.later { handle_update(Some(changed.nodes)) }
     }
-  }
 
   override def init()
   {
-    PIDE.session.phase_changed += main_actor
-    PIDE.session.global_options += main_actor
-    PIDE.session.commands_changed += main_actor
+    PIDE.session.phase_changed += main
+    PIDE.session.global_options += main
+    PIDE.session.commands_changed += main
 
     handle_phase(PIDE.session.phase)
     handle_update()
@@ -252,8 +246,8 @@
 
   override def exit()
   {
-    PIDE.session.phase_changed -= main_actor
-    PIDE.session.global_options -= main_actor
-    PIDE.session.commands_changed -= main_actor
+    PIDE.session.phase_changed -= main
+    PIDE.session.global_options -= main
+    PIDE.session.commands_changed -= main
   }
 }
--- a/src/Tools/jEdit/src/timing_dockable.scala	Fri Apr 25 12:09:15 2014 +0200
+++ b/src/Tools/jEdit/src/timing_dockable.scala	Fri Apr 25 14:39:11 2014 +0200
@@ -9,7 +9,6 @@
 
 import isabelle._
 
-import scala.actors.Actor._
 import scala.swing.{Label, ListView, Alignment, ScrollPane, Component, TextField}
 import scala.swing.event.{MouseClicked, ValueChanged}
 
@@ -200,27 +199,22 @@
   }
 
 
-  /* main actor */
+  /* main */
 
-  private val main_actor = actor {
-    loop {
-      react {
-        case changed: Session.Commands_Changed =>
-          Swing_Thread.later { handle_update(Some(changed.nodes)) }
-
-        case bad => System.err.println("Timing_Dockable: ignoring bad message " + bad)
-      }
+  private val main =
+    Session.Consumer[Session.Commands_Changed](getClass.getName) {
+      case changed =>
+        Swing_Thread.later { handle_update(Some(changed.nodes)) }
     }
-  }
 
   override def init()
   {
-    PIDE.session.commands_changed += main_actor
+    PIDE.session.commands_changed += main
     handle_update()
   }
 
   override def exit()
   {
-    PIDE.session.commands_changed -= main_actor
+    PIDE.session.commands_changed -= main
   }
 }