src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Fri, 03 Aug 2007 22:33:09 +0200
changeset 24150 ed724867099a
parent 24144 ec51a0f7eefe
child 24208 f4cafbaa05e4
permissions -rw-r--r--
sort indexes according to symbolic update_time (multithreading-safe);
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     1
(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     2
    ID:         $Id$
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     3
    Author:     Makarius
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     4
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     5
Multithreading in Poly/ML (version 5.1).
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     6
*)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     7
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     8
open Thread;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     9
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    10
structure Multithreading: MULTITHREADING =
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    11
struct
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    12
24072
8b9e5d776ef3 dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents: 24069
diff changeset
    13
(* options *)
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    14
24119
06965b38c5e9 tracing: level;
wenzelm
parents: 24109
diff changeset
    15
(*current trace level: tracing emits a message only if its level is <= this value*)
val trace = ref 0;
06965b38c5e9 tracing: level;
wenzelm
parents: 24109
diff changeset
    16
(*Print a trace message on stderr, prefixed by ">>> ", provided that level does
  not exceed the current value of trace.  The message is given as a thunk and
  is forced only when it is actually printed; stderr is flushed afterwards.*)
fun tracing level msg =
  if level > ! trace then ()
  else
    let val line = ">>> " ^ msg () ^ "\n"
    in TextIO.output (TextIO.stdErr, line); TextIO.flushOut TextIO.stdErr end;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    20
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    21
(*NOTE(review): presumably tells clients that this ML system provides real
  multithreading -- confirm against the MULTITHREADING signature*)
val available = true;
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    22
(*not read anywhere inside this structure; presumably the thread limit that
  callers hand to schedule -- TODO confirm at the call sites*)
val max_threads = ref 1;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    23
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    24
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    25
(* misc utils *)
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    26
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    27
(*message fragment: empty for the empty name, otherwise the name preceded by a blank*)
fun show name =
  if name = "" then "" else " " ^ name;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    29
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    30
(*message fragment: empty for the empty name, otherwise the name in square
  brackets preceded by a blank*)
fun show' name =
  if name = "" then "" else " [" ^ name ^ "]";
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    32
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    33
(*increment an int ref in place and return its contents afterwards (no locking here)*)
fun inc i =
  let val () = i := ! i + 1
  in ! i end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    34
(*decrement an int ref in place and return its contents afterwards (no locking here)*)
fun dec i =
  let val () = i := ! i - 1
  in ! i end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    35
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    36
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    37
(* critical section -- may be nested within the same thread *)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    38
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    39
local
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    40
24063
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    41
(*the single global mutex taken by NAMED_CRITICAL around the critical expression*)
val critical_lock = Mutex.mutex ();
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    42
(*thread that currently holds critical_lock via NAMED_CRITICAL, NONE otherwise;
  consulted by self_critical to allow nested critical sections in that thread*)
val critical_thread = ref (NONE: Thread.thread option);
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    43
(*name of the critical section currently being executed, "" when none;
  only used to build trace messages*)
