# HG changeset patch # User wenzelm # Date 1574263693 -3600 # Node ID 5ea3ed3c52b35a44506d8142c113ddda7d230749 # Parent d6688677a7846f8343030f7991d5b3e07480f1a9 support for bulk operations: consume mailbox content in batches; diff -r d6688677a784 -r 5ea3ed3c52b3 src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 12:21:54 2019 +0100 +++ b/src/Pure/Concurrent/consumer_thread.scala Wed Nov 20 16:28:13 2019 +0100 @@ -13,14 +13,34 @@ object Consumer_Thread { + def fork_bulk[A](name: String = "", daemon: Boolean = false)( + bulk: A => Boolean, + consume: List[A] => (List[Exn.Result[Unit]], Boolean), + finish: () => Unit = () => ()): Consumer_Thread[A] = + new Consumer_Thread[A](name, daemon, bulk, consume, finish) + def fork[A](name: String = "", daemon: Boolean = false)( consume: A => Boolean, finish: () => Unit = () => ()): Consumer_Thread[A] = - new Consumer_Thread[A](name, daemon, consume, finish) + { + def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = + { + assert(args.length == 1) + Exn.capture { consume(args.head) } match { + case Exn.Res(continue) => (List(Exn.Res(())), continue) + case Exn.Exn(exn) => (List(Exn.Exn(exn)), true) + } + } + + fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish) + } } final class Consumer_Thread[A] private( - name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit) + name: String, daemon: Boolean, + bulk: A => Boolean, + consume: List[A] => (List[Exn.Result[Unit]], Boolean), + finish: () => Unit) { /* thread */ @@ -41,25 +61,11 @@ /* requests */ - private class Request(arg: A, acknowledge: Boolean = false) + private class Request(val arg: A, acknowledge: Boolean = false) { - private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = + val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] = if (acknowledge) Some(Synchronized(None)) else None - def apply: Boolean = - { - val result = Exn.capture { consume(arg) } - (ack, result) match { - case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ()))) - case ((None, Exn.Res(_))) => - case ((None, Exn.Exn(exn))) => failure(exn) - } - result match { - case Exn.Res(continue) => continue - case Exn.Exn(_) => true - } - } - def await { for (a <- ack) { @@ -80,10 +86,30 @@ @tailrec private def main_loop(msgs: List[Option[Request]]): Unit = msgs match { case Nil => main_loop(mailbox.receive(None)) - case Some(req) :: rest => - val continue = req.apply - if (continue) main_loop(rest) else robust_finish() case None :: _ => robust_finish() + case _ => + val reqs = + proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg))) + .getOrElse(msgs.take(1)) + .map(_.get) + + val (results, continue) = consume(reqs.map(_.arg)) + + for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) } + { + (req.ack, res) match { + case ((Some(a), _)) => a.change(_ => Some(res)) + case ((None, Exn.Res(_))) => + case ((None, Exn.Exn(exn))) => failure(exn) + } + } + + if (continue) { + val msgs1 = msgs.drop(reqs.length) + val msgs2 = mailbox.receive(Some(Time.zero)) + main_loop(msgs1 ::: msgs2) + } + else robust_finish() }