# HG changeset patch # User wenzelm # Date 1698765086 -3600 # Node ID a0199212046a950a91e5ee3a38b16b9cd73a41b6 # Parent 2024a2298d7a750ee6282ccda9454b033a619d7b support for mailbox limit; diff -r 2024a2298d7a -r a0199212046a src/Pure/Concurrent/consumer_thread.scala --- a/src/Pure/Concurrent/consumer_thread.scala Tue Oct 31 15:40:46 2023 +0100 +++ b/src/Pure/Concurrent/consumer_thread.scala Tue Oct 31 16:11:26 2023 +0100 @@ -16,11 +16,13 @@ bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), timeout: Option[Time] = None, + limit: Int = 0, finish: () => Unit = () => ()): Consumer_Thread[A] = - new Consumer_Thread[A](name, daemon, bulk, consume, timeout, finish) + new Consumer_Thread[A](name, daemon, bulk, consume, timeout, limit, finish) def fork[A](name: String = "", daemon: Boolean = false)( consume: A => Boolean, + limit: Int = 0, finish: () => Unit = () => () ): Consumer_Thread[A] = { def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) = { @@ -31,7 +33,8 @@ } } - fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish) + fork_bulk(name = name, daemon = daemon)( + _ => false, consume_single, limit = limit, finish = finish) } } @@ -40,12 +43,13 @@ bulk: A => Boolean, consume: List[A] => (List[Exn.Result[Unit]], Boolean), timeout: Option[Time] = None, + limit: Int, finish: () => Unit ) { /* thread */ private var active = true - private val mailbox = Mailbox[Option[Request]]() + private val mailbox = Mailbox[Option[Request]](limit = limit) private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) } def is_active(): Boolean = active && thread.isAlive diff -r 2024a2298d7a -r a0199212046a src/Pure/Concurrent/mailbox.scala --- a/src/Pure/Concurrent/mailbox.scala Tue Oct 31 15:40:46 2023 +0100 +++ b/src/Pure/Concurrent/mailbox.scala Tue Oct 31 16:11:26 2023 +0100 @@ -9,15 +9,17 @@ object Mailbox { - def apply[A](): Mailbox[A] = new Mailbox[A]() + def apply[A](limit: Int = 0): Mailbox[A] = new Mailbox[A](limit) } -class Mailbox[A] private() { +class Mailbox[A] private(limit: Int) { private val mailbox = Synchronized[List[A]](Nil) override def toString: String = mailbox.value.reverse.mkString("Mailbox(", ",", ")") - def send(msg: A): Unit = mailbox.change(msg :: _) + def send(msg: A): Unit = + if (limit <= 0) mailbox.change(msg :: _) + else mailbox.guarded_access(msgs => if (msgs.length < limit) Some(((), msg :: msgs)) else None) def receive(timeout: Option[Time] = None): List[A] = (mailbox.timed_access(_ => timeout.map(t => Time.now() + t),