src/Pure/Concurrent/consumer_thread.scala
changeset 56696 ff782c5450bf
parent 56695 963732291084
child 56698 e0655270d3f3
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:13:48 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:40:29 2014 +0200
@@ -11,19 +11,33 @@
 import scala.annotation.tailrec
 
 
-class Consumer_Thread[A](name: String = "", daemon: Boolean = false)
+object Consumer_Thread
 {
-  def consume(x: A) { }
-  def finish() { }
+  def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
+    new Consumer_Thread[A](name, daemon, consume)
+}
 
+final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
+{
+  private var ready = true
   private val mbox = Mailbox[Option[A]]
+
   @tailrec private def loop(): Unit =
     mbox.receive match {
       case Some(x) => consume(x); loop()
-      case None => finish()
+      case None =>
     }
   private val thread = Simple_Thread.fork(name, daemon) { loop() }
 
-  final def send(x: A) { mbox.send(Some(x)) }
-  final def shutdown() { mbox.send(None); mbox.await_empty; thread.join }
+  def send(x: A): Unit = synchronized {
+    if (ready) mbox.send(Some(x))
+    else error("Consumer thread not ready (after shutdown)")
+  }
+
+  def shutdown(): Unit =
+  {
+    synchronized { if (ready) { ready = false; mbox.send(None) } }
+    mbox.await_empty
+    thread.join
+  }
 }