--- a/src/Pure/Concurrent/consumer_thread.scala Thu Aug 17 15:12:18 2023 +0200
+++ b/src/Pure/Concurrent/consumer_thread.scala Thu Aug 17 16:15:25 2023 +0200
@@ -15,8 +15,9 @@
def fork_bulk[A](name: String = "", daemon: Boolean = false)(
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+ timeout: Option[Time] = None,
finish: () => Unit = () => ()): Consumer_Thread[A] =
- new Consumer_Thread[A](name, daemon, bulk, consume, finish)
+ new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish)
def fork[A](name: String = "", daemon: Boolean = false)(
consume: A => Boolean,
@@ -38,6 +39,7 @@
name: String, daemon: Boolean,
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+ timeout: Option[Time] = None,
finish: () => Unit
) {
/* thread */
@@ -78,30 +80,33 @@
req.await()
}
- @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
- msgs match {
- case Nil => main_loop(mailbox.receive())
- case None :: _ => robust_finish()
- case _ =>
- val reqs =
- proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
- .getOrElse(msgs.take(1))
- .map(_.get)
-
- val (results, cont) = consume(reqs.map(_.arg))
+ private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = {
+ val reqs =
+ proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
+ .getOrElse(msgs.take(1))
+ .map(_.get)
+ val rest = msgs.drop(reqs.length)
- for {
- (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
- } {
- (req.ack, res) match {
- case (Some(a), _) => a.change(_ => Some(res))
- case (None, Exn.Res(_)) =>
- case (None, Exn.Exn(exn)) => failure(exn)
- }
- }
+ val (results, cont) = consume(reqs.map(_.arg))
+ for {
+ (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None)
+ } {
+ (req.ack, res) match {
+ case (Some(a), _) => a.change(_ => Some(res))
+ case (None, Exn.Res(_)) =>
+ case (None, Exn.Exn(exn)) => failure(exn)
+ }
+ }
- if (cont) main_loop(msgs.drop(reqs.length))
- else robust_finish()
+ (rest, cont)
+ }
+
+ @tailrec private def main_loop(buffer: List[Option[Request]]): Unit =
+ proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout)) match {
+ case None :: _ => robust_finish()
+ case msgs =>
+ val (rest, cont) = process(msgs)
+ if (cont) main_loop(rest) else robust_finish()
}