support for requests with explicit acknowledgment (and exception propagation);
authorwenzelm
Thu, 24 Apr 2014 16:19:11 +0200
changeset 56702 f96ad2b19c38
parent 56701 ac5b66fa2a56
child 56703 2d0ca179e749
support for requests with explicit acknowledgment (and exception propagation);
src/Pure/Concurrent/consumer_thread.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:19:11 2014 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 16:19:11 2014 +0200
@@ -2,7 +2,8 @@
     Module:     PIDE
     Author:     Makarius
 
-Consumer thread with unbounded queueing of requests.
+Consumer thread with unbounded queueing of requests, and optional
+acknowledgment.
 */
 
 package isabelle
@@ -17,13 +18,19 @@
       consume: A => Boolean,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
     new Consumer_Thread[A](name, daemon, consume, finish)
+
+
+  /* internal messages */
+
+  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
+  private type Request[A] = (A, Option[Ack])
 }
 
 final class Consumer_Thread[A] private(
   name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
 {
   private var active = true
-  private val mbox = Mailbox[Option[A]]
+  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
 
   private def failure(exn: Throwable): Unit =
     System.err.println("Consumer thread failure:\n" + Exn.message(exn))
@@ -33,10 +40,16 @@
 
   @tailrec private def loop(): Unit =
     mbox.receive match {
-      case Some(x) =>
+      case Some((arg, ack)) =>
+        val result = Exn.capture { consume(arg) }
         val continue =
-          try { consume(x) }
-          catch { case exn: Throwable => failure(exn); true }
+          result match {
+            case Exn.Res(cont) => cont
+            case Exn.Exn(exn) =>
+              if (!ack.isDefined) failure(exn)
+              true
+          }
+        ack.foreach(a => a.change(_ => Some(result)))
         if (continue) loop() else robust_finish()
       case None => robust_finish()
     }
@@ -47,11 +60,20 @@
 
   /* main methods */
 
-  def send(x: A): Unit = synchronized {
-    if (is_active) mbox.send(Some(x))
+  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
+    if (is_active) mbox.send(Some((x, ack)))
     else error("Consumer thread not active")
   }
 
+  def send(arg: A) { request(arg, None) }
+
+  def send_wait(arg: A) {
+    val ack: Consumer_Thread.Ack = Synchronized(None)
+    request(arg, Some(ack))
+    val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
+    Exn.release(result)
+  }
+
   def shutdown(): Unit =
   {
     synchronized { if (is_active) { active = false; mbox.send(None) } }