# HG changeset patch # User wenzelm # Date 1692281725 -7200 # Node ID 879e1ba3868be0207d043762bd653431927d202d # Parent dd0501accda87c95b19d74f5d40a5e1ec411c0dc clarified main_loop: support timeout, which results in consume(Nil); diff -r dd0501accda8 -r 879e1ba3868b src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Thu Aug 17 15:12:18 2023 +0200 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Aug 17 16:15:25 2023 +0200 @@ -15,8 +15,9 @@ def fork_bulk[A](name: String = "", daemon: Boolean = false)( bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), + timeout: Option[Time] = None, finish: () => Unit = () => ()): Consumer_Thread[A] = - new Consumer_Thread[A](name, daemon, bulk, consume, finish) + new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish) def fork[A](name: String = "", daemon: Boolean = false)( consume: A => Boolean, @@ -38,6 +39,7 @@ name: String, daemon: Boolean, bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), + timeout: Option[Time] = None, finish: () => Unit ) { /* thread */ @@ -78,30 +80,33 @@ req.await() } - @tailrec private def main_loop(msgs: List[Option[Request]]): Unit = - msgs match { - case Nil => main_loop(mailbox.receive()) - case None :: _ => robust_finish() - case _ => - val reqs = - proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) - .getOrElse(msgs.take(1)) - .map(_.get) - - val (results, cont) = consume(reqs.map(_.arg)) + private def process(msgs: List[Option[Request]]): (List[Option[Request]], Boolean) = { + val reqs = + proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) + .getOrElse(msgs.take(1)) + .map(_.get) + val rest = msgs.drop(reqs.length) - for { - (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) - } { - (req.ack, res) match { - case (Some(a), _) => a.change(_ => Some(res)) - case (None, Exn.Res(_)) => - case (None, Exn.Exn(exn)) => failure(exn) - } - } + val (results, cont) = consume(reqs.map(_.arg)) + for { + (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) + } { + (req.ack, res) match { + case (Some(a), _) => a.change(_ => Some(res)) + case (None, Exn.Res(_)) => + case (None, Exn.Exn(exn)) => failure(exn) + } + } - if (cont) main_loop(msgs.drop(reqs.length)) - else robust_finish() + (rest, cont) + } + + @tailrec private def main_loop(buffer: List[Option[Request]]): Unit = + proper_list(buffer).getOrElse(mailbox.receive(timeout = timeout)) match { + case None :: _ => robust_finish() + case msgs => + val (rest, cont) = process(msgs) + if (cont) main_loop(rest) else robust_finish() }