src/Pure/Concurrent/consumer_thread.scala
changeset 56701 ac5b66fa2a56
parent 56698 e0655270d3f3
child 56702 f96ad2b19c38
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:02:13 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:19:11 2014 +0200
@@ -25,17 +25,28 @@
   private var active = true
   private val mbox = Mailbox[Option[A]]
 
+  private def failure(exn: Throwable): Unit =
+    System.err.println("Consumer thread failure:\n" + Exn.message(exn))
+
+  private def robust_finish(): Unit =
+    try { finish() } catch { case exn: Throwable => failure(exn) }
+
   @tailrec private def loop(): Unit =
     mbox.receive match {
       case Some(x) =>
-        val continue = consume(x)
-        if (continue) loop() else finish()
-      case None => finish()
+        val continue =
+          try { consume(x) }
+          catch { case exn: Throwable => failure(exn); true }
+        if (continue) loop() else robust_finish()
+      case None => robust_finish()
     }
   private val thread = Simple_Thread.fork(name, daemon) { loop() }
 
   private def is_active: Boolean = active && thread.isAlive
 
+
+  /* main methods */
+
   def send(x: A): Unit = synchronized {
     if (is_active) mbox.send(Some(x))
     else error("Consumer thread not active")