src/Pure/Concurrent/mailbox.ML
author haftmann
Wed, 10 Aug 2016 18:57:20 +0200
changeset 63659 abe0c3872d8a
parent 62826 eb94e570c1a4
permissions -rw-r--r--
keeping lifting rules local

(*  Title:      Pure/Concurrent/mailbox.ML
    Author:     Makarius

Message exchange via mailbox, with multiple senders (non-blocking,
unbounded buffering) and single receiver (bulk messages).
*)

signature MAILBOX =
sig
  (*mailbox for messages of type 'a; the representation is hidden from clients*)
  type 'a T
  (*fresh mailbox without any messages*)
  val create: unit -> 'a T
  (*add one message to the mailbox*)
  val send: 'a T -> 'a -> unit
  (*take all pending messages at once, in the order they were sent, leaving
    the mailbox empty; the optional timeout is relative to the time of the
    call -- NOTE(review): presumably the result is [] if the timeout expires
    while the mailbox is still empty; confirm against
    Synchronized.timed_access*)
  val receive: Time.time option -> 'a T -> 'a list
  (*succeed only when the mailbox holds no messages -- NOTE(review):
    presumably waits until then; confirm against
    Synchronized.guarded_access*)
  val await_empty: 'a T -> unit
end;

structure Mailbox: MAILBOX =
struct

(*pending messages inside a synchronized variable, most recent message first*)
datatype 'a T = Mailbox of 'a list Synchronized.var;

(*fresh mailbox without any messages*)
fun create () = Mailbox (Synchronized.var "mailbox" []);

(*push the message in front of the pending ones*)
fun send (Mailbox var) msg =
  Synchronized.change var (fn pending => cons msg pending);

(*take all pending messages at once and leave the mailbox empty; the
  optional timeout is relative and turned into an absolute time limit here;
  pending messages are stored in reverse, so the result is reversed back
  into sending order*)
fun receive timeout (Mailbox var) =
  let
    fun time_limit _ =
      (case timeout of
        NONE => NONE
      | SOME t => SOME (Time.now () + t));

    (*no result for the empty mailbox, otherwise grab everything*)
    fun take_all [] = NONE
      | take_all pending = SOME (pending, []);
  in
    rev (these (Synchronized.timed_access var time_limit take_all))
  end;

(*the access succeeds only in a state without pending messages, which is
  left unchanged*)
fun await_empty (Mailbox var) =
  let
    fun when_empty [] = SOME ((), [])
      | when_empty _ = NONE;
  in Synchronized.guarded_access var when_empty end;

end;