author | wenzelm |
Mon, 30 Jul 2007 19:22:27 +0200 | |
changeset 24072 | 8b9e5d776ef3 |
parent 24069 | 8a15a04e36f6 |
child 24108 | 24e5587603b4 |
permissions | -rw-r--r-- |
23961 | 1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Multithreading in Poly/ML (version 5.1). |
|
6 |
*) |
|
7 |
||
8 |
open Thread; |
|
9 |
||
10 |
structure Multithreading: MULTITHREADING = |
|
11 |
struct |
|
12 |
||
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
13 |
(* options *) |
24069 | 14 |
|
23981 | 15 |
val trace = ref false; |
16 |
fun tracing msg = |
|
17 |
if ! trace |
|
18 |
then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr) |
|
19 |
else (); |
|
23961 | 20 |
|
23981 | 21 |
val available = true; |
23973 | 22 |
val max_threads = ref 1; |
23 |
||
24 |
||
24069 | 25 |
(* misc utils *) |
26 |
||
27 |
fun show "" = "" |
|
28 |
| show name = " " ^ name; |
|
29 |
||
30 |
fun show' "" = "" |
|
31 |
| show' name = " [" ^ name ^ "]"; |
|
32 |
||
33 |
fun inc i = (i := ! i + 1; ! i); |
|
34 |
fun dec i = (i := ! i - 1; ! i); |
|
35 |
||
36 |
||
23961 | 37 |
(* critical section -- may be nested within the same thread *) |
38 |
||
39 |
local |
|
40 |
||
24063 | 41 |
val critical_lock = Mutex.mutex (); |
42 |
val critical_thread = ref (NONE: Thread.thread option); |
|
43 |
val critical_name = ref ""; |
|
44 |
||
23961 | 45 |
in |
46 |
||
47 |
fun self_critical () = |
|
48 |
(case ! critical_thread of |
|
49 |
NONE => false |
|
50 |
| SOME id => Thread.equal (id, Thread.self ())); |
|
51 |
||
23991 | 52 |
fun NAMED_CRITICAL name e = |
23961 | 53 |
if self_critical () then e () |
54 |
else |
|
55 |
let |
|
56 |
val _ = |
|
57 |
if Mutex.trylock critical_lock then () |
|
58 |
else |
|
24060 | 59 |
let |
60 |
val timer = Timer.startRealTimer (); |
|
61 |
val _ = tracing (fn () => |
|
24069 | 62 |
"CRITICAL" ^ show name ^ show' (! critical_name) ^ ": waiting"); |
24060 | 63 |
val _ = Mutex.lock critical_lock; |
64 |
val _ = tracing (fn () => |
|
24069 | 65 |
"CRITICAL" ^ show name ^ show' (! critical_name) ^ ": passed after " ^ |
24060 | 66 |
Time.toString (Timer.checkRealTimer timer)); |
67 |
in () end; |
|
23961 | 68 |
val _ = critical_thread := SOME (Thread.self ()); |
24060 | 69 |
val _ = critical_name := name; |
23961 | 70 |
val result = Exn.capture e (); |
24060 | 71 |
val _ = critical_name := ""; |
23961 | 72 |
val _ = critical_thread := NONE; |
73 |
val _ = Mutex.unlock critical_lock; |
|
74 |
in Exn.release result end; |
|
75 |
||
23991 | 76 |
fun CRITICAL e = NAMED_CRITICAL "" e; |
23981 | 77 |
|
23961 | 78 |
end; |
79 |
||
23973 | 80 |
|
81 |
(* scheduling -- non-interruptible threads working on a queue of tasks *) |
|
82 |
||
24069 | 83 |
local |
84 |
||
85 |
val protected_name = ref ""; |
|
86 |
||
87 |
in |
|
24063 | 88 |
|
23973 | 89 |
fun schedule n next_task tasks = |
90 |
let |
|
91 |
(*protected execution*) |
|
92 |
val lock = Mutex.mutex (); |
|
24063 | 93 |
fun PROTECTED name e = |
23973 | 94 |
let |
23981 | 95 |
val _ = |
96 |
if Mutex.trylock lock then () |
|
97 |
else |
|
24069 | 98 |
(tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": waiting"); |
23981 | 99 |
Mutex.lock lock; |
24069 | 100 |
tracing (fn () => "PROTECTED" ^ show name ^ show' (! protected_name) ^ ": passed")); |
101 |
val _ = protected_name := name; |
|
23973 | 102 |
val res = Exn.capture e (); |
24069 | 103 |
val _ = protected_name := ""; |
23973 | 104 |
val _ = Mutex.unlock lock; |
105 |
in Exn.release res end; |
|
106 |
||
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
107 |
(*wakeup condition*) |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
108 |
val wakeup = ConditionVar.conditionVar (); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
109 |
fun wakeup_all () = ConditionVar.broadcast wakeup; |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
110 |
fun wait () = ConditionVar.wait (wakeup, lock); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
111 |
|
23981 | 112 |
(*the queue of tasks*) |
23973 | 113 |
val queue = ref tasks; |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
114 |
val active = ref 0; |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
115 |
fun trace_active () = tracing (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active"); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
116 |
fun dequeue () = |
23973 | 117 |
let |
23981 | 118 |
val (next, tasks') = next_task (! queue); |
23973 | 119 |
val _ = queue := tasks'; |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
120 |
in |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
121 |
if Task.is_running (#1 next) then |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
122 |
(dec active; trace_active (); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
123 |
wait (); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
124 |
inc active; trace_active (); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
125 |
dequeue ()) |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
126 |
else next |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
127 |
end; |
23973 | 128 |
|
129 |
(*worker threads*) |
|
130 |
val running = ref 0; |
|
131 |
val status = ref ([]: exn list); |
|
24066 | 132 |
fun work () = |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
133 |
(case PROTECTED "dequeue" dequeue of |
23981 | 134 |
(Task.Task f, cont) => |
24063 | 135 |
(case Exn.capture f () of |
24066 | 136 |
Exn.Result () => continue cont |
23981 | 137 |
| Exn.Exn exn => |
24066 | 138 |
(PROTECTED "status" (fn () => status := exn :: ! status); continue cont)) |
23981 | 139 |
| (Task.Finished, _) => |
24063 | 140 |
(PROTECTED "running" (fn () => (dec active; dec running)); |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
141 |
wakeup_all ())) |
24066 | 142 |
and continue cont = |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
143 |
(PROTECTED "cont" (fn () => queue := cont (! queue)); wakeup_all; work ()); |
23973 | 144 |
|
145 |
(*main control: fork and wait*) |
|
146 |
fun fork 0 = () |
|
147 |
| fork k = |
|
24063 | 148 |
(inc running; inc active; |
24066 | 149 |
Thread.fork (work, [Thread.InterruptState Thread.InterruptDefer]); |
23973 | 150 |
fork (k - 1)); |
24063 | 151 |
val _ = PROTECTED "main" (fn () => |
152 |
(fork (Int.max (n, 1)); |
|
24066 | 153 |
while ! running <> 0 do (trace_active (); wait ()))); |
23973 | 154 |
|
155 |
in ! status end; |
|
156 |
||
23961 | 157 |
end; |
158 |
||
24069 | 159 |
end; |
160 |
||
23991 | 161 |
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL; |
23961 | 162 |
val CRITICAL = Multithreading.CRITICAL; |