src/Pure/Concurrent/mailbox.ML
author wenzelm
Mon, 08 Sep 2008 20:35:38 +0200
changeset 28170 a18cf8a0e656
parent 28158 96cbf4afdc7d
child 28172 a46751a649af
permissions -rw-r--r--
tuned Mailbox.send;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     1
(*  Title:      Pure/Concurrent/mailbox.ML
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     2
    ID:         $Id$
28140
a74a1c580360 proper header;
wenzelm
parents: 28139
diff changeset
     3
    Author:     Makarius
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     4
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     5
Concurrent message exchange via mailbox -- with unbounded queueing.
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     6
*)
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     7
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     8
signature MAILBOX =
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     9
sig
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    10
  type 'a T
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    11
  val create: unit -> 'a T
28170
a18cf8a0e656 tuned Mailbox.send;
wenzelm
parents: 28158
diff changeset
    12
  val send: 'a T -> 'a -> unit
28139
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    13
  val receive_timeout: Time.time -> 'a T -> 'a option
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    14
  val receive: 'a T -> 'a
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    15
end;
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    16
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    17
structure Mailbox: MAILBOX =
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    18
struct
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    19
28139
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    20
(* datatype *)
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    21
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    22
datatype 'a T = Mailbox of
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    23
  {lock: Mutex.mutex,
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    24
   cond: ConditionVar.conditionVar,
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    25
   messages: 'a Queue.T ref};
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    26
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    27
fun create () = Mailbox
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    28
  {lock = Mutex.mutex (),
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    29
   cond = ConditionVar.conditionVar (),
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    30
   messages = ref Queue.empty};
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    31
28139
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    32
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    33
(* send -- non-blocking *)
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    34
28170
a18cf8a0e656 tuned Mailbox.send;
wenzelm
parents: 28158
diff changeset
    35
fun send (Mailbox {lock, cond, messages}) msg = uninterruptible (fn _ => fn () =>
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    36
  let
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    37
    val _ = Mutex.lock lock;
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    38
    val _ = change messages (Queue.enqueue msg);
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    39
    val _ = Mutex.unlock lock;
28147
b44a7b259909 send: broadcast to all waiting threads;
wenzelm
parents: 28140
diff changeset
    40
    val _ = ConditionVar.broadcast cond;
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    41
  in () end) ();
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    42
28139
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    43
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    44
(* receive -- blocking, interruptible, with timeout *)
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    45
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    46
fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    47
  uninterruptible (fn restore_attributes => fn () =>
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    48
    let
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    49
      val _ = Mutex.lock lock;
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    50
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    51
      val limit = Time.+ (Time.now (), timeout);
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    52
      fun check () = not (Queue.is_empty (! messages)) orelse
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    53
        ConditionVar.waitUntil (cond, lock, limit) andalso check ();
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    54
      val ok = restore_attributes check ()
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    55
        handle Interrupt => (Mutex.unlock lock; raise Interrupt);
28158
wenzelm
parents: 28147
diff changeset
    56
      val res = if ok then SOME (change_result messages Queue.dequeue) else NONE;
28139
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    57
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    58
      val _ = Mutex.unlock lock;
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    59
    in res end) ();
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    60
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    61
fun receive mailbox =
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    62
  (case receive_timeout (Time.fromSeconds 10) mailbox of
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    63
    NONE => receive mailbox
831e545c655e added receive_timeout;
wenzelm
parents: 28135
diff changeset
    64
  | SOME x => x);
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    65
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    66
end;