23961
|
1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML
|
|
2 |
ID: $Id$
|
|
3 |
Author: Makarius
|
|
4 |
|
|
5 |
Multithreading in Poly/ML (version 5.1).
|
|
6 |
*)
|
|
7 |
|
|
8 |
open Thread;
|
|
9 |
|
|
10 |
structure Multithreading: MULTITHREADING =
struct

(* tracing *)

(*Diagnostic output is produced only while this flag is true.*)
val trace = ref false;

(*Write a trace line, prefixed by ">>> ", to stderr and flush it.  The
  message is given as a thunk, so it is only built when tracing is on.*)
fun tracing msg =
  if ! trace
  then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
  else ();

val available = true;

(*Initialized to 1; not consulted anywhere within this structure.*)
val max_threads = ref 1;


(* critical section -- may be nested within the same thread *)

local

(*The single global lock, the thread currently inside the critical section
  (if any), and the name it passed to NAMED_CRITICAL (for trace output).*)
val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);
val critical_name = ref "";

(*Formatting helpers for trace messages: empty names produce no output.*)
fun add_name "" = ""
  | add_name name = " " ^ name;

fun add_name' "" = ""
  | add_name' name = " [" ^ name ^ "]";

in

(*True iff the calling thread is the one recorded as being inside the
  critical section.*)
fun self_critical () =
  (case ! critical_thread of
    NONE => false
  | SOME id => Thread.equal (id, Thread.self ()));

(*Run e () inside the global critical section.  If the calling thread is
  already inside, e () is run directly (this is what permits nesting).
  Otherwise the lock is acquired, with trace messages only when it is
  contended.  The outcome of e () -- result or exception -- is captured so
  that the bookkeeping is reset and the lock released before it is
  re-issued to the caller.*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    let
      val _ =
        if Mutex.trylock critical_lock then ()
        else
          let
            val timer = Timer.startRealTimer ();
            val _ = tracing (fn () =>
              "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": waiting for lock");
            val _ = Mutex.lock critical_lock;
            val _ = tracing (fn () =>
              "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": obtained lock after " ^
              Time.toString (Timer.checkRealTimer timer));
          in () end;
      val _ = critical_thread := SOME (Thread.self ());
      val _ = critical_name := name;
      val result = Exn.capture e ();
      val _ = critical_name := "";
      val _ = critical_thread := NONE;
      val _ = Mutex.unlock critical_lock;
    in Exn.release result end;

(*Anonymous variant of NAMED_CRITICAL.*)
fun CRITICAL e = NAMED_CRITICAL "" e;

end;


(* scheduling -- non-interruptible threads working on a queue of tasks *)

(*Fork Int.max (n, 1) worker threads that repeatedly take work from the
  task queue (initially tasks) until it reports Task.Finished, and block
  the caller until all workers have terminated.

  next_task is applied to the current queue and yields ((task, cont), queue'):
  queue' replaces the queue immediately, and cont is applied to the queue
  once the task has been executed.

  Exceptions raised by individual tasks do not stop the workers; they are
  collected and returned as the result, most recent first.*)
fun schedule n next_task tasks =
  let
    (*protected execution: run e () while holding lock; the outcome is
      captured so that the lock is released even if e () raises*)
    val lock = Mutex.mutex ();
    fun PROTECTED k e =
      let
        val _ =
          if Mutex.trylock lock then ()
          else
            (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
             Mutex.lock lock;
             tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
        val res = Exn.capture e ();
        val _ = Mutex.unlock lock;
      in Exn.release res end;

    (*wakeup signal; wait must only be called while holding lock*)
    val wakeup = ConditionVar.conditionVar ();
    fun wait () = ConditionVar.wait (wakeup, lock);

    (*the queue of tasks*)
    val queue = ref tasks;

    (*Take the next item off the queue.  While the queue reports
      Task.Running, wait for a wakeup and look again -- all within ONE
      protected section.  Inspecting the queue and starting to wait must
      not be separated by a release of lock: a broadcast issued in such a
      gap would be missed, and if it was the last one the worker would
      block forever.*)
    fun dequeue k = PROTECTED k (fn () =>
      let
        fun next () =
          let
            val (result, tasks') = next_task (! queue);
            val _ = queue := tasks';
          in
            (case result of
              (Task.Running, _) =>
                (tracing (fn () => "WAITING " ^ Int.toString k); wait (); next ())
            | _ => result)
          end;
      in next () end);

    (*worker threads*)
    val running = ref 0;                (*number of workers not yet terminated*)
    val status = ref ([]: exn list);    (*exceptions raised by tasks*)

    (*Record completion of a task in the queue, wake up waiting threads,
      and carry on with the next item.*)
    fun continue cont k =
      (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
    and work k () =
      (case dequeue k of
        (Task.Task f, cont) =>
          (tracing (fn () => "TASK " ^ Int.toString k);
           case Exn.capture f () of
             Exn.Result () => continue cont k
           | Exn.Exn exn =>
               (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
      | (Task.Running, _) =>
          (*dequeue already waits in this situation; simply try again*)
          work k ()
      | (Task.Finished, _) =>
          (tracing (fn () => "TERMINATING " ^ Int.toString k);
           PROTECTED k (fn () => running := ! running - 1);
           ConditionVar.broadcast wakeup));

    (*main control: fork and wait*)
    fun fork 0 = ()
      | fork k =
          (running := ! running + 1;
           Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
           fork (k - 1));

    (*The workers are forked while lock is held, so the running count is
      complete before any worker can enter a protected section; the count
      is then re-checked under lock after every wakeup.*)
    val _ = PROTECTED 0 (fn () =>
      (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));

  in ! status end;

end;
|
|
134 |
|
23991
|
135 |
(*Top-level shorthands for the critical-section combinators of structure
  Multithreading.*)
fun NAMED_CRITICAL name e = Multithreading.NAMED_CRITICAL name e;
fun CRITICAL e = Multithreading.CRITICAL e;
|