# HG changeset patch # User wenzelm # Date 1398339629 -7200 # Node ID ff782c5450bfeefd9266295aa4d983ee6e17307e # Parent 96373229108449d1998b6db551662208d97e7776 more robust shutdown; less ooddities; diff -r 963732291084 -r ff782c5450bf src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:13:48 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:40:29 2014 +0200 @@ -11,19 +11,33 @@ import scala.annotation.tailrec -class Consumer_Thread[A](name: String = "", daemon: Boolean = false) +object Consumer_Thread { - def consume(x: A) { } - def finish() { } + def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] = + new Consumer_Thread[A](name, daemon, consume) +} +final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit) +{ + private var ready = true private val mbox = Mailbox[Option[A]] + @tailrec private def loop(): Unit = mbox.receive match { case Some(x) => consume(x); loop() - case None => finish() + case None => } private val thread = Simple_Thread.fork(name, daemon) { loop() } - final def send(x: A) { mbox.send(Some(x)) } - final def shutdown() { mbox.send(None); mbox.await_empty; thread.join } + def send(x: A): Unit = synchronized { + if (ready) mbox.send(Some(x)) + else error("Consumer thread not ready (after shutdown)") + } + + def shutdown(): Unit = + { + synchronized { if (ready) { ready = false; mbox.send(None) } } + mbox.await_empty + thread.join + } }