--- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:02:13 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:19:11 2014 +0200
@@ -25,17 +25,28 @@
private var active = true
private val mbox = Mailbox[Option[A]]
+ private def failure(exn: Throwable): Unit =
+ System.err.println("Consumer thread failure:\n" + Exn.message(exn))
+
+ private def robust_finish(): Unit =
+ try { finish() } catch { case exn: Throwable => failure(exn) }
+
@tailrec private def loop(): Unit =
mbox.receive match {
case Some(x) =>
- val continue = consume(x)
- if (continue) loop() else finish()
- case None => finish()
+ val continue =
+ try { consume(x) }
+ catch { case exn: Throwable => failure(exn); true }
+ if (continue) loop() else robust_finish()
+ case None => robust_finish()
}
private val thread = Simple_Thread.fork(name, daemon) { loop() }
private def is_active: Boolean = active && thread.isAlive
+
+ /* main methods */
+
def send(x: A): Unit = synchronized {
if (is_active) mbox.send(Some(x))
else error("Consumer thread not active")