author | paulson |
Fri, 05 Oct 2007 09:59:03 +0200 | |
changeset 24854 | 0ebcd575d3c6 |
parent 24688 | a5754ca5c510 |
child 25704 | df9c8074ff09 |
permissions | -rw-r--r-- |
23961 | 1 |
(*  Title:      Pure/ML-Systems/multithreading_polyml.ML
    ID:         $Id$
    Author:     Makarius

Multithreading in Poly/ML 5.1 or later (cf. polyml/basis/Thread.sml).
*)
7 |
||
8 |
open Thread; |
|
9 |
||
24208 | 10 |
(*Extension of the MULTITHREADING signature declared elsewhere: adds the
  interrupt-control combinators and the TimeLimit sub-structure that are
  implemented by structure Multithreading in this file.*)
signature MULTITHREADING =
sig
  include MULTITHREADING

  (*run a function with interrupts deferred / enabled, respectively*)
  val ignore_interrupt: ('a -> 'b) -> 'a -> 'b
  val raise_interrupt: ('a -> 'b) -> 'a -> 'b

  (*execution with time limit*)
  structure TimeLimit: TIME_LIMIT
end;
17 |
||
23961 | 18 |
structure Multithreading: MULTITHREADING =
struct

(* options *)

(*current trace level: a message is printed iff its level is <= this value*)
val trace = ref 0;

(*print the lazily computed message on stderr (prefixed by ">>> ") when
  its level is enabled; the message thunk is not run otherwise*)
fun tracing level msg =
  if level > ! trace then ()
  else
    (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n"));
     TextIO.flushOut TextIO.stdErr);

val available = true;
val max_threads = ref 1;


(* misc utils *)

(*curried list constructor*)
fun cons head tail = head :: tail;

(*update a reference cell by applying f to its content*)
fun change cell f = cell := f (! cell);

(*increment / decrement an int reference, returning the new value*)
fun inc counter = (counter := ! counter + 1; ! counter);
fun dec counter = (counter := ! counter - 1; ! counter);

(*optional name fragments for trace messages: empty names yield ""*)
fun show name = if name = "" then "" else " " ^ name;
fun show' name = if name = "" then "" else " [" ^ name ^ "]";


(* thread attributes *)

(*Evaluate f x with the attributes of the current thread set to new_atts;
  f additionally receives the attributes that were in force before.  The
  previous attributes are re-installed afterwards, also when Interrupt is
  raised within the wrapper itself.  Exn.capture / Exn.release are defined
  elsewhere -- presumably they reify and re-raise exceptions.*)
fun with_attributes new_atts f x =
  let
    val saved_atts = Thread.getAttributes ();
    fun reset () = Thread.setAttributes saved_atts;
    fun attempt () =
      let
        val _ = Thread.setAttributes new_atts;
        val outcome = Exn.capture (f saved_atts) x;
        val _ = reset ();
      in outcome end;
  in
    (*RACE for fully asynchronous interrupts!*)
    Exn.release (attempt () handle Interrupt => (reset (); Exn.Exn Interrupt))
  end;


(* interrupt handling *)

