src/Pure/Concurrent/consumer_thread.scala
changeset 56698 e0655270d3f3
parent 56696 ff782c5450bf
child 56701 ac5b66fa2a56
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:54:45 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 14:51:41 2014 +0200
@@ -13,31 +13,37 @@
 
 object Consumer_Thread
 {
-  def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
-    new Consumer_Thread[A](name, daemon, consume)
+  def fork[A](name: String = "", daemon: Boolean = false)(
+      consume: A => Boolean,
+      finish: () => Unit = () => ()): Consumer_Thread[A] =
+    new Consumer_Thread[A](name, daemon, consume, finish)
 }
 
-final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
+final class Consumer_Thread[A] private(
+  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
 {
-  private var ready = true
+  private var active = true
   private val mbox = Mailbox[Option[A]]
 
   @tailrec private def loop(): Unit =
     mbox.receive match {
-      case Some(x) => consume(x); loop()
-      case None =>
+      case Some(x) =>
+        val continue = consume(x)
+        if (continue) loop() else finish()
+      case None => finish()
     }
   private val thread = Simple_Thread.fork(name, daemon) { loop() }
 
+  private def is_active: Boolean = active && thread.isAlive
+
   def send(x: A): Unit = synchronized {
-    if (ready) mbox.send(Some(x))
-    else error("Consumer thread not ready (after shutdown)")
+    if (is_active) mbox.send(Some(x))
+    else error("Consumer thread not active")
   }
 
   def shutdown(): Unit =
   {
-    synchronized { if (ready) { ready = false; mbox.send(None) } }
-    mbox.await_empty
+    synchronized { if (is_active) { active = false; mbox.send(None) } }
     thread.join
   }
 }