--- a/src/Pure/Concurrent/mailbox.ML Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.ML Fri Jun 27 22:08:55 2014 +0200
@@ -1,8 +1,8 @@
(* Title: Pure/Concurrent/mailbox.ML
Author: Makarius
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
*)
signature MAILBOX =
@@ -10,30 +10,26 @@
type 'a T
val create: unit -> 'a T
val send: 'a T -> 'a -> unit
- val receive: 'a T -> 'a
- val receive_timeout: Time.time -> 'a T -> 'a option
+ val receive: Time.time option -> 'a T -> 'a list
val await_empty: 'a T -> unit
end;
structure Mailbox: MAILBOX =
struct
-datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+datatype 'a T = Mailbox of 'a list Synchronized.var;
-fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
+fun create () = Mailbox (Synchronized.var "mailbox" []);
-fun send (Mailbox mailbox) msg =
- Synchronized.change mailbox (Queue.enqueue msg);
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (cons msg);
-fun receive (Mailbox mailbox) =
- Synchronized.guarded_access mailbox (try Queue.dequeue);
-
-fun receive_timeout timeout (Mailbox mailbox) =
+fun receive timeout (Mailbox mailbox) =
Synchronized.timed_access mailbox
- (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
+ (fn _ => Option.map (fn t => (Time.+ (Time.now (), t))) timeout)
+ (fn [] => NONE | msgs => SOME (msgs, []))
+ |> these |> rev;
fun await_empty (Mailbox mailbox) =
- Synchronized.guarded_access mailbox
- (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+ Synchronized.guarded_access mailbox (fn [] => SOME ((), []) | _ => NONE);
end;