src/Pure/Concurrent/consumer_thread.scala
changeset 57417 29fe9bac501b
parent 56782 433cf57550fa
child 61556 0d4ee4168e41
--- a/src/Pure/Concurrent/consumer_thread.scala	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Fri Jun 27 22:08:55 2014 +0200
@@ -30,9 +30,9 @@
   name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
 {
   private var active = true
-  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
 
-  private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+  private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) }
   def is_active: Boolean = active && thread.isAlive
 
   private def failure(exn: Throwable): Unit =
@@ -42,9 +42,10 @@
   private def robust_finish(): Unit =
     try { finish() } catch { case exn: Throwable => failure(exn) }
 
-  @tailrec private def main_loop(): Unit =
-    mbox.receive match {
-      case Some((arg, ack)) =>
+  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+    msgs match {
+      case Nil => main_loop(mailbox.receive(None))
+      case Some((arg, ack)) :: rest =>
         val result = Exn.capture { consume(arg) }
         val continue =
           result match {
@@ -54,8 +55,8 @@
               true
           }
         ack.foreach(a => a.change(_ => Some(result)))
-        if (continue) main_loop() else robust_finish()
-      case None => robust_finish()
+        if (continue) main_loop(rest) else robust_finish()
+      case None :: _ => robust_finish()
     }
 
   assert(is_active)
@@ -66,7 +67,7 @@
   private def request(x: A, ack: Option[Consumer_Thread.Ack])
   {
     synchronized {
-      if (is_active) mbox.send(Some((x, ack)))
+      if (is_active) mailbox.send(Some((x, ack)))
       else error("Consumer thread not active: " + quote(thread.getName))
     }
     ack.foreach(a =>
@@ -78,7 +79,7 @@
 
   def shutdown(): Unit =
   {
-    synchronized { if (is_active) { active = false; mbox.send(None) } }
+    synchronized { if (is_active) { active = false; mailbox.send(None) } }
     thread.join
   }
 }