diff -r 9188d901209d -r 29fe9bac501b src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 19:38:32 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 22:08:55 2014 +0200 @@ -30,9 +30,9 @@ name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { private var active = true - private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] + private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]] - private val thread = Simple_Thread.fork(name, daemon) { main_loop() } + private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) } def is_active: Boolean = active && thread.isAlive private def failure(exn: Throwable): Unit = @@ -42,9 +42,10 @@ private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } - @tailrec private def main_loop(): Unit = - mbox.receive match { - case Some((arg, ack)) => + @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit = + msgs match { + case Nil => main_loop(mailbox.receive(None)) + case Some((arg, ack)) :: rest => val result = Exn.capture { consume(arg) } val continue = result match { @@ -54,8 +55,8 @@ true } ack.foreach(a => a.change(_ => Some(result))) - if (continue) main_loop() else robust_finish() - case None => robust_finish() + if (continue) main_loop(rest) else robust_finish() + case None :: _ => robust_finish() } assert(is_active) @@ -66,7 +67,7 @@ private def request(x: A, ack: Option[Consumer_Thread.Ack]) { synchronized { - if (is_active) mbox.send(Some((x, ack))) + if (is_active) mailbox.send(Some((x, ack))) else error("Consumer thread not active: " + quote(thread.getName)) } ack.foreach(a => @@ -78,7 +79,7 @@ def shutdown(): Unit = { - synchronized { if (is_active) { active = false; mbox.send(None) } } + synchronized { if (is_active) { active = false; mailbox.send(None) } } thread.join } }