--- a/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 12:21:54 2019 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 16:28:13 2019 +0100
@@ -13,14 +13,34 @@
object Consumer_Thread
{
+ def fork_bulk[A](name: String = "", daemon: Boolean = false)(
+ bulk: A => Boolean,
+ consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+ finish: () => Unit = () => ()): Consumer_Thread[A] =
+ new Consumer_Thread[A](name, daemon, bulk, consume, finish)
+
def fork[A](name: String = "", daemon: Boolean = false)(
consume: A => Boolean,
finish: () => Unit = () => ()): Consumer_Thread[A] =
- new Consumer_Thread[A](name, daemon, consume, finish)
+ {
+ def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
+ {
+ assert(args.length == 1)
+ Exn.capture { consume(args.head) } match {
+ case Exn.Res(continue) => (List(Exn.Res(())), continue)
+ case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
+ }
+ }
+
+ fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
+ }
}
final class Consumer_Thread[A] private(
- name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
+ name: String, daemon: Boolean,
+ bulk: A => Boolean,
+ consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+ finish: () => Unit)
{
/* thread */
@@ -41,25 +61,11 @@
/* requests */
- private class Request(arg: A, acknowledge: Boolean = false)
+ private class Request(val arg: A, acknowledge: Boolean = false)
{
- private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
+ val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
if (acknowledge) Some(Synchronized(None)) else None
- def apply: Boolean =
- {
- val result = Exn.capture { consume(arg) }
- (ack, result) match {
- case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ())))
- case ((None, Exn.Res(_))) =>
- case ((None, Exn.Exn(exn))) => failure(exn)
- }
- result match {
- case Exn.Res(continue) => continue
- case Exn.Exn(_) => true
- }
- }
-
def await
{
for (a <- ack) {
@@ -80,10 +86,30 @@
@tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
msgs match {
case Nil => main_loop(mailbox.receive(None))
- case Some(req) :: rest =>
- val continue = req.apply
- if (continue) main_loop(rest) else robust_finish()
case None :: _ => robust_finish()
+ case _ =>
+ val reqs =
+ proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
+ .getOrElse(msgs.take(1))
+ .map(_.get)
+
+ val (results, continue) = consume(reqs.map(_.arg))
+
+ for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
+ {
+ (req.ack, res) match {
+ case ((Some(a), _)) => a.change(_ => Some(res))
+ case ((None, Exn.Res(_))) =>
+ case ((None, Exn.Exn(exn))) => failure(exn)
+ }
+ }
+
+ if (continue) {
+ val msgs1 = msgs.drop(reqs.length)
+ val msgs2 = mailbox.receive(Some(Time.zero))
+ main_loop(msgs1 ::: msgs2)
+ }
+ else robust_finish()
}