--- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:19:11 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 16:19:11 2014 +0200
@@ -2,7 +2,8 @@
Module: PIDE
Author: Makarius
-Consumer thread with unbounded queueing of requests.
+Consumer thread with unbounded queueing of requests, and optional
+acknowledgment.
*/
package isabelle
@@ -17,13 +18,19 @@
consume: A => Boolean,
finish: () => Unit = () => ()): Consumer_Thread[A] =
new Consumer_Thread[A](name, daemon, consume, finish)
+
+
+ /* internal messages */
+
+ private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
+ private type Request[A] = (A, Option[Ack])
}
final class Consumer_Thread[A] private(
name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
private var active = true
- private val mbox = Mailbox[Option[A]]
+ private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
private def failure(exn: Throwable): Unit =
System.err.println("Consumer thread failure:\n" + Exn.message(exn))
@@ -33,10 +40,16 @@
@tailrec private def loop(): Unit =
mbox.receive match {
- case Some(x) =>
+ case Some((arg, ack)) =>
+ val result = Exn.capture { consume(arg) }
val continue =
- try { consume(x) }
- catch { case exn: Throwable => failure(exn); true }
+ result match {
+ case Exn.Res(cont) => cont
+ case Exn.Exn(exn) =>
+ if (!ack.isDefined) failure(exn)
+ true
+ }
+ ack.foreach(a => a.change(_ => Some(result)))
if (continue) loop() else robust_finish()
case None => robust_finish()
}
@@ -47,11 +60,20 @@
/* main methods */
- def send(x: A): Unit = synchronized {
- if (is_active) mbox.send(Some(x))
+ private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
+ if (is_active) mbox.send(Some((x, ack)))
else error("Consumer thread not active")
}
+ def send(arg: A) { request(arg, None) }
+
+ def send_wait(arg: A) {
+ val ack: Consumer_Thread.Ack = Synchronized(None)
+ request(arg, Some(ack))
+ val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
+ Exn.release(result)
+ }
+
def shutdown(): Unit =
{
synchronized { if (is_active) { active = false; mbox.send(None) } }