--- a/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Fri Jun 27 22:08:55 2014 +0200
@@ -30,9 +30,9 @@
name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
private var active = true
- private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+ private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
- private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+ private val thread = Simple_Thread.fork(name, daemon) { main_loop(Nil) }
def is_active: Boolean = active && thread.isAlive
private def failure(exn: Throwable): Unit =
@@ -42,9 +42,10 @@
private def robust_finish(): Unit =
try { finish() } catch { case exn: Throwable => failure(exn) }
- @tailrec private def main_loop(): Unit =
- mbox.receive match {
- case Some((arg, ack)) =>
+ @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+ msgs match {
+ case Nil => main_loop(mailbox.receive(None))
+ case Some((arg, ack)) :: rest =>
val result = Exn.capture { consume(arg) }
val continue =
result match {
@@ -54,8 +55,8 @@
true
}
ack.foreach(a => a.change(_ => Some(result)))
- if (continue) main_loop() else robust_finish()
- case None => robust_finish()
+ if (continue) main_loop(rest) else robust_finish()
+ case None :: _ => robust_finish()
}
assert(is_active)
@@ -66,7 +67,7 @@
private def request(x: A, ack: Option[Consumer_Thread.Ack])
{
synchronized {
- if (is_active) mbox.send(Some((x, ack)))
+ if (is_active) mailbox.send(Some((x, ack)))
else error("Consumer thread not active: " + quote(thread.getName))
}
ack.foreach(a =>
@@ -78,7 +79,7 @@
def shutdown(): Unit =
{
- synchronized { if (is_active) { active = false; mbox.send(None) } }
+ synchronized { if (is_active) { active = false; mailbox.send(None) } }
thread.join
}
}