# HG changeset patch # User wenzelm # Date 1574248914 -3600 # Node ID d6688677a7846f8343030f7991d5b3e07480f1a9 # Parent b1c555d3cd717a5f2c1c4c4bc69eaf170235efbc clarified signature -- more explicit types; diff -r b1c555d3cd71 -r d6688677a784 src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Tue Nov 19 15:45:41 2019 +0100 +++ b/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 12:21:54 2019 +0100 @@ -17,19 +17,15 @@ consume: A => Boolean, finish: () => Unit = () => ()): Consumer_Thread[A] = new Consumer_Thread[A](name, daemon, consume, finish) - - - /* internal messages */ - - private type Ack = Synchronized[Option[Exn.Result[Boolean]]] - private type Request[A] = (A, Option[Ack]) } final class Consumer_Thread[A] private( name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) { + /* thread */ + private var active = true - private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]] + private val mailbox = Mailbox[Option[Request]] private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) } def is_active: Boolean = active && thread.isAlive @@ -42,42 +38,63 @@ private def robust_finish(): Unit = try { finish() } catch { case exn: Throwable => failure(exn) } - @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit = + + /* requests */ + + private class Request(arg: A, acknowledge: Boolean = false) + { + private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = + if (acknowledge) Some(Synchronized(None)) else None + + def apply: Boolean = + { + val result = Exn.capture { consume(arg) } + (ack, result) match { + case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ()))) + case ((None, Exn.Res(_))) => + case ((None, Exn.Exn(exn))) => failure(exn) + } + result match { + case Exn.Res(continue) => continue + case Exn.Exn(_) => true + } + } + + def await + { + for (a <- ack) { + Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })) + } + } + } + + private def request(req: Request) + { + synchronized { + if (is_active) mailbox.send(Some(req)) + else error("Consumer thread not active: " + quote(thread.getName)) + } + req.await + } + + @tailrec private def main_loop(msgs: List[Option[Request]]): Unit = msgs match { case Nil => main_loop(mailbox.receive(None)) - case Some((arg, ack)) :: rest => - val result = Exn.capture { consume(arg) } - val continue = - result match { - case Exn.Res(cont) => cont - case Exn.Exn(exn) => - if (!ack.isDefined) failure(exn) - true - } - ack.foreach(a => a.change(_ => Some(result))) + case Some(req) :: rest => + val continue = req.apply if (continue) main_loop(rest) else robust_finish() case None :: _ => robust_finish() } - assert(is_active) - /* main methods */ - private def request(x: A, ack: Option[Consumer_Thread.Ack]) - { - synchronized { - if (is_active) mailbox.send(Some((x, ack))) - else error("Consumer thread not active: " + quote(thread.getName)) - } - ack.foreach(a => - Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))) - } + assert(is_active) - def send(arg: A) { request(arg, None) } - def send_wait(arg: A) { request(arg, Some(Synchronized(None))) } + def send(arg: A) { request(new Request(arg)) } + def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) } - def shutdown(): Unit = + def shutdown() { synchronized { if (is_active) { active = false; mailbox.send(None) } } thread.join diff -r b1c555d3cd71 -r d6688677a784 src/Pure/General/exn.scala --- a/src/Pure/General/exn.scala Tue Nov 19 15:45:41 2019 +0100 +++ b/src/Pure/General/exn.scala Wed Nov 20 12:21:54 2019 +0100 @@ -47,6 +47,12 @@ case Exn(ERROR(msg)) => Exn(ERROR(msg)) case _ => this } + + def map[B](f: A => B): Result[B] = + this match { + case Res(res) => Res(f(res)) + case Exn(exn) => Exn(exn) + } } case class Res[A](res: A) extends Result[A] case class Exn[A](exn: Throwable) extends Result[A]