author | haftmann |
Thu, 09 Oct 2008 08:47:27 +0200 | |
changeset 28537 | 1e84256d1a8a |
parent 28443 | de653f1ad78b |
child 28576 | dc4aae271c41 |
permissions | -rw-r--r-- |
28135
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
1 |
(* Title: Pure/Concurrent/mailbox.ML |
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
2 |
ID: $Id$ |
28140 | 3 |
Author: Makarius |
28135
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
4 |
|
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
5 |
Concurrent message exchange via mailbox -- with unbounded queueing. |
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
6 |
*) |
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
7 |
|
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
8 |
(*Interface of a mailbox for exchanging messages between threads.*)
signature MAILBOX =
sig
  (*mailbox holding messages of type 'a*)
  type 'a T
  (*make a fresh mailbox with no pending messages*)
  val create: unit -> 'a T
  (*append a message to the mailbox queue and wake up waiting receivers*)
  val send: 'a T -> 'a -> unit
  (*take the next message, waiting at most the given time span;
    NONE if no message showed up before the deadline*)
  val receive_timeout: Time.time -> 'a T -> 'a option
  (*take the next message, waiting until one is available*)
  val receive: 'a T -> 'a
end;
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
16 |
|
4f6f0496e93c
Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff
changeset
|
17 |
structure Mailbox: MAILBOX =
struct

(* datatype *)

(*A mailbox bundles a mutable message queue with the mutex and the
  condition variable that the operations below use to guard and signal it.*)
datatype 'a T = Mailbox of
 {lock: Mutex.mutex,
  cond: ConditionVar.conditionVar,
  messages: 'a Queue.T ref};

(*Fresh mailbox: new mutex, new condition variable, empty queue.*)
fun create () =
  let
    val lock = Mutex.mutex ();
    val cond = ConditionVar.conditionVar ();
    val messages = ref Queue.empty;
  in Mailbox {lock = lock, cond = cond, messages = messages} end;


(* send -- non-blocking *)

(*Enqueue msg under the lock and broadcast on the condition variable, so
  that every receiver currently waiting re-checks the queue.  The whole
  critical section runs inside uninterruptible.*)
fun send (Mailbox {lock, cond, messages}) msg =
  uninterruptible (fn _ => fn () =>
   (Mutex.lock lock;
    change messages (Queue.enqueue msg);
    ConditionVar.broadcast cond;
    Mutex.unlock lock)) ();


(* receive -- blocking, interruptible, with timeout *)

(*Dequeue the next message, waiting until now + timeout at most.
  Result is SOME message, or NONE if the wait gave up first.  Only the
  waiting loop runs with restored attributes; an Exn.Interrupt raised
  there releases the lock before being re-raised.*)
fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
  uninterruptible (fn restore_attributes => fn () =>
    let
      val _ = Mutex.lock lock;

      (*the deadline is fixed once, after the lock has been acquired*)
      val deadline = Time.+ (Time.now (), timeout);
      (*true as soon as the queue is non-empty; false once waitUntil
        reports false while the queue is still empty*)
      fun await () =
        if Queue.is_empty (! messages) then
          if ConditionVar.waitUntil (cond, lock, deadline) then await () else false
        else true;
      val available = restore_attributes await ()
        handle Exn.Interrupt => (Mutex.unlock lock; raise Exn.Interrupt);
      val result =
        if available then SOME (change_result messages Queue.dequeue) else NONE;

      val _ = Mutex.unlock lock;
    in result end) ();

(*Blocking receive: repeat receive_timeout in 10-second rounds until a
  message is delivered.*)
fun receive mailbox =
  (case receive_timeout (Time.fromSeconds 10) mailbox of
    SOME msg => msg
  | NONE => receive mailbox);

end;