--- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:13:48 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:40:29 2014 +0200
@@ -11,19 +11,33 @@
import scala.annotation.tailrec
-class Consumer_Thread[A](name: String = "", daemon: Boolean = false)
+object Consumer_Thread
{
- def consume(x: A) { }
- def finish() { }
+ def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
+ new Consumer_Thread[A](name, daemon, consume)
+}
+final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
+{
+ private var ready = true
private val mbox = Mailbox[Option[A]]
+
@tailrec private def loop(): Unit =
mbox.receive match {
case Some(x) => consume(x); loop()
- case None => finish()
+ case None =>
}
private val thread = Simple_Thread.fork(name, daemon) { loop() }
- final def send(x: A) { mbox.send(Some(x)) }
- final def shutdown() { mbox.send(None); mbox.await_empty; thread.join }
+ def send(x: A): Unit = synchronized {
+ if (ready) mbox.send(Some(x))
+ else error("Consumer thread not ready (after shutdown)")
+ }
+
+ def shutdown(): Unit =
+ {
+ synchronized { if (ready) { ready = false; mbox.send(None) } }
+ mbox.await_empty
+ thread.join
+ }
}