support for mailbox limit;
authorwenzelm
Tue, 31 Oct 2023 16:11:26 +0100
changeset 78865 a0199212046a
parent 78864 2024a2298d7a
child 78866 1bd52b048f8e
support for mailbox limit;
src/Pure/Concurrent/consumer_thread.scala
src/Pure/Concurrent/mailbox.scala
--- a/src/Pure/Concurrent/consumer_thread.scala	Tue Oct 31 15:40:46 2023 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala	Tue Oct 31 16:11:26 2023 +0100
@@ -16,11 +16,13 @@
       bulk: A => Boolean,
       consume: List[A] => (List[Exn.Result[Unit]], Boolean),
       timeout: Option[Time] = None,
+      limit: Int = 0,
       finish: () => Unit = () => ()): Consumer_Thread[A] =
-    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish)
+    new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)
 
   def fork[A](name: String = "", daemon: Boolean = false)(
       consume: A => Boolean,
+      limit: Int = 0,
       finish: () => Unit = () => ()
     ): Consumer_Thread[A] = {
     def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
@@ -31,7 +33,8 @@
       }
     }
 
-    fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
+    fork_bulk(name = name, daemon = daemon)(
+      _ => false, consume_single, limit = limit, finish = finish)
   }
 }
 
@@ -40,12 +43,13 @@
   bulk: A => Boolean,
   consume: List[A] => (List[Exn.Result[Unit]], Boolean),
   timeout: Option[Time] = None,
+  limit: Int,
   finish: () => Unit
 ) {
   /* thread */
 
   private var active = true
-  private val mailbox = Mailbox[Option[Request]]()
+  private val mailbox = Mailbox[Option[Request]](limit = limit)
 
   private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }
   def is_active(): Boolean = active && thread.isAlive
--- a/src/Pure/Concurrent/mailbox.scala	Tue Oct 31 15:40:46 2023 +0100
+++ b/src/Pure/Concurrent/mailbox.scala	Tue Oct 31 16:11:26 2023 +0100
@@ -9,15 +9,17 @@
 
 
 object Mailbox {
-  def apply[A](): Mailbox[A] = new Mailbox[A]()
+  def apply[A](limit: Int = 0): Mailbox[A] = new Mailbox[A](limit)
 }
 
 
-class Mailbox[A] private() {
+class Mailbox[A] private(limit: Int) {
   private val mailbox = Synchronized[List[A]](Nil)
   override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")")
 
-  def send(msg: A): Unit = mailbox.change(msg :: _)
+  def send(msg: A): Unit =
+    if (limit <= 0) mailbox.change(msg :: _)
+    else mailbox.guarded_access(msgs => if (msgs.length < limit) Some(((), msg :: msgs)) else None)
 
   def receive(timeout: Option[Time] = None): List[A] =
     (mailbox.timed_access(_ => timeout.map(t => Time.now() + t),