# HG changeset patch # User wenzelm # Date 1223905718 -7200 # Node ID dc4aae271c411630418d82f99364ed3e272c94d3 # Parent ed869f0196420dfa6bebab59b8a80fe2242be48a simplified implementation using Synchronized.var; diff -r ed869f019642 -r dc4aae271c41 src/Pure/Concurrent/mailbox.ML --- a/src/Pure/Concurrent/mailbox.ML Mon Oct 13 15:48:37 2008 +0200 +++ b/src/Pure/Concurrent/mailbox.ML Mon Oct 13 15:48:38 2008 +0200 @@ -2,7 +2,7 @@ ID: $Id$ Author: Makarius -Concurrent message exchange via mailbox -- with unbounded queueing. +Message exchange via mailbox -- with unbounded queueing. *) signature MAILBOX = @@ -17,47 +17,18 @@ structure Mailbox: MAILBOX = struct -(* datatype *) - -datatype 'a T = Mailbox of - {lock: Mutex.mutex, - cond: ConditionVar.conditionVar, - messages: 'a Queue.T ref}; +datatype 'a T = Mailbox of 'a Queue.T Synchronized.var; +fun create () = Mailbox (Synchronized.var "mailbox" Queue.empty); -fun create () = Mailbox - {lock = Mutex.mutex (), - cond = ConditionVar.conditionVar (), - messages = ref Queue.empty}; - - -(* send -- non-blocking *) +(*send -- non-blocking*) +fun send (Mailbox mailbox) msg = Synchronized.change mailbox (Queue.enqueue msg); -fun send (Mailbox {lock, cond, messages}) msg = uninterruptible (fn _ => fn () => - let - val _ = Mutex.lock lock; - val _ = change messages (Queue.enqueue msg); - val _ = ConditionVar.broadcast cond; - val _ = Mutex.unlock lock; - in () end) (); - - -(* receive -- blocking, interruptible, with timeout *) +(*receive_timeout -- blocking, interruptible, with timeout*) +fun receive_timeout timeout (Mailbox mailbox) = + Synchronized.guarded_change (not o Queue.is_empty) (fn _ => SOME (Time.+ (Time.now (), timeout))) + mailbox (fn ok => fn msgs => if ok then Queue.dequeue msgs |>> SOME else (NONE, msgs)); -fun receive_timeout timeout (Mailbox {lock, cond, messages}) = - uninterruptible (fn restore_attributes => fn () => - let - val _ = Mutex.lock lock; - - val limit = Time.+ (Time.now (), timeout); - fun check () = not (Queue.is_empty (! messages)) orelse - ConditionVar.waitUntil (cond, lock, limit) andalso check (); - val ok = restore_attributes check () - handle Exn.Interrupt => (Mutex.unlock lock; raise Exn.Interrupt); - val res = if ok then SOME (change_result messages Queue.dequeue) else NONE; - - val _ = Mutex.unlock lock; - in res end) (); - +(*receive -- blocking, interruptible*) fun receive mailbox = (case receive_timeout (Time.fromSeconds 10) mailbox of NONE => receive mailbox