src/Pure/Concurrent/mailbox.ML
changeset 57417 29fe9bac501b
parent 52583 0a7240d88e09
child 62826 eb94e570c1a4
--- a/src/Pure/Concurrent/mailbox.ML	Fri Jun 27 19:38:32 2014 +0200
+++ b/src/Pure/Concurrent/mailbox.ML	Fri Jun 27 22:08:55 2014 +0200
@@ -1,8 +1,8 @@
 (*  Title:      Pure/Concurrent/mailbox.ML
     Author:     Makarius
 
-Message exchange via mailbox, with non-blocking send (due to unbounded
-queueing) and potentially blocking receive.
+Message exchange via mailbox, with multiple senders (non-blocking,
+unbounded buffering) and single receiver (bulk messages).
 *)
 
 signature MAILBOX =
@@ -10,30 +10,26 @@
   type 'a T
   val create: unit -> 'a T
   val send: 'a T -> 'a -> unit
-  val receive: 'a T -> 'a
-  val receive_timeout: Time.time -> 'a T -> 'a option
+  val receive: Time.time option -> 'a T -> 'a list
   val await_empty: 'a T -> unit
 end;
 
 structure Mailbox: MAILBOX =
 struct
 
-datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+datatype 'a T = Mailbox of 'a list Synchronized.var;
 
-fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
+fun create () = Mailbox (Synchronized.var "mailbox" []);
 
-fun send (Mailbox mailbox) msg =
-  Synchronized.change mailbox (Queue.enqueue msg);
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (cons msg);
 
-fun receive (Mailbox mailbox) =
-  Synchronized.guarded_access mailbox (try Queue.dequeue);
-
-fun receive_timeout timeout (Mailbox mailbox) =
+fun receive timeout (Mailbox mailbox) =
   Synchronized.timed_access mailbox
-    (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
+    (fn _ => Option.map (fn t => (Time.+ (Time.now (), t))) timeout)
+    (fn [] => NONE | msgs => SOME (msgs, []))
+  |> these |> rev;
 
 fun await_empty (Mailbox mailbox) =
-  Synchronized.guarded_access mailbox
-    (fn queue => if Queue.is_empty queue then SOME ((), queue) else NONE);
+  Synchronized.guarded_access mailbox (fn [] => SOME ((), []) | _ => NONE);
 
 end;