--- a/src/Pure/Concurrent/mailbox.ML Mon Oct 13 15:48:40 2008 +0200
+++ b/src/Pure/Concurrent/mailbox.ML Tue Oct 14 13:01:52 2008 +0200
@@ -2,7 +2,8 @@
ID: $Id$
Author: Makarius
-Message exchange via mailbox -- with unbounded queueing.
+Message exchange via mailbox, with non-blocking send (due to unbounded
+queueing) and potentially blocking receive.
*)
signature MAILBOX =
@@ -10,28 +11,25 @@
type 'a T
val create: unit -> 'a T
val send: 'a T -> 'a -> unit
+ val receive: 'a T -> 'a
val receive_timeout: Time.time -> 'a T -> 'a option
- val receive: 'a T -> 'a
end;
structure Mailbox: MAILBOX =
struct
datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+
fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
-(*send -- non-blocking*)
-fun send (Mailbox mailbox) msg = Synchronized.change mailbox (Queue.enqueue msg);
+fun send (Mailbox mailbox) msg =
+ Synchronized.change mailbox (Queue.enqueue msg);
-(*receive_timeout -- blocking, interruptible, with timeout*)
+fun receive (Mailbox mailbox) =
+ Synchronized.guarded_access mailbox (try Queue.dequeue);
+
fun receive_timeout timeout (Mailbox mailbox) =
- Synchronized.guarded_change (not o Queue.is_empty) (fn _ => SOME (Time.+ (Time.now (), timeout)))
- mailbox (fn ok => fn msgs => if ok then Queue.dequeue msgs |>> SOME else (NONE, msgs));
-
-(*receive -- blocking, interruptible*)
-fun receive mailbox =
- (case receive_timeout (Time.fromSeconds 10) mailbox of
- NONE => receive mailbox
- | SOME x => x);
+ Synchronized.timed_access mailbox
+ (fn _ => SOME (Time.+ (Time.now (), timeout))) (try Queue.dequeue);
end;