src/Pure/Concurrent/consumer_thread.scala
changeset 78865 a0199212046a
parent 78864 2024a2298d7a
child 80300 152d6c58adb3
--- a/src/Pure/Concurrent/consumer_thread.scala	Tue Oct 31 15:40:46 2023 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala	Tue Oct 31 16:11:26 2023 +0100
@@ -16,11 +16,13 @@
       bulk: A => Boolean,
       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
       timeout: Option[Time] = None,
+      limit: Int = 0,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
-    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish)
+    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)
 
   def fork[A](name: String = "", daemon: Boolean = false)(
       consume: A => Boolean,
+      limit: Int = 0,
       finish: () => Unit = () => ()
     ): Consumer_Thread[A] = {
     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
@@ -31,7 +33,8 @@
       }
     }
 
-    fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
+    fork_bulk(name = name, daemon = daemon)(
+      _ => false, consume_single, limit = limit, finish = finish)
   }
 }
 
@@ -40,12 +43,13 @@
   bulk: A => Boolean,
   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
   timeout: Option[Time] = None,
+  limit: Int,
   finish: () => Unit
 ) {
   /* thread */
 
   private var active = true
-  private val mailbox = Mailbox[Option[Request]]()
+  private val mailbox = Mailbox[Option[Request]](limit = limit)
 
   private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }
   def is_active(): Boolean = active && thread.isAlive