# HG changeset patch # User wenzelm # Date 1398349151 -7200 # Node ID f96ad2b19c388013887fd59ec921743ab31af07b # Parent ac5b66fa2a56321f73c60f168ddb9e538e635c30 support for requests with explicit acknowledgment (and exception propagation); diff -r ac5b66fa2a56 -r f96ad2b19c38 src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 15:19:11 2014 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 16:19:11 2014 +0200 @@ -2,7 +2,8 @@ Module: PIDE Author: Makarius -Consumer thread with unbounded queueing of requests. +Consumer thread with unbounded queueing of requests, and optional +acknowledgment. */ package isabelle @@ -17,13 +18,19 @@ consume: A => Boolean, finish: () => Unit = () => ()): Consumer_Thread[A] = new Consumer_Thread[A](name, daemon, consume, finish) + + + /* internal messages */ + + private type Ack = Synchronized[Option[Exn.Result[Boolean]]] + private type Request[A] = (A, Option[Ack]) } final class Consumer_Thread[A] private( name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { private var active = true - private val mbox = Mailbox[Option[A]] + private val mbox = Mailbox[Option[Consumer_Thread.Request[A]]] private def failure(exn: Throwable): Unit = System.err.println("Consumer thread failure:\n" + Exn.message(exn)) @@ -33,10 +40,16 @@ @tailrec private def loop(): Unit = mbox.receive match { - case Some(x) => + case Some((arg, ack)) => + val result = Exn.capture { consume(arg) } val continue = - try { consume(x) } - catch { case exn: Throwable => failure(exn); true } + result match { + case Exn.Res(cont) => cont + case Exn.Exn(exn) => + if (!ack.isDefined) failure(exn) + true + } + ack.foreach(a => a.change(_ => Some(result))) if (continue) loop() else robust_finish() case None => robust_finish() } @@ -47,11 +60,20 @@ /* main methods */ - def send(x: A): Unit = synchronized { - if (is_active) mbox.send(Some(x)) + private def request(x: A, ack: Option[Consumer_Thread.Ack]): Unit = synchronized { + if (is_active) mbox.send(Some((x, ack))) else error("Consumer thread not active") } + def send(arg: A) { request(arg, None) } + + def send_wait(arg: A) { + val ack: Consumer_Thread.Ack = Synchronized(None) + request(arg, Some(ack)) + val result = ack.guarded_access({ case None => None case res => Some((res.get, res)) }) + Exn.release(result) + } + def shutdown(): Unit = { synchronized { if (is_active) { active = false; mbox.send(None) } }