src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Mon, 30 Jul 2007 19:22:27 +0200
changeset 24072 8b9e5d776ef3
parent 24069 8a15a04e36f6
child 24108 24e5587603b4
permissions -rw-r--r--
dequeue: wait loop while PROTECTED -- avoids race condition;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     1
(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     2
    ID:         $Id$
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     3
    Author:     Makarius
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     4
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     5
Multithreading in Poly/ML (version 5.1).
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     6
*)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     7
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     8
open Thread;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     9
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    10
structure Multithreading: MULTITHREADING =
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    11
struct
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    12
24072
8b9e5d776ef3 dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents: 24069
diff changeset
    13
(* options *)
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    14
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    15
(*global flag: tracing messages are only emitted while this is set*)
val trace = ref false;
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    16
(*Emit a diagnostic line ">>> ..." on stderr, provided the trace flag is set.
  The message is passed as a thunk and only evaluated when tracing is enabled.*)
fun tracing msg =
  if not (! trace) then ()
  else
    let val line = ">>> " ^ msg () ^ "\n"
    in TextIO.output (TextIO.stdErr, line); TextIO.flushOut TextIO.stdErr end;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    20
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    21
(*constant flag; presumably tells clients that real multithreading is
  supported by this implementation -- confirm against signature MULTITHREADING*)
val available = true;
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    22
(*initially 1 and not read within this structure; presumably the thread limit
  consulted by callers of schedule -- to be confirmed*)
val max_threads = ref 1;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    23
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    24
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    25
(* misc utils *)
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    26
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    27
(*format a name for messages: a non-empty name is preceded by one space,
  the empty name yields the empty string*)
fun show name = if name = "" then "" else " " ^ name;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    29
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    30
(*format a name for messages in bracketed form " [name]";
  the empty name yields the empty string*)
fun show' name = if name = "" then "" else " [" ^ name ^ "]";
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    32
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    33
(*increment an integer reference and return its new value*)
fun inc i = let val n = ! i + 1 in i := n; n end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    34
(*decrement an integer reference and return its new value*)
fun dec i = let val n = ! i - 1 in i := n; n end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    35
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    36
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    37
(* critical section -- may be nested within the same thread *)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    38
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    39
local
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    40
24063
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    41
(*mutex held by the thread that is executing a critical section*)
val critical_lock = Mutex.mutex ();
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    42
(*thread that currently executes a critical section, if any;
  consulted by self_critical to recognize nested entry*)
val critical_thread = ref (NONE: Thread.thread option);
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    43
(*name of the critical section currently being executed ("" if none);
  only used in tracing messages*)
val critical_name = ref "";
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    44
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    45
in
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    46
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    47
(*does the calling thread currently own the critical section?*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    51
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    52
(*Evaluate e () inside the global critical section.

  A thread that is already inside the critical section just runs e ()
  directly (nested entry).  Otherwise critical_lock is acquired, the owner
  and the section name are recorded, e () is evaluated with its outcome
  captured, the bookkeeping is reset, the lock is released, and only then
  the captured result or exception is passed on to the caller.*)
fun NAMED_CRITICAL name e =
  if not (self_critical ()) then
    let
      (*lazy tracing message; the name of the current holder is read
        at the time the message is actually produced*)
      fun report suffix =
        tracing (fn () => "CRITICAL" ^ show name ^ show' (! critical_name) ^ suffix ());

      (*fast path via trylock; otherwise block and trace the waiting time*)
      fun acquire () =
        if Mutex.trylock critical_lock then ()
        else
          let val timer = Timer.startRealTimer () in
            report (fn () => ": waiting");
            Mutex.lock critical_lock;
            report (fn () => ": passed after " ^ Time.toString (Timer.checkRealTimer timer))
          end;

      val _ = acquire ();
      val _ = (critical_thread := SOME (Thread.self ()); critical_name := name);
      val outcome = Exn.capture e ();
      val _ = (critical_name := ""; critical_thread := NONE; Mutex.unlock critical_lock);
    in Exn.release outcome end
  else e ();
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    75
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    76
(*critical section without a name, i.e. NAMED_CRITICAL with the empty name*)
fun CRITICAL body = NAMED_CRITICAL "" body;
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    77
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    78
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    79
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    80
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    81
(* scheduling -- non-interruptible threads working on a queue of tasks *)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    82
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    83
local
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    84
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    85
(*name of the PROTECTED section currently being executed ("" if none);
  only used in tracing messages, and shared by all invocations of schedule*)
val protected_name = ref "";
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    86
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    87
in
24063
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    88
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    89
(*Work through a queue of tasks with a pool of worker threads.

  n: requested number of worker threads; at least one worker is forked.
  next_task: maps the current queue to ((task, cont), queue'), i.e. the next
    item together with the updated queue.  For Task.Task f the body f () is
    run outside the lock, afterwards cont is applied to the queue.  An item
    for which Task.is_running holds makes the worker wait for the next wakeup;
    Task.Finished makes the worker terminate.
  tasks: the initial queue.

  Result: the exceptions raised by task bodies, most recently recorded first.
  The call returns after every worker has terminated via Task.Finished.*)
fun schedule n next_task tasks =
  let
    (*protected execution: evaluate e () while holding lock; the outcome is
      captured, so the lock is released even if e () raises an exception*)
    val lock = Mutex.mutex ();
    fun PROTECTED name e =
      let
        val _ =
          if Mutex.trylock lock then ()
          else
           (tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": waiting");
            Mutex.lock lock;
            tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": passed"));
        val _ = protected_name := name;
        val res = Exn.capture e ();
        val _ = protected_name := "";
        val _ = Mutex.unlock lock;
      in Exn.release res end;

    (*wakeup condition: wait must be called while holding lock*)
    val wakeup = ConditionVar.conditionVar ();
    fun wakeup_all () = ConditionVar.broadcast wakeup;
    fun wait () = ConditionVar.wait (wakeup, lock);

    (*the queue of tasks; active counts workers that are not waiting in dequeue
      (used for tracing only)*)
    val queue = ref tasks;
    val active = ref 0;
    fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");

    (*fetch the next item; invoked within PROTECTED only.  While the queue
      merely offers a running item, wait for a wakeup and try again.*)
    fun dequeue () =
      let
        val (next, tasks') = next_task (! queue);
        val _ = queue := tasks';
      in
        if Task.is_running (#1 next) then
         (dec active; trace_active ();
          wait ();
          inc active; trace_active ();
          dequeue ())
        else next
      end;

    (*worker threads: running counts workers that have not terminated yet,
      status accumulates the exceptions raised by task bodies*)
    val running = ref 0;
    val status = ref ([]: exn list);
    fun work () =
      (case PROTECTED "dequeue" dequeue of
        (Task.Task f, cont) =>
          (case Exn.capture f () of
            Exn.Result () => continue cont
          | Exn.Exn exn =>
              (PROTECTED "status" (fn () => status := exn :: ! status); continue cont))
      | (Task.Finished, _) =>
         (PROTECTED "running" (fn () => (dec active; dec running));
          wakeup_all ()))
    and continue cont =
      (*wakeup_all has to be applied to (): the bare identifier would only be
        evaluated as a function value and discarded, so workers waiting in
        dequeue would not be notified about the updated queue*)
      (PROTECTED "cont" (fn () => queue := cont (! queue)); wakeup_all (); work ());

    (*main control: fork the workers with deferred interrupts, then wait
      until all of them have terminated*)
    fun fork 0 = ()
      | fork k =
         (inc running; inc active;
          Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]);
          fork (k - 1));
    val _ = PROTECTED "main" (fn () =>
     (fork (Int.max (n, 1));
      while ! running <> 0 do (trace_active (); wait ())));

  in ! status end;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   156
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   157
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   158
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
   159
end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
   160
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
   161
(*top-level abbreviation for Multithreading.NAMED_CRITICAL*)
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   162
(*top-level abbreviation for Multithreading.CRITICAL*)
val CRITICAL = Multithreading.CRITICAL;