src/Pure/Concurrent/mailbox.ML
author wenzelm
Thu, 09 Oct 2008 20:53:13 +0200
changeset 28546 d57bfb44c9e5
parent 28443 de653f1ad78b
child 28576 dc4aae271c41
permissions -rw-r--r--
Dummy version of parallel list combinators -- plain sequential evaluation.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
28135
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     1
(*  Title:      Pure/Concurrent/mailbox.ML
    ID:         $Id$
    Author:     Makarius

Concurrent message exchange via mailbox -- with unbounded queueing.
*)
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     7
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
     8
(*Interface of a mailbox: a queue of messages shared between threads,
  with unbounded queueing.*)
signature MAILBOX =
sig
  (*mailbox carrying messages of type 'a*)
  type 'a T

  (*fresh mailbox with an empty message queue*)
  val create: unit -> 'a T

  (*append a message to the queue and notify waiting receivers*)
  val send: 'a T -> 'a -> unit

  (*wait for a message until the given time span (counted from the start of
    the wait) has elapsed: SOME message, or NONE if the queue stayed empty*)
  val receive_timeout: Time.time -> 'a T -> 'a option

  (*wait for a message without an overall time limit*)
  val receive: 'a T -> 'a
end;
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    16
4f6f0496e93c Concurrent message exchange via mailbox -- with unbounded queueing.
wenzelm
parents:
diff changeset
    17
structure Mailbox: MAILBOX =
struct

(* datatype *)

(*A mailbox bundles the message queue with the mutex that guards it and the
  condition variable on which receivers wait for new messages.*)
datatype 'a T = Mailbox of
  {lock: Mutex.mutex,
   cond: ConditionVar.conditionVar,
   messages: 'a Queue.T ref};

(*fresh mailbox: new mutex, new condition variable, empty queue*)
fun create () = Mailbox
  {lock = Mutex.mutex (),
   cond = ConditionVar.conditionVar (),
   messages = ref Queue.empty};


(* send -- non-blocking *)

(*Enqueue msg and wake up all threads waiting on the condition variable.
  The broadcast is issued while the lock is still held.*)
fun send (Mailbox {lock, cond, messages}) msg = uninterruptible (fn _ => fn () =>
  let
    val _ = Mutex.lock lock;
    val _ = change messages (Queue.enqueue msg);
    val _ = ConditionVar.broadcast cond;
    val _ = Mutex.unlock lock;
  in () end) ();


(* receive -- blocking, interruptible, with timeout *)

(*Wait until a message is available or the deadline (time of acquiring the
  lock + timeout) has passed.  Result: SOME of the dequeued message, or NONE
  if the queue was still empty when the wait gave up.

  Only the wait itself runs under restore_attributes (presumably re-enabling
  interrupts -- cf. the section heading); locking, dequeueing and unlocking
  stay inside the uninterruptible wrapper.*)
fun receive_timeout timeout (Mailbox {lock, cond, messages}) =
  uninterruptible (fn restore_attributes => fn () =>
    let
      val _ = Mutex.lock lock;

      val limit = Time.+ (Time.now (), timeout);
      (*true as soon as the queue is non-empty; false once waitUntil
        reports that the deadline has passed*)
      fun check () = not (Queue.is_empty (! messages)) orelse
        ConditionVar.waitUntil (cond, lock, limit) andalso check ();
      (*release the lock on ANY exception escaping the wait, not just
        Interrupt: otherwise the mutex stays locked forever and every later
        send/receive on this mailbox blocks*)
      val ok = restore_attributes check ()
        handle exn => (Mutex.unlock lock; raise exn);
      val res = if ok then SOME (change_result messages Queue.dequeue) else NONE;

      val _ = Mutex.unlock lock;
    in res end) ();

(*duration of a single waiting round of receive*)
val poll_interval = Time.fromSeconds 10;

(*Wait for a message without overall time limit, by repeating
  receive_timeout in rounds of poll_interval until a message arrives.*)
fun receive mailbox =
  (case receive_timeout poll_interval mailbox of
    NONE => receive mailbox
  | SOME x => x);

end;