clarified main_loop: support timeout, which results in consume(Nil);
authorwenzelm
Thu, 17 Aug 2023 16:15:25 +0200
changeset 78534 879e1ba3868b
parent 78533 dd0501accda8
child 78535 af37e1b4dce0
clarified main_loop: support timeout, which results in consume(Nil);
src/Pure/Concurrent/consumer_thread.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Thu Aug 17 15:12:18 2023 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Aug 17 16:15:25 2023 +0200
@@ -15,8 +15,9 @@
   def fork_bulk[A](name: String = "", daemon: Boolean = false)(
       bulk: A => Boolean,
       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+      timeout: Option[Time] = None,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
-    new Consumer_Thread[A](name, daemon, bulk, consume, finish)
+    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish)
 
   def fork[A](name: String = "", daemon: Boolean = false)(
       consume: A => Boolean,
@@ -38,6 +39,7 @@
   name: String, daemon: Boolean,
   bulk: A => Boolean,
   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+  timeout: Option[Time] = None,
   finish: () => Unit
 ) {
   /* thread */
@@ -78,30 +80,33 @@
     req.await()
   }
 
-  @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
-    msgs match {
-      case Nil => main_loop(mailbox.receive())
-      case None :: _ => robust_finish()
-      case _ =>
-        val reqs =
-          proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
-            .getOrElse(msgs.take(1))
-            .map(_.get)
-
-        val (results, cont) = consume(reqs.map(_.arg))
+  private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = {
+    val reqs =
+      proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
+        .getOrElse(msgs.take(1))
+        .map(_.get)
+    val rest = msgs.drop(reqs.length)
 
-        for {
-          (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
-        } {
-          (req.ack, res) match {
-            case (Some(a), _) => a.change(_ => Some(res))
-            case (None, Exn.Res(_)) =>
-            case (None, Exn.Exn(exn)) => failure(exn)
-          }
-        }
+    val (results, cont) = consume(reqs.map(_.arg))
+    for {
+      (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
+    } {
+      (req.ack, res) match {
+        case (Some(a), _) => a.change(_ => Some(res))
+        case (None, Exn.Res(_)) =>
+        case (None, Exn.Exn(exn)) => failure(exn)
+      }
+    }
 
-        if (cont) main_loop(msgs.drop(reqs.length))
-        else robust_finish()
+    (rest, cont)
+  }
+
+  @tailrec private def main_loop(buffer: List[Option[Request]]): Unit =
+    proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout)) match {
+      case None :: _ => robust_finish()
+      case msgs =>
+        val (rest, cont) = process(msgs)
+        if (cont) main_loop(rest) else robust_finish()
     }