allow more control of main loop;
more robust is_active test, although thread could terminate at any time;
--- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:54:45 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 14:51:41 2014 +0200
@@ -13,31 +13,37 @@
object Consumer_Thread
{
- def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
- new Consumer_Thread[A](name, daemon, consume)
+ def fork[A](name: String = "", daemon: Boolean = false)(
+ consume: A => Boolean,
+ finish: () => Unit = () => ()): Consumer_Thread[A] =
+ new Consumer_Thread[A](name, daemon, consume, finish)
}
-final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
+final class Consumer_Thread[A] private(
+ name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
- private var ready = true
+ private var active = true
private val mbox = Mailbox[Option[A]]
@tailrec private def loop(): Unit =
mbox.receive match {
- case Some(x) => consume(x); loop()
- case None =>
+ case Some(x) =>
+ val continue = consume(x)
+ if (continue) loop() else finish()
+ case None => finish()
}
private val thread = Simple_Thread.fork(name, daemon) { loop() }
+ private def is_active: Boolean = active && thread.isAlive
+
def send(x: A): Unit = synchronized {
- if (ready) mbox.send(Some(x))
- else error("Consumer thread not ready (after shutdown)")
+ if (is_active) mbox.send(Some(x))
+ else error("Consumer thread not active")
}
def shutdown(): Unit =
{
- synchronized { if (ready) { ready = false; mbox.send(None) } }
- mbox.await_empty
+ synchronized { if (is_active) { active = false; mbox.send(None) } }
thread.join
}
}