more robust thread: continue after failure;
authorwenzelm
Thu Apr 24 15:19:11 2014 +0200 (2014-04-24)
changeset 56701ac5b66fa2a56
parent 56700 c84bf6f63dfe
child 56702 f96ad2b19c38
more robust thread: continue after failure;
src/Pure/Concurrent/consumer_thread.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:02:13 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:19:11 2014 +0200
     1.3 @@ -25,17 +25,28 @@
     1.4    private var active = true
     1.5    private val mbox = Mailbox[Option[A]]
     1.6  
     1.7 +  private def failure(exn: Throwable): Unit =
     1.8 +    System.err.println("Consumer thread failure:\n" + Exn.message(exn))
     1.9 +
    1.10 +  private def robust_finish(): Unit =
    1.11 +    try { finish() } catch { case exn: Throwable => failure(exn) }
    1.12 +
    1.13    @tailrec private def loop(): Unit =
    1.14      mbox.receive match {
    1.15        case Some(x) =>
    1.16 -        val continue = consume(x)
    1.17 -        if (continue) loop() else finish()
    1.18 -      case None => finish()
    1.19 +        val continue =
    1.20 +          try { consume(x) }
    1.21 +          catch { case exn: Throwable => failure(exn); true }
    1.22 +        if (continue) loop() else robust_finish()
    1.23 +      case None => robust_finish()
    1.24      }
    1.25    private val thread = Simple_Thread.fork(name, daemon) { loop() }
    1.26  
    1.27    private def is_active: Boolean = active && thread.isAlive
    1.28  
    1.29 +
    1.30 +  /* main methods */
    1.31 +
    1.32    def send(x: A): Unit = synchronized {
    1.33      if (is_active) mbox.send(Some(x))
    1.34      else error("Consumer thread not active")