src/Pure/Concurrent/mailbox.scala
changeset 56693 0423c957b081
child 57417 29fe9bac501b
equal deleted inserted replaced
56692:8219a65b24e3 56693:0423c957b081
       
     1 /*  Title:      Pure/Concurrent/mailbox.scala
       
     2     Module:     PIDE
       
     3     Author:     Makarius
       
     4 
       
     5 Message exchange via mailbox, with non-blocking send (due to unbounded
       
     6 queueing) and potentially blocking receive.
       
     7 */
       
     8 
       
     9 package isabelle
       
    10 
       
    11 
       
    12 import scala.collection.immutable.Queue
       
    13 
       
    14 
       
    15 object Mailbox
       
    16 {
       
    17   def apply[A]: Mailbox[A] = new Mailbox[A]()
       
    18 }
       
    19 
       
    20 
       
    21 class Mailbox[A] private()
       
    22 {
       
    23   private val mailbox = Synchronized(Queue.empty[A])
       
    24   override def toString: String = mailbox.value.mkString("Mailbox(", ",", ")")
       
    25 
       
    26   def send(msg: A): Unit =
       
    27     mailbox.change(_.enqueue(msg))
       
    28 
       
    29   def receive: A =
       
    30     mailbox.guarded_access(_.dequeueOption)
       
    31 
       
    32   def receive_timeout(timeout: Time): Option[A] =
       
    33     mailbox.timed_access(_ => Some(Time.now() + timeout), _.dequeueOption)
       
    34 
       
    35   def await_empty: Unit =
       
    36     mailbox.guarded_access(queue => if (queue.isEmpty) Some(((), queue)) else None)
       
    37 }