--- a/src/Pure/Concurrent/consumer_thread.scala Tue Nov 19 15:45:41 2019 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 12:21:54 2019 +0100
@@ -17,19 +17,15 @@
consume: A => Boolean,
finish: () => Unit = () => ()): Consumer_Thread[A] =
new Consumer_Thread[A](name, daemon, consume, finish)
-
-
- /* internal messages */
-
- private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
- private type Request[A] = (A, Option[Ack])
}
final class Consumer_Thread[A] private(
name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
{
+ /* thread */
+
private var active = true
- private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+ private val mailbox = Mailbox[Option[Request]]
private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }
def is_active: Boolean = active && thread.isAlive
@@ -42,42 +38,63 @@
private def robust_finish(): Unit =
try { finish() } catch { case exn: Throwable => failure(exn) }
- @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+
+ /* requests */
+
+ private class Request(arg: A, acknowledge: Boolean = false)
+ {
+ private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
+ if (acknowledge) Some(Synchronized(None)) else None
+
+ def apply: Boolean =
+ {
+ val result = Exn.capture { consume(arg) }
+ (ack, result) match {
+ case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ())))
+ case ((None, Exn.Res(_))) =>
+ case ((None, Exn.Exn(exn))) => failure(exn)
+ }
+ result match {
+ case Exn.Res(continue) => continue
+ case Exn.Exn(_) => true
+ }
+ }
+
+ def await
+ {
+ for (a <- ack) {
+ Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
+ }
+ }
+ }
+
+ private def request(req: Request)
+ {
+ synchronized {
+ if (is_active) mailbox.send(Some(req))
+ else error("Consumer thread not active: " + quote(thread.getName))
+ }
+ req.await
+ }
+
+ @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
msgs match {
case Nil => main_loop(mailbox.receive(None))
- case Some((arg, ack)) :: rest =>
- val result = Exn.capture { consume(arg) }
- val continue =
- result match {
- case Exn.Res(cont) => cont
- case Exn.Exn(exn) =>
- if (!ack.isDefined) failure(exn)
- true
- }
- ack.foreach(a => a.change(_ => Some(result)))
+ case Some(req) :: rest =>
+ val continue = req.apply
if (continue) main_loop(rest) else robust_finish()
case None :: _ => robust_finish()
}
- assert(is_active)
-
/* main methods */
- private def request(x: A, ack: Option[Consumer_Thread.Ack])
- {
- synchronized {
- if (is_active) mailbox.send(Some((x, ack)))
- else error("Consumer thread not active: " + quote(thread.getName))
- }
- ack.foreach(a =>
- Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
- }
+ assert(is_active)
- def send(arg: A) { request(arg, None) }
- def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
+ def send(arg: A) { request(new Request(arg)) }
+ def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) }
- def shutdown(): Unit =
+ def shutdown()
{
synchronized { if (is_active) { active = false; mailbox.send(None) } }
thread.join
--- a/src/Pure/General/exn.scala Tue Nov 19 15:45:41 2019 +0100
+++ b/src/Pure/General/exn.scala Wed Nov 20 12:21:54 2019 +0100
@@ -47,6 +47,12 @@
case Exn(ERROR(msg)) => Exn(ERROR(msg))
case _ => this
}
+
+ def map[B](f: A => B): Result[B] =
+ this match {
+ case Res(res) => Res(f(res))
+ case Exn(exn) => Exn(exn)
+ }
}
case class Res[A](res: A) extends Result[A]
case class Exn[A](exn: Throwable) extends Result[A]