# HG changeset patch # User wenzelm # Date 1398345551 -7200 # Node ID ac5b66fa2a56321f73c60f168ddb9e538e635c30 # Parent c84bf6f63dfe0186f0221cab954813b4c1c36d0c more robust thread: continue after failure; diff -r c84bf6f63dfe -r ac5b66fa2a56 src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:02:13 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:19:11 2014 +0200 @@ -25,17 +25,28 @@ private var active = true private val mbox = Mailbox[Option[A]] + private def failure(exn: Throwable): Unit = + System.err.println("Consumer thread failure:\n" + Exn.message(exn)) + + private def robust_finish(): Unit = + try { finish() } catch { case exn: Throwable => failure(exn) } + @tailrec private def loop(): Unit = mbox.receive match { case Some(x) => - val continue = consume(x) - if (continue) loop() else finish() - case None => finish() + val continue = + try { consume(x) } + catch { case exn: Throwable => failure(exn); true } + if (continue) loop() else robust_finish() + case None => robust_finish() } private val thread = Simple_Thread.fork(name, daemon) { loop() } private def is_active: Boolean = active && thread.isAlive + + /* main methods */ + def send(x: A): Unit = synchronized { if (is_active) mbox.send(Some(x)) else error("Consumer thread not active")