support for bulk operations: consume mailbox content in batches;
authorwenzelm
Wed, 20 Nov 2019 16:28:13 +0100
changeset 71143 5ea3ed3c52b3
parent 71142 d6688677a784
child 71144 d6b9dead8c8d
support for bulk operations: consume mailbox content in batches;
src/Pure/Concurrent/consumer_thread.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Wed Nov 20 12:21:54 2019 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala	Wed Nov 20 16:28:13 2019 +0100
@@ -13,14 +13,34 @@
 
 object Consumer_Thread
 {
+  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
+      bulk: A => Boolean,
+      consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+      finish: () => Unit = () => ()): Consumer_Thread[A] =
+    new Consumer_Thread[A](name, daemon, bulk, consume, finish)
+
   def fork[A](name: String = "", daemon: Boolean = false)(
       consume: A => Boolean,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
-    new Consumer_Thread[A](name, daemon, consume, finish)
+  {
+    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
+    {
+      assert(args.length == 1)
+      Exn.capture { consume(args.head) } match {
+        case Exn.Res(continue) => (List(Exn.Res(())), continue)
+        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
+      }
+    }
+
+    fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
+  }
 }
 
 final class Consumer_Thread[A] private(
-  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
+  name: String, daemon: Boolean,
+  bulk: A => Boolean,
+  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
+  finish: () => Unit)
 {
   /* thread */
 
@@ -41,25 +61,11 @@
 
   /* requests */
 
-  private class Request(arg: A, acknowledge: Boolean = false)
+  private class Request(val arg: A, acknowledge: Boolean = false)
   {
-    private val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
+    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
       if (acknowledge) Some(Synchronized(None)) else None
 
-    def apply: Boolean =
-    {
-      val result = Exn.capture { consume(arg) }
-      (ack, result) match {
-        case ((Some(a), _)) => a.change(_ => Some(result.map(_ => ())))
-        case ((None, Exn.Res(_))) =>
-        case ((None, Exn.Exn(exn))) => failure(exn)
-      }
-      result match {
-        case Exn.Res(continue) => continue
-        case Exn.Exn(_) => true
-      }
-    }
-
     def await
     {
       for (a <- ack) {
@@ -80,10 +86,30 @@
   @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
     msgs match {
       case Nil => main_loop(mailbox.receive(None))
-      case Some(req) :: rest =>
-        val continue = req.apply
-        if (continue) main_loop(rest) else robust_finish()
       case None :: _ => robust_finish()
+      case _ =>
+        val reqs =
+          proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
+            .getOrElse(msgs.take(1))
+            .map(_.get)
+
+        val (results, continue) = consume(reqs.map(_.arg))
+
+        for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
+        {
+          (req.ack, res) match {
+            case ((Some(a), _)) => a.change(_ => Some(res))
+            case ((None, Exn.Res(_))) =>
+            case ((None, Exn.Exn(exn))) => failure(exn)
+          }
+        }
+
+        if (continue) {
+          val msgs1 = msgs.drop(reqs.length)
+          val msgs2 = mailbox.receive(Some(Time.zero))
+          main_loop(msgs1 ::: msgs2)
+        }
+        else robust_finish()
     }