src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sun, 29 Jul 2007 17:28:57 +0200
changeset 24060 b643ee118928
parent 23991 d4417ba26706
child 24063 736c03ae92f5
permissions -rw-r--r--
critical: improved diagostics; schedule: proper broadcast on wakeup condition;
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     1
(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     2
    ID:         $Id$
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     3
    Author:     Makarius
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     4
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     5
Multithreading in Poly/ML (version 5.1).
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     6
*)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     7
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     8
open Thread;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
     9
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    10
structure Multithreading: MULTITHREADING =
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    11
struct
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    12
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    13
val trace = ref false;
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    14
fun tracing msg =
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    15
  if ! trace
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    16
  then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    17
  else ();
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    18
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    19
val available = true;
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    20
val max_threads = ref 1;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    21
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    22
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    23
(* critical section -- may be nested within the same thread *)
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    24
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    25
local
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    26
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    27
val critical_lock = Mutex.mutex ();
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    28
val critical_thread = ref (NONE: Thread.thread option);
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    29
val critical_name = ref "";
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    30
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    31
fun add_name "" = ""
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    32
  | add_name name = " " ^ name;
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    33
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    34
fun add_name' "" = ""
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    35
  | add_name' name = " [" ^ name ^ "]";
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    36
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    37
in
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    38
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    39
fun self_critical () =
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    40
  (case ! critical_thread of
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    41
    NONE => false
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    42
  | SOME id => Thread.equal (id, Thread.self ()));
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    43
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    44
fun NAMED_CRITICAL name e =
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    45
  if self_critical () then e ()
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    46
  else
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    47
    let
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    48
      val _ =
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    49
        if Mutex.trylock critical_lock then ()
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    50
        else
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    51
          let
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    52
            val timer = Timer.startRealTimer ();
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    53
            val _ = tracing (fn () =>
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    54
              "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": waiting for lock");
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    55
            val _ = Mutex.lock critical_lock;
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    56
            val _ = tracing (fn () =>
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    57
              "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": obtained lock after " ^
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    58
              Time.toString (Timer.checkRealTimer timer));
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    59
          in () end;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    60
      val _ = critical_thread := SOME (Thread.self ());
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    61
      val _ = critical_name := name;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    62
      val result = Exn.capture e ();
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
    63
      val _ = critical_name := "";
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    64
      val _ = critical_thread := NONE;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    65
      val _ = Mutex.unlock critical_lock;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    66
    in Exn.release result end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    67
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
    68
fun CRITICAL e = NAMED_CRITICAL "" e;
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    69
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    70
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
    71
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    72
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    73
(* scheduling -- non-interruptible threads working on a queue of tasks *)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    74
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    75
fun schedule n next_task tasks =
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    76
  let
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    77
    (*protected execution*)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    78
    val lock = Mutex.mutex ();
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    79
    fun PROTECTED k e =
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    80
      let
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    81
        val _ =
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    82
          if Mutex.trylock lock then ()
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    83
          else
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    84
           (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    85
            Mutex.lock lock;
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    86
            tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    87
        val res = Exn.capture e ();
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    88
        val _ = Mutex.unlock lock;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    89
      in Exn.release res end;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    90
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    91
    (*the queue of tasks*)
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    92
    val queue = ref tasks;
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    93
    fun dequeue k = PROTECTED k (fn () =>
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    94
      let
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    95
        val (next, tasks') = next_task (! queue);
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    96
        val _ = queue := tasks';
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
    97
      in next end);
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    98
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
    99
    (*worker threads*)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   100
    val running = ref 0;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   101
    val status = ref ([]: exn list);
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
   102
    val wakeup = ConditionVar.conditionVar ();
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
   103
    fun wait () = ConditionVar.wait (wakeup, lock);
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   104
    fun continue cont k =
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
   105
      (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   106
    and work k () =
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   107
      (case dequeue k of
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   108
        (Task.Task f, cont) =>
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   109
          (tracing (fn () => "TASK " ^ Int.toString k);
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   110
           case Exn.capture f () of
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   111
            Exn.Result () => continue cont k
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   112
          | Exn.Exn exn =>
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   113
              (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   114
      | (Task.Running, _) =>
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   115
          (tracing (fn () => "WAITING " ^ Int.toString k);
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   116
           PROTECTED k wait; work k ())
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   117
      | (Task.Finished, _) =>
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   118
         (tracing (fn () => "TERMINATING " ^ Int.toString k);
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   119
          PROTECTED k (fn () => running := ! running - 1);
24060
b643ee118928 critical: improved diagostics;
wenzelm
parents: 23991
diff changeset
   120
          ConditionVar.broadcast wakeup));
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   121
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   122
    (*main control: fork and wait*)
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   123
    fun fork 0 = ()
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   124
      | fork k =
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   125
         (running := ! running + 1;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   126
          Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   127
          fork (k - 1));
23981
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   128
    val _ = PROTECTED 0 (fn () =>
03b71bf91318 added trace flag, official tracing operation;
wenzelm
parents: 23973
diff changeset
   129
     (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
23973
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   130
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   131
  in ! status end;
b6ce6de5b700 renamed number_of_threads to max_threads;
wenzelm
parents: 23961
diff changeset
   132
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   133
end;
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   134
23991
d4417ba26706 renamed CRITICAL' to NAMED_CRITICAL;
wenzelm
parents: 23981
diff changeset
   135
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
23961
9e7e1e309ebd Multithreading in Poly/ML (version 5.1).
wenzelm
parents:
diff changeset
   136
val CRITICAL = Multithreading.CRITICAL;