--- a/src/Pure/Concurrent/consumer_thread.scala Tue Oct 31 15:40:46 2023 +0100
+++ b/src/Pure/Concurrent/consumer_thread.scala Tue Oct 31 16:11:26 2023 +0100
@@ -16,11 +16,13 @@
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
timeout: Option[Time] = None,
+ limit: Int = 0,
finish: () => Unit = () => ()): Consumer_Thread[A] =
- new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish)
+ new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish)
def fork[A](name: String = "", daemon: Boolean = false)(
consume: A => Boolean,
+ limit: Int = 0,
finish: () => Unit = () => ()
): Consumer_Thread[A] = {
def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = {
@@ -31,7 +33,8 @@
}
}
- fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
+ fork_bulk(name = name, daemon = daemon)(
+ _ => false, consume_single, limit = limit, finish = finish)
}
}
@@ -40,12 +43,13 @@
bulk: A => Boolean,
consume: List[A] => (List[Exn.Result[Unit]], Boolean),
timeout: Option[Time] = None,
+ limit: Int,
finish: () => Unit
) {
/* thread */
private var active = true
- private val mailbox = Mailbox[Option[Request]]()
+ private val mailbox = Mailbox[Option[Request]](limit = limit)
private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }
def is_active(): Boolean = active && thread.isAlive
--- a/src/Pure/Concurrent/mailbox.scala Tue Oct 31 15:40:46 2023 +0100
+++ b/src/Pure/Concurrent/mailbox.scala Tue Oct 31 16:11:26 2023 +0100
@@ -9,15 +9,17 @@
object Mailbox {
- def apply[A](): Mailbox[A] = new Mailbox[A]()
+ def apply[A](limit: Int = 0): Mailbox[A] = new Mailbox[A](limit)
}
-class Mailbox[A] private() {
+class Mailbox[A] private(limit: Int) {
private val mailbox = Synchronized[List[A]](Nil)
override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")")
- def send(msg: A): Unit = mailbox.change(msg :: _)
+ def send(msg: A): Unit =
+ if (limit <= 0) mailbox.change(msg :: _)
+ else mailbox.guarded_access(msgs => if (msgs.length < limit) Some(((), msg :: msgs)) else None)
def receive(timeout: Option[Time] = None): List[A] =
(mailbox.timed_access(_ => timeout.map(t => Time.now() + t),