(*run f with broadcast interrupts disabled and interrupts deferred*)
fun uninterruptible f x =
  let
    val atts =
      [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
  in with_attributes atts f x end;

(*run f with broadcast interrupts enabled and asynchronous (once) delivery*)
fun interruptible f x =
  let
    val atts =
      [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
  in with_attributes atts f x end;

(*variants that do not pass the original attributes on to the body*)
fun ignore_interrupt body = uninterruptible (fn _ => body);
fun raise_interrupt body = interruptible (fn _ => body);


(* execution with time limit *)

structure TimeLimit =
struct

exception TimeOut;

(*Evaluate f x under the caller's original attributes, while a forked
  watchdog thread sleeps for the given time and then interrupts the
  caller.  An Interrupt result observed after the watchdog has set its
  flag is reported as TimeOut; any other outcome is passed through.*)
fun timeLimit time f x =
  uninterruptible (fn atts => fn () =>
    let
      val caller = Thread.self ();
      val expired = ref false;
      fun alarm () = (OS.Process.sleep time; expired := true; Thread.interrupt caller);
      val watchdog = Thread.fork (interruptible (fn _ => alarm), []);

      (*RACE! timeout signal vs. external Interrupt*)
      val outcome = Exn.capture (with_attributes atts (fn _ => f)) x;
      val timed_out = (case outcome of Exn.Exn Interrupt => ! expired | _ => false);

      (*dispose of the watchdog; the Thread exception is ignored here*)
      val _ = Thread.interrupt watchdog handle Thread _ => ();
    in if timed_out then raise TimeOut else Exn.release outcome end) ();

end;


(* critical section -- may be nested within the same thread *)

local

val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);
val critical_name = ref "";

in

(*does the current thread own the critical section?*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);

(*Run e () inside the global critical section.  A thread that already
  owns the section runs e directly; otherwise the lock is acquired with
  interrupts deferred, and e is evaluated under the caller's original
  attributes.  Waiting for the lock is traced, with the elapsed time.*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    uninterruptible (fn atts => fn () =>
      let
        (*name of the section seen before acquiring the lock -- trace only*)
        val previous = ! critical_name;
        fun label () = "CRITICAL" ^ show name ^ show' previous;
        val _ =
          if Mutex.trylock critical_lock then ()
          else
            let
              val timer = Timer.startRealTimer ();
              val _ = tracing 4 (fn () => label () ^ ": waiting");
              val _ = Mutex.lock critical_lock;
              val elapsed = Timer.checkRealTimer timer;
              (*waits longer than 10ms are reported at the lower level 3*)
              val level = if Time.> (elapsed, Time.fromMilliseconds 10) then 3 else 4;
              val _ =
                tracing level (fn () => label () ^ ": passed after " ^ Time.toString elapsed);
            in () end;
        val _ = critical_thread := SOME (Thread.self ());
        val _ = critical_name := name;
        val outcome = Exn.capture (with_attributes atts (fn _ => e)) ();
        val _ = critical_name := "";
        val _ = critical_thread := NONE;
        val _ = Mutex.unlock critical_lock;
      in Exn.release outcome end) ();

fun CRITICAL e = NAMED_CRITICAL "" e;

end;


(* scheduling -- multiple threads working on a queue of tasks *)

datatype 'a task =
  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;

(*Work off a queue of tasks with Int.max (n, 1) worker threads.  The
  function next_task selects the next task from the queue and returns it
  together with the updated queue.  The result is the list of exceptions
  raised by task bodies.*)
fun schedule n next_task = uninterruptible (fn _ => fn initial_tasks =>
  let
    (*protected execution*)
    val lock = Mutex.mutex ();
    val protected_name = ref "";
    fun PROTECTED name e =
      let
        val previous = ! protected_name;
        fun label () = "PROTECTED" ^ show name ^ show' previous;
        val _ =
          if Mutex.trylock lock then ()
          else
            let
              val _ = tracing 2 (fn () => label () ^ ": waiting");
              val _ = Mutex.lock lock;
              val _ = tracing 2 (fn () => label () ^ ": passed");
            in () end;
        val _ = protected_name := name;
        val outcome = Exn.capture e ();
        val _ = protected_name := "";
        val _ = Mutex.unlock lock;
      in Exn.release outcome end;

    (*wakeup condition*)
    val wakeup = ConditionVar.conditionVar ();
    fun wakeup_all () = ConditionVar.broadcast wakeup;
    fun wait () = ConditionVar.wait (wakeup, lock);
    fun wait_timeout () =
      ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1);

    (*queue of tasks*)
    val queue = ref initial_tasks;
    val active = ref 0;
    fun trace_active () =
      tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
    (*fetch the next task; on Wait block on the wakeup condition and retry
      -- to be run within PROTECTED*)
    fun dequeue () =
      let
        val (next, rest) = next_task (! queue);
        val _ = queue := rest;
      in
        (case next of
          Wait =>
            (dec active; trace_active ();
             wait ();
             inc active; trace_active ();
             dequeue ())
        | _ => next)
      end;

    (*pool of running threads*)
    val status = ref ([]: exn list);
    val running = ref ([]: Thread.thread list);
    fun start f =
      let
        val _ = inc active;
        val thread = Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]);
      in change running (cons thread) end;
    fun stop () =
      (dec active;
       change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));

    (*worker thread*)
    fun worker () =
      (case PROTECTED "dequeue" dequeue of
        Task {body, cont, fail} =>
          (case Exn.capture (interruptible (fn _ => body)) () of
            Exn.Result () =>
              (PROTECTED "cont" (fn () => (change queue cont; wakeup_all ()));
               worker ())
          | Exn.Exn exn =>
              (*record the failure and retire this worker*)
              PROTECTED "fail" (fn () =>
                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
      | Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));

    (*main control: fork and wait*)
    fun spawn 0 = ()
      | spawn k = (start worker; spawn (k - 1));
    val _ = PROTECTED "main" (fn () =>
      (spawn (Int.max (n, 1));
       while not (List.null (! running)) do
         (trace_active ();
          (*once a failure is recorded, interrupt all remaining workers*)
          if List.null (! status) then () else List.app Thread.interrupt (! running);
          wait_timeout ())));

  in ! status end);

end;
231 |
||
23991 | 232 |
(*top-level abbreviations for frequently used Multithreading operations*)

(*critical sections*)
val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
val CRITICAL = Multithreading.CRITICAL;

(*interrupt handling*)
val ignore_interrupt = Multithreading.ignore_interrupt;
val raise_interrupt = Multithreading.raise_interrupt;

(*execution with time limit*)
structure TimeLimit = Multithreading.TimeLimit;
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
239 |