added receive_timeout;
authorwenzelm
Thu, 04 Sep 2008 21:02:42 +0200
changeset 28139 831e545c655e
parent 28138 03e5196b1559
child 28140 a74a1c580360
added receive_timeout; tuned comments;
src/Pure/Concurrent/mailbox.ML
--- a/src/Pure/Concurrent/mailbox.ML	Thu Sep 04 20:06:23 2008 +0200
+++ b/src/Pure/Concurrent/mailbox.ML	Thu Sep 04 21:02:42 2008 +0200
@@ -9,12 +9,15 @@
   type 'a T
   val create: unit -> 'a T
   val send: 'a -> 'a T -> unit
+  val receive_timeout: Time.time -> 'a T -> 'a option
   val receive: 'a T -> 'a
 end;
 
 structure Mailbox: MAILBOX =
 struct
 
+(* datatype *)
+
 datatype 'a T = Mailbox of
   {lock: Mutex.mutex,
    cond: ConditionVar.conditionVar,
@@ -25,6 +28,9 @@
    cond = ConditionVar.conditionVar (),
    messages = ref Queue.empty};
 
+
+(* send -- non-blocking *)
+
 fun send msg (Mailbox {lock, cond, messages}) = uninterruptible (fn _ => fn () =>
   let
     val _ = Mutex.lock lock;
@@ -33,14 +39,33 @@
     val _ = ConditionVar.signal cond;
   in () end) ();
 
-fun receive (Mailbox {lock, cond, messages}) = uninterruptible (fn restore_attributes => fn () =>
-  let
-    val _ = Mutex.lock lock;
-    fun check () = while (Queue.is_empty (! messages)) do (ConditionVar.wait (cond, lock));
-    val _ = restore_attributes check () handle Interrupt => (Mutex.unlock lock; raise Interrupt);
-    val (msg, msgs) = Queue.dequeue (! messages);
-    val _ = messages := msgs;
-    val _ = Mutex.unlock lock;
-  in msg end) ();
+
+(* receive -- blocking, interruptible, with timeout *)
+
+fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
+  uninterruptible (fn restore_attributes => fn () =>
+    let
+      val _ = Mutex.lock lock;
+
+      val limit = Time.+ (Time.now (), timeout);
+      fun check () = not (Queue.is_empty (! messages)) orelse
+        ConditionVar.waitUntil (cond, lock, limit) andalso check ();
+      val ok = restore_attributes check ()
+        handle Interrupt => (Mutex.unlock lock; raise Interrupt);
+      val res =
+        if ok then
+          let
+            val (msg, msgs) = Queue.dequeue (! messages);
+            val _ = messages := msgs;
+          in SOME msg end
+        else NONE;
+
+      val _ = Mutex.unlock lock;
+    in res end) ();
+
+fun receive mailbox =
+  (case receive_timeout (Time.fromSeconds 10) mailbox of
+    NONE => receive mailbox
+  | SOME x => x);
 
 end;