Concurrent message exchange via mailbox -- with unbounded queueing.
authorwenzelm
Thu, 04 Sep 2008 19:45:13 +0200
changeset 28135 4f6f0496e93c
parent 28134 1179b32f885d
child 28136 6c7f005cfef8
Concurrent message exchange via mailbox -- with unbounded queueing.
src/Pure/Concurrent/mailbox.ML
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/Concurrent/mailbox.ML	Thu Sep 04 19:45:13 2008 +0200
@@ -0,0 +1,46 @@
+(*  Title:      Pure/Concurrent/mailbox.ML
+    ID:         $Id$
+
+Concurrent message exchange via mailbox -- with unbounded queueing.
+*)
+
+signature MAILBOX =
+sig
+  type 'a T
+  val create: unit -> 'a T
+  val send: 'a -> 'a T -> unit
+  val receive: 'a T -> 'a
+end;
+
+structure Mailbox: MAILBOX =
+struct
+
+datatype 'a T = Mailbox of
+  {lock: Mutex.mutex,
+   cond: ConditionVar.conditionVar,
+   messages: 'a Queue.T ref};
+
+fun create () = Mailbox
+  {lock = Mutex.mutex (),
+   cond = ConditionVar.conditionVar (),
+   messages = ref Queue.empty};
+
+fun send msg (Mailbox {lock, cond, messages}) = uninterruptible (fn _ => fn () =>
+  let
+    val _ = Mutex.lock lock;
+    val _ = change messages (Queue.enqueue msg);
+    val _ = Mutex.unlock lock;
+    val _ = ConditionVar.signal cond;
+  in () end) ();
+
+fun receive (Mailbox {lock, cond, messages}) = uninterruptible (fn restore_attributes => fn () =>
+  let
+    val _ = Mutex.lock lock;
+    fun check () = while (Queue.is_empty (! messages)) do (ConditionVar.wait (cond, lock));
+    val _ = restore_attributes check () handle Interrupt => (Mutex.unlock lock; raise Interrupt);
+    val (msg, msgs) = Queue.dequeue (! messages);
+    val _ = messages := msgs;
+    val _ = Mutex.unlock lock;
+  in msg end) ();
+
+end;