# HG changeset patch # User wenzelm # Date 1398343901 -7200 # Node ID e0655270d3f35758f3a378add3958690f6df32c0 # Parent 76b38be47febf914ee0905f99bbbd4f59b844a7a allow more control of main loop; more robust is_active test, although thread could terminate at any time; diff -r 76b38be47feb -r e0655270d3f3 src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:54:45 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 14:51:41 2014 +0200 @@ -13,31 +13,37 @@ object Consumer_Thread { - def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] = - new Consumer_Thread[A](name, daemon, consume) + def fork[A](name: String = "", daemon: Boolean = false)( + consume: A => Boolean, + finish: () => Unit = () => ()): Consumer_Thread[A] = + new Consumer_Thread[A](name, daemon, consume, finish) } -final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit) +final class Consumer_Thread[A] private( + name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { - private var ready = true + private var active = true private val mbox = Mailbox[Option[A]] @tailrec private def loop(): Unit = mbox.receive match { - case Some(x) => consume(x); loop() - case None => + case Some(x) => + val continue = consume(x) + if (continue) loop() else finish() + case None => finish() } private val thread = Simple_Thread.fork(name, daemon) { loop() } + private def is_active: Boolean = active && thread.isAlive + def send(x: A): Unit = synchronized { - if (ready) mbox.send(Some(x)) - else error("Consumer thread not ready (after shutdown)") + if (is_active) mbox.send(Some(x)) + else error("Consumer thread not active") } def shutdown(): Unit = { - synchronized { if (ready) { ready = false; mbox.send(None) } } - mbox.await_empty + synchronized { if (is_active) { active = false; mbox.send(None) } } thread.join } }