src/Pure/Concurrent/mailbox.ML
author blanchet
Fri, 20 Dec 2013 11:34:07 +0100
changeset 54835 431133d07192
parent 52583 0a7240d88e09
child 57417 29fe9bac501b
permissions -rw-r--r--
note manually proved exclusiveness property

(*  Title:      Pure/Concurrent/mailbox.ML
    Author:     Makarius

Message exchange via mailbox, with non-blocking send (due to unbounded
queueing) and potentially blocking receive.
*)

signature MAILBOX =
sig
  (*mailbox carrying messages of type 'a*)
  type 'a T
  (*fresh mailbox with an empty message queue*)
  val create: unit -> 'a T
  (*enqueue a message; non-blocking, since the queue is unbounded*)
  val send: 'a T -> 'a -> unit
  (*dequeue a message, potentially blocking until one can be dequeued*)
  val receive: 'a T -> 'a
  (*like receive, but subject to a time limit derived from the given duration;
    presumably NONE signals that the limit expired -- confirm against
    Synchronized.timed_access*)
  val receive_timeout: Time.time -> 'a T -> 'a option
  (*wait until the queue is empty; the queue content is left unchanged*)
  val await_empty: 'a T -> unit
end;

structure Mailbox: MAILBOX =
struct

(*a mailbox is a synchronized variable holding the queue of pending messages*)
datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;

(*fresh mailbox, starting from the empty queue*)
fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);

(*enqueue msg; never waits for a receiver*)
fun send (Mailbox mailbox) msg =
  Synchronized.change mailbox (Queue.enqueue msg);

(*dequeue one message: "try Queue.dequeue" turns a failing dequeue into NONE,
  which guarded_access presumably takes as "condition not yet met, keep
  waiting" -- confirm against Synchronized.guarded_access*)
fun receive (Mailbox mailbox) =
  Synchronized.guarded_access mailbox (try Queue.dequeue);

(*dequeue one message, waiting no longer than the given duration overall.

  The absolute deadline is fixed once, before entering timed_access: a limit
  function that re-reads Time.now () on every call would push the deadline
  forward whenever timed_access consults it again (e.g. after a wake-up that
  still finds the queue empty), so the total wait could exceed timeout.*)
fun receive_timeout timeout (Mailbox mailbox) =
  let
    val deadline = Time.+ (Time.now (), timeout);
  in
    Synchronized.timed_access mailbox (fn _ => SOME deadline) (try Queue.dequeue)
  end;

(*wait until the queue is observed empty; the queue is handed back unchanged,
  so this only synchronizes with pending receivers and consumes nothing*)
fun await_empty (Mailbox mailbox) =
  Synchronized.guarded_access mailbox
    (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);

end;