support for requests with explicit acknowledgment (and exception propagation);
authorwenzelm
Thu Apr 24 16:19:11 2014 +0200 (2014-04-24)
changeset 56702f96ad2b19c38
parent 56701 ac5b66fa2a56
child 56703 2d0ca179e749
support for requests with explicit acknowledgment (and exception propagation);
src/Pure/Concurrent/consumer_thread.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 15:19:11 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 16:19:11 2014 +0200
     1.3 @@ -2,7 +2,8 @@
     1.4      Module:     PIDE
     1.5      Author:     Makarius
     1.6  
     1.7 -Consumer thread with unbounded queueing of requests.
     1.8 +Consumer thread with unbounded queueing of requests, and optional
     1.9 +acknowledgment.
    1.10  */
    1.11  
    1.12  package isabelle
    1.13 @@ -17,13 +18,19 @@
    1.14        consume: A => Boolean,
    1.15        finish: () => Unit = () => ()): Consumer_Thread[A] =
    1.16      new Consumer_Thread[A](name, daemon, consume, finish)
    1.17 +
    1.18 +
    1.19 +  /* internal messages */
    1.20 +
    1.21 +  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
    1.22 +  private type Request[A] = (A, Option[Ack])
    1.23  }
    1.24  
    1.25  final class Consumer_Thread[A] private(
    1.26    name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
    1.27  {
    1.28    private var active = true
    1.29 -  private val mbox = Mailbox[Option[A]]
    1.30 +  private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]]
    1.31  
    1.32    private def failure(exn: Throwable): Unit =
    1.33      System.err.println("Consumer thread failure:\n" + Exn.message(exn))
    1.34 @@ -33,10 +40,16 @@
    1.35  
    1.36    @tailrec private def loop(): Unit =
    1.37      mbox.receive match {
    1.38 -      case Some(x) =>
    1.39 +      case Some((arg, ack)) =>
    1.40 +        val result = Exn.capture { consume(arg) }
    1.41          val continue =
    1.42 -          try { consume(x) }
    1.43 -          catch { case exn: Throwable => failure(exn); true }
    1.44 +          result match {
    1.45 +            case Exn.Res(cont) => cont
    1.46 +            case Exn.Exn(exn) =>
    1.47 +              if (!ack.isDefined) failure(exn)
    1.48 +              true
    1.49 +          }
    1.50 +        ack.foreach(a => a.change(_ => Some(result)))
    1.51          if (continue) loop() else robust_finish()
    1.52        case None => robust_finish()
    1.53      }
    1.54 @@ -47,11 +60,20 @@
    1.55  
    1.56    /* main methods */
    1.57  
    1.58 -  def send(x: A): Unit = synchronized {
    1.59 -    if (is_active) mbox.send(Some(x))
    1.60 +  private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized {
    1.61 +    if (is_active) mbox.send(Some((x, ack)))
    1.62      else error("Consumer thread not active")
    1.63    }
    1.64  
    1.65 +  def send(arg: A) { request(arg, None) }
    1.66 +
    1.67 +  def send_wait(arg: A) {
    1.68 +    val ack: Consumer_Thread.Ack = Synchronized(None)
    1.69 +    request(arg, Some(ack))
    1.70 +    val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) })
    1.71 +    Exn.release(result)
    1.72 +  }
    1.73 +
    1.74    def shutdown(): Unit =
    1.75    {
    1.76      synchronized { if (is_active) { active = false; mbox.send(None) } }