val critical_name = ref "";
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    44
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    45
in
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    46
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    47
(*true iff the calling thread is the one recorded in critical_thread*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    51
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    52
(*Evaluate e () inside the global critical section, using name in trace output.

  If the calling thread already is the recorded critical thread, e is run
  directly, which is what makes nesting within one thread possible.  Otherwise
  critical_lock is acquired, the thread and name are recorded, e is run with
  its outcome captured, the records are cleared and the lock is released; only
  then is the outcome (result or exception) released to the caller.*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    let
      (*name of the section seen before acquiring the lock -- read without
        holding critical_lock, used for trace messages only*)
      val holder = ! critical_name;
      fun label () = "CRITICAL" ^ show name ^ show' holder;

      (*fast path via trylock; otherwise block, and trace how long it took*)
      fun acquire () =
        if Mutex.trylock critical_lock then ()
        else
          let
            val timer = Timer.startRealTimer ();
            val () = tracing 4 (fn () => label () ^ ": waiting");
            val () = Mutex.lock critical_lock;
            val elapsed = Timer.checkRealTimer timer;
            (*waits longer than 10ms are reported at the more visible level 3*)
            val level = if Time.> (elapsed, Time.fromMilliseconds 10) then 3 else 4;
          in
            tracing level (fn () => label () ^ ": passed after " ^ Time.toString elapsed)
          end;

      val () = acquire ();
      val () = (critical_thread := SOME (Thread.self ()); critical_name := name);
      val result = Exn.capture e ();
      val () = (critical_name := ""; critical_thread := NONE; Mutex.unlock critical_lock);
    in Exn.release result end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    75
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    76
(*anonymous critical section: NAMED_CRITICAL with the empty name*)
fun CRITICAL e = NAMED_CRITICAL "" e;
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    77
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    78
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    79
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    80
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    81
(* scheduling -- non-interruptible threads working on a queue of tasks *)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    82
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    83
local
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    84
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    85
(*name of the PROTECTED section currently being executed inside schedule,
  "" when none; only used to build trace messages*)
val protected_name = ref "";
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    86
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
    87
in
24063
736c03ae92f5 more informative tracing;
wenzelm
parents: 24060
diff changeset
    88
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    89
(*Run a queue of tasks on worker threads and return the exceptions they raised.

  n: requested number of worker threads; at least one is always forked.
  next_task: applied to the current queue, yields ((task, cont), queue');
    a task reported by Task.is_running makes the worker wait for a wakeup and
    ask again, Task.Task f is executed as f (), Task.Finished ends the worker.
  tasks: the initial queue.

  After a task has been executed (successfully or not), its cont is applied to
  the queue.  The result is the list of exceptions raised by tasks, most
  recently recorded first.*)
fun schedule n next_task tasks =
  let
    (*protected execution: e () under the scheduler lock, exceptions passed on
      only after the lock has been released*)
    val lock = Mutex.mutex ();
    fun PROTECTED name e =
      let
        (*read before the lock is held -- for trace messages only*)
        val previous = ! protected_name;
        fun note suffix =
          tracing 2 (fn () => "PROTECTED" ^ show name ^ show' previous ^ suffix);
        val () =
          if Mutex.trylock lock then ()
          else (note ": waiting"; Mutex.lock lock; note ": passed");
        val () = protected_name := name;
        val outcome = Exn.capture e ();
        val () = (protected_name := ""; Mutex.unlock lock);
      in Exn.release outcome end;

    (*wakeup condition, always used together with the scheduler lock*)
    val wakeup = ConditionVar.conditionVar ();
    fun wakeup_all () = ConditionVar.broadcast wakeup;
    fun wait () = ConditionVar.wait (wakeup, lock);

    (*the queue of tasks, and the number of workers not currently waiting*)
    val queue = ref tasks;
    val active = ref 0;
    fun trace_active () =
      tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");

    (*fetch the next entry; meant to be called within PROTECTED, since wait
      operates on the scheduler lock*)
    fun dequeue () =
      let
        val (entry as (task, _), rest) = next_task (! queue);
        val () = queue := rest;
      in
        if not (Task.is_running task) then entry
        else
         (dec active; trace_active ();
          wait ();
          inc active; trace_active ();
          dequeue ())
      end;

    (*worker threads*)
    val running = ref 0;
    val status = ref ([]: exn list);
    fun work () =
      (case PROTECTED "dequeue" dequeue of
        (Task.Finished, _) =>
          PROTECTED "running" (fn () => (dec active; dec running; wakeup_all ()))
      | (Task.Task body, cont) =>
          let
            (*a failing task is recorded, it does not stop the worker*)
            val () =
              (case Exn.capture body () of
                Exn.Result () => ()
              | Exn.Exn exn => PROTECTED "status" (fn () => status := exn :: ! status));
          in continue cont end)
    and continue cont =
      let
        val () = PROTECTED "cont" (fn () => (queue := cont (! queue); wakeup_all ()));
      in work () end;

    (*main control: fork the workers, then wait until all of them are done*)
    fun fork k =
      if k = 0 then ()
      else
       (inc running; inc active;
        Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]);
        fork (k - 1));
    fun await_workers () =
      if ! running = 0 then ()
      else (trace_active (); wait (); await_workers ());
    val () = PROTECTED "main" (fn () => (fork (Int.max (n, 1)); await_workers ()));

  in ! status end;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   158
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   159
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   160
24069
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
   161
end;
8a15a04e36f6 tuned msgs;
wenzelm
parents: 24066
diff changeset
   162
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
   163
(*top-level alias, so that NAMED_CRITICAL can be used without qualification*)
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   164
(*top-level alias, so that CRITICAL can be used without qualification*)
val CRITICAL = Multithreading.CRITICAL;