# HG changeset patch # User wenzelm # Date 1220554962 -7200 # Node ID 831e545c655e6c9fe230ecc5f071ddd84858b566 # Parent 03e5196b1559c52316a7b907c9a9d4aa943aef3a added receive_timeout; tuned comments; diff -r 03e5196b1559 -r 831e545c655e src/Pure/Concurrent/mailbox.ML --- a/src/Pure/Concurrent/mailbox.ML Thu Sep 04 20:06:23 2008 +0200 +++ b/src/Pure/Concurrent/mailbox.ML Thu Sep 04 21:02:42 2008 +0200 @@ -9,12 +9,15 @@ type 'a T val create: unit -> 'a T val send: 'a -> 'a T -> unit + val receive_timeout: Time.time -> 'a T -> 'a option val receive: 'a T -> 'a end; structure Mailbox: MAILBOX = struct +(* datatype *) + datatype 'a T = Mailbox of {lock: Mutex.mutex, cond: ConditionVar.conditionVar, @@ -25,6 +28,9 @@ cond = ConditionVar.conditionVar (), messages = ref Queue.empty}; + +(* send -- non-blocking *) + fun send msg (Mailbox {lock, cond, messages}) = uninterruptible (fn _ => fn () => let val _ = Mutex.lock lock; @@ -33,14 +39,33 @@ val _ = ConditionVar.signal cond; in () end) (); -fun receive (Mailbox {lock, cond, messages}) = uninterruptible (fn restore_attributes => fn () => - let - val _ = Mutex.lock lock; - fun check () = while (Queue.is_empty (! messages)) do (ConditionVar.wait (cond, lock)); - val _ = restore_attributes check () handle Interrupt => (Mutex.unlock lock; raise Interrupt); - val (msg, msgs) = Queue.dequeue (! messages); - val _ = messages := msgs; - val _ = Mutex.unlock lock; - in msg end) (); + +(* receive -- blocking, interruptible, with timeout *) + +fun receive_timeout timeout (Mailbox {lock, cond, messages}) = + uninterruptible (fn restore_attributes => fn () => + let + val _ = Mutex.lock lock; + + val limit = Time.+ (Time.now (), timeout); + fun check () = not (Queue.is_empty (! messages)) orelse + ConditionVar.waitUntil (cond, lock, limit) andalso check (); + val ok = restore_attributes check () + handle Interrupt => (Mutex.unlock lock; raise Interrupt); + val res = + if ok then + let + val (msg, msgs) = Queue.dequeue (! messages); + val _ = messages := msgs; + in SOME msg end + else NONE; + + val _ = Mutex.unlock lock; + in res end) (); + +fun receive mailbox = + (case receive_timeout (Time.fromSeconds 10) mailbox of + NONE => receive mailbox + | SOME x => x); end;