--- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 22:20:36 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 22:41:03 2014 +0200
@@ -32,13 +32,17 @@
private var active = true
private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+ private val thread = Simple_Thread.fork(name, daemon) { main_loop() }
+ private def is_active: Boolean = active && thread.isAlive
+
private def failure(exn: Throwable): Unit =
- System.err.println("Consumer thread failure:\n" + Exn.message(exn))
+ System.err.println(
+ "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
private def robust_finish(): Unit =
try { finish() } catch { case exn: Throwable => failure(exn) }
- @tailrec private def loop(): Unit =
+ @tailrec private def main_loop(): Unit =
mbox.receive match {
case Some((arg, ack)) =>
val result = Exn.capture { consume(arg) }
@@ -50,29 +54,27 @@
true
}
ack.foreach(a => a.change(_ => Some(result)))
- if (continue) loop() else robust_finish()
+ if (continue) main_loop() else robust_finish()
case None => robust_finish()
}
- private val thread = Simple_Thread.fork(name, daemon) { loop() }
- private def is_active: Boolean = active && thread.isAlive
+ assert(is_active)
/* main methods */
- private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
- if (is_active) mbox.send(Some((x, ack)))
- else error("Consumer thread not active")
+ private def request(x: A, ack: Option[Consumer_Thread.Ack])
+ {
+ synchronized {
+ if (is_active) mbox.send(Some((x, ack)))
+ else error("Consumer thread not active: " + quote(thread.getName))
+ }
+ ack.foreach(a =>
+ Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
}
def send(arg: A) { request(arg, None) }
-
- def send_wait(arg: A) {
- val ack: Consumer_Thread.Ack = Synchronized(None)
- request(arg, Some(ack))
- val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
- Exn.release(result)
- }
+ def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
def shutdown(): Unit =
{