# HG changeset patch # User wenzelm # Date 1398372063 -7200 # Node ID d39148de6eee99ee255491fa83d7707d845e16f3 # Parent aa4631879df8265ed3a898ab192a92f05d87c83e misc tuning; diff -r aa4631879df8 -r d39148de6eee src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 22:20:36 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 22:41:03 2014 +0200 @@ -32,13 +32,17 @@ private var active = true private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] + private val thread = Simple_Thread.fork(name, daemon) { main_loop() } + private def is_active: Boolean = active && thread.isAlive + private def failure(exn: Throwable): Unit = - System.err.println("Consumer thread failure:\n" + Exn.message(exn)) + System.err.println( + "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn)) private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } - @tailrec private def loop(): Unit = + @tailrec private def main_loop(): Unit = mbox.receive match { case Some((arg, ack)) => val result = Exn.capture { consume(arg) } @@ -50,29 +54,27 @@ true } ack.foreach(a => a.change(_ => Some(result))) - if (continue) loop() else robust_finish() + if (continue) main_loop() else robust_finish() case None => robust_finish() } - private val thread = Simple_Thread.fork(name, daemon) { loop() } - private def is_active: Boolean = active && thread.isAlive + assert(is_active) /* main methods */ - private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized { - if (is_active) mbox.send(Some((x, ack))) - else error("Consumer thread not active") + private def request(x: A, ack: Option[Consumer_Thread.Ack]) + { + synchronized { + if (is_active) mbox.send(Some((x, ack))) + else error("Consumer thread not active: " + quote(thread.getName)) + } + ack.foreach(a => + Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))) } def send(arg: A) { request(arg, None) } - - def send_wait(arg: A) { - val ack: Consumer_Thread.Ack = Synchronized(None) - request(arg, Some(ack)) - val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) }) - Exn.release(result) - } + def send_wait(arg: A) { request(arg, Some(Synchronized(None))) } def shutdown(): Unit = {