--- a/src/Pure/Concurrent/mailbox.ML Mon Oct 13 15:48:37 2008 +0200
+++ b/src/Pure/Concurrent/mailbox.ML Mon Oct 13 15:48:38 2008 +0200
@@ -2,7 +2,7 @@
ID: $Id$
Author: Makarius
-Concurrent message exchange via mailbox -- with unbounded queueing.
+Message exchange via mailbox -- with unbounded queueing.
*)
signature MAILBOX =
@@ -17,47 +17,18 @@
structure Mailbox: MAILBOX =
struct
-(* datatype *)
-
-datatype 'a T = Mailbox of
- {lock: Mutex.mutex,
- cond: ConditionVar.conditionVar,
- messages: 'a Queue.T ref};
+datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
-fun create () = Mailbox
- {lock = Mutex.mutex (),
- cond = ConditionVar.conditionVar (),
- messages = ref Queue.empty};
-
-
-(* send -- non-blocking *)
+(*send -- non-blocking*)
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (Queue.enqueue msg);
-fun send (Mailbox {lock, cond, messages}) msg = uninterruptible (fn _ => fn () =>
- let
- val _ = Mutex.lock lock;
- val _ = change messages (Queue.enqueue msg);
- val _ = ConditionVar.broadcast cond;
- val _ = Mutex.unlock lock;
- in () end) ();
-
-
-(* receive -- blocking, interruptible, with timeout *)
+(*receive_timeout -- blocking, interruptible, with timeout*)
+fun receive_timeout timeout (Mailbox mailbox) =
+ Synchronized.guarded_change (not o Queue.is_empty) (fn _ => SOME (Time.+ (Time.now (), timeout)))
+ mailbox (fn ok => fn msgs => if ok then Queue.dequeue msgs |>> SOME else (NONE, msgs));
-fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
- uninterruptible (fn restore_attributes => fn () =>
- let
- val _ = Mutex.lock lock;
-
- val limit = Time.+ (Time.now (), timeout);
- fun check () = not (Queue.is_empty (! messages)) orelse
- ConditionVar.waitUntil (cond, lock, limit) andalso check ();
- val ok = restore_attributes check ()
- handle Exn.Interrupt => (Mutex.unlock lock; raise Exn.Interrupt);
- val res = if ok then SOME (change_result messages Queue.dequeue) else NONE;
-
- val _ = Mutex.unlock lock;
- in res end) ();
-
+(*receive -- blocking, interruptible*)
fun receive mailbox =
(case receive_timeout (Time.fromSeconds 10) mailbox of
NONE => receive mailbox