simplified implementation using Synchronized.var;
authorwenzelm
Mon, 13 Oct 2008 15:48:38 +0200
changeset 28576 dc4aae271c41
parent 28575 ed869f019642
child 28577 bd2456e0d944
simplified implementation using Synchronized.var;
src/Pure/Concurrent/mailbox.ML
--- a/src/Pure/Concurrent/mailbox.ML	Mon Oct 13 15:48:37 2008 +0200
+++ b/src/Pure/Concurrent/mailbox.ML	Mon Oct 13 15:48:38 2008 +0200
@@ -2,7 +2,7 @@
     ID:         $Id$
     Author:     Makarius
 
-Concurrent message exchange via mailbox -- with unbounded queueing.
+Message exchange via mailbox -- with unbounded queueing.
 *)
 
 signature MAILBOX =
@@ -17,47 +17,18 @@
 structure Mailbox: MAILBOX =
 struct
 
-(* datatype *)
-
-datatype 'a T = Mailbox of
-  {lock: Mutex.mutex,
-   cond: ConditionVar.conditionVar,
-   messages: 'a Queue.T ref};
+datatype 'a T = Mailbox of 'a Queue.T Synchronized.var;
+fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty);
 
-fun create () = Mailbox
-  {lock = Mutex.mutex (),
-   cond = ConditionVar.conditionVar (),
-   messages = ref Queue.empty};
-
-
-(* send -- non-blocking *)
+(*send -- non-blocking*)
+fun send (Mailbox mailbox) msg = Synchronized.change mailbox (Queue.enqueue msg);
 
-fun send (Mailbox {lock, cond, messages}) msg = uninterruptible (fn _ => fn () =>
-  let
-    val _ = Mutex.lock lock;
-    val _ = change messages (Queue.enqueue msg);
-    val _ = ConditionVar.broadcast cond;
-    val _ = Mutex.unlock lock;
-  in () end) ();
-
-
-(* receive -- blocking, interruptible, with timeout *)
+(*receive_timeout -- blocking, interruptible, with timeout*)
+fun receive_timeout timeout (Mailbox mailbox) =
+  Synchronized.guarded_change (not o Queue.is_empty) (fn _ => SOME (Time.+ (Time.now (), timeout)))
+    mailbox (fn ok => fn msgs => if ok then Queue.dequeue msgs |>> SOME else (NONE, msgs));
 
-fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
-  uninterruptible (fn restore_attributes => fn () =>
-    let
-      val _ = Mutex.lock lock;
-
-      val limit = Time.+ (Time.now (), timeout);
-      fun check () = not (Queue.is_empty (! messages)) orelse
-        ConditionVar.waitUntil (cond, lock, limit) andalso check ();
-      val ok = restore_attributes check ()
-        handle Exn.Interrupt => (Mutex.unlock lock; raise Exn.Interrupt);
-      val res = if ok then SOME (change_result messages Queue.dequeue) else NONE;
-
-      val _ = Mutex.unlock lock;
-    in res end) ();
-
+(*receive -- blocking, interruptible*)
 fun receive mailbox =
   (case receive_timeout (Time.fromSeconds 10) mailbox of
     NONE => receive mailbox