src/Pure/Concurrent/consumer_thread.scala
changeset 71142 d6688677a784
parent 66094 24658c9d7c78
child 71143 5ea3ed3c52b3
--- a/src/Pure/Concurrent/consumer_thread.scala	Tue Nov 19 15:45:41 2019 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala	Wed Nov 20 12:21:54 2019 +0100
@@ -17,19 +17,15 @@
       consume: A => Boolean,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
     new Consumer_Thread[A](name, daemon, consume, finish)
-
-
-  /* internal messages */
-
-  private type Ack = Synchronized[Option[Exn.Result[Boolean]]]
-  private type Request[A] = (A, Option[Ack])
 }
 
 final class Consumer_Thread[A] private(
   name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
 {
+  /* thread */
+
   private var active = true
-  private val mailbox = Mailbox[Option[Consumer_Thread.Request[A]]]
+  private val mailbox = Mailbox[Option[Request]]
 
   private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }
   def is_active: Boolean = active && thread.isAlive
@@ -42,42 +38,63 @@
   private def robust_finish(): Unit =
     try { finish() } catch { case exn: Throwable => failure(exn) }
 
-  @tailrec private def main_loop(msgs: List[Option[Consumer_Thread.Request[A]]]): Unit =
+
+  /* requests */
+
+  private class Request(arg: A, acknowledge: Boolean = false)
+  {
+    private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
+      if (acknowledge) Some(Synchronized(None)) else None
+
+    def apply: Boolean =
+    {
+      val result = Exn.capture { consume(arg) }
+      (ack, result) match {
+        case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ())))
+        case ((None, Exn.Res(_))) =>
+        case ((None, Exn.Exn(exn))) => failure(exn)
+      }
+      result match {
+        case Exn.Res(continue) => continue
+        case Exn.Exn(_) => true
+      }
+    }
+
+    def await
+    {
+      for (a <- ack) {
+        Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
+      }
+    }
+  }
+
+  private def request(req: Request)
+  {
+    synchronized {
+      if (is_active) mailbox.send(Some(req))
+      else error("Consumer thread not active: " + quote(thread.getName))
+    }
+    req.await
+  }
+
+  @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
     msgs match {
       case Nil => main_loop(mailbox.receive(None))
-      case Some((arg, ack)) :: rest =>
-        val result = Exn.capture { consume(arg) }
-        val continue =
-          result match {
-            case Exn.Res(cont) => cont
-            case Exn.Exn(exn) =>
-              if (!ack.isDefined) failure(exn)
-              true
-          }
-        ack.foreach(a => a.change(_ => Some(result)))
+      case Some(req) :: rest =>
+        val continue = req.apply
         if (continue) main_loop(rest) else robust_finish()
       case None :: _ => robust_finish()
     }
 
-  assert(is_active)
-
 
   /* main methods */
 
-  private def request(x: A, ack: Option[Consumer_Thread.Ack])
-  {
-    synchronized {
-      if (is_active) mailbox.send(Some((x, ack)))
-      else error("Consumer thread not active: " + quote(thread.getName))
-    }
-    ack.foreach(a =>
-      Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) })))
-  }
+  assert(is_active)
 
-  def send(arg: A) { request(arg, None) }
-  def send_wait(arg: A) { request(arg, Some(Synchronized(None))) }
+  def send(arg: A) { request(new Request(arg)) }
+  def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) }
 
-  def shutdown(): Unit =
+  def shutdown()
   {
     synchronized { if (is_active) { active = false; mailbox.send(None) } }
     thread.join