author | wenzelm |
Wed, 11 Dec 2013 18:02:22 +0100 | |
changeset 54717 | 42c209a6c225 |
parent 50910 | 54f06ba192ef |
child 54723 | 124432e77ecf |
permissions | -rw-r--r-- |
23961 | 1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML |
2 |
Author: Makarius |
|
3 |
||
50910 | 4 |
Multithreading in Poly/ML (cf. polyml/basis/Thread.sml). |
23961 | 5 |
*) |
6 |
||
25704 | 7 |
(*Poly/ML-specific additions to the multithreading interface: scoped control
  over the interrupt attributes of the running thread.*)
signature MULTITHREADING_POLYML =
sig
  (*interruptible f x: evaluate f x under the public_interrupts attributes*)
  val interruptible: ('a -> 'b) -> 'a -> 'b
  (*uninterruptible f x: evaluate f restore x under the no_interrupts attributes;
    restore g y evaluates g y under the attributes that were saved on entry*)
  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
end;
12 |
||
13 |
(*Rebinds BASIC_MULTITHREADING: the earlier binding of that name (made outside
  this file) extended by the Poly/ML-specific operations.*)
signature BASIC_MULTITHREADING =
sig
  include BASIC_MULTITHREADING MULTITHREADING_POLYML
end;
|
18 |
||
24208 | 19 |
(*Rebinds MULTITHREADING: the earlier binding of that name (made outside this
  file) extended by the Poly/ML-specific operations.*)
signature MULTITHREADING =
sig
  include MULTITHREADING MULTITHREADING_POLYML
end;
24 |
||
23961 | 25 |
structure Multithreading: MULTITHREADING = |
26 |
struct |
|
27 |
||
24208 | 28 |
(* thread attributes *) |
29 |
||
28161 | 30 |
(*broadcast interrupts off, interrupt state InterruptDefer; used by uninterruptible*)
val no_interrupts =
  [Thread.EnableBroadcastInterrupt false,
   Thread.InterruptState Thread.InterruptDefer];
|
32 |
||
41713
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
wenzelm
parents:
41710
diff
changeset
|
33 |
(*broadcast interrupts off, interrupt state InterruptSynch; used by interrupted
  while it polls via Thread.testInterrupt*)
val test_interrupts =
  [Thread.EnableBroadcastInterrupt false,
   Thread.InterruptState Thread.InterruptSynch];
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
wenzelm
parents:
41710
diff
changeset
|
35 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
36 |
(*broadcast interrupts on, interrupt state InterruptAsynchOnce; used by interruptible*)
val public_interrupts =
  [Thread.EnableBroadcastInterrupt true,
   Thread.InterruptState Thread.InterruptAsynchOnce];
38 |
||
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
39 |
(*broadcast interrupts off, interrupt state InterruptAsynchOnce*)
val private_interrupts =
  [Thread.EnableBroadcastInterrupt false,
   Thread.InterruptState Thread.InterruptAsynchOnce];
cb6421b6a18f
future_job: do not inherit attributes, but enforce restricted interrupts -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30602
diff
changeset
|
41 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
42 |
(*Turn enabled interrupt states into synchronous ones: InterruptDefer is kept,
  every other InterruptState becomes InterruptSynch, and attributes that are
  not an InterruptState pass through unchanged.*)
val sync_interrupts =
  let
    fun sync_state Thread.InterruptDefer = Thread.InterruptDefer
      | sync_state _ = Thread.InterruptSynch;
    fun sync_attribute (Thread.InterruptState state) = Thread.InterruptState (sync_state state)
      | sync_attribute other = other;
  in map sync_attribute end;
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
46 |
|
28466
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
wenzelm
parents:
28465
diff
changeset
|
47 |
(*Replace InterruptAsynch by InterruptAsynchOnce and leave every other attribute
  untouched (cf. changeset note: "enforces InterruptAsynch => InterruptAsynchOnce
  to avoid race condition").*)
val safe_interrupts =
  let
    fun safe (Thread.InterruptState Thread.InterruptAsynch) =
          Thread.InterruptState Thread.InterruptAsynchOnce
      | safe other = other;
  in map safe end;
6e35fbfc32b8
with_attributes: enforces InterruptAsynch => InterruptAsynchOnce to avoid race condition;
wenzelm
parents:
28465
diff
changeset
|
51 |
|
41713
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
wenzelm
parents:
41710
diff
changeset
|
52 |
(*Poll for a pending interrupt (cf. java.lang.Thread.interrupted): switch to
  test_interrupts, run Thread.testInterrupt with its outcome captured, put the
  saved (safe) attributes back, then release the outcome.
  NOTE: Exn is defined outside this file; presumably Exn.release re-raises a
  captured exception.*)
fun interrupted () =
  let
    val saved_atts = safe_interrupts (Thread.getAttributes ());
    val _ = Thread.setAttributes test_interrupts;
    val outcome = Exn.capture Thread.testInterrupt ();
    val _ = Thread.setAttributes saved_atts;
  in Exn.release outcome end;
a21084741b37
added Multithreading.interrupted (cf. java.lang.Thread.interrupted);
wenzelm
parents:
41710
diff
changeset
|
59 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
60 |
(*with_attributes new_atts e: evaluate e under new_atts and reinstate the
  previous thread attributes afterwards.

  The current attributes are saved (after safe_interrupts); inside a captured
  computation the thread switches to safe_interrupts new_atts and e is applied
  to the saved attributes.  The saved attributes are reinstated before the
  captured outcome is released, so restoration happens on both outcomes.*)
fun with_attributes new_atts e =
  let
    val saved_atts = safe_interrupts (Thread.getAttributes ());
    val outcome =
      Exn.capture (fn () =>
        (Thread.setAttributes (safe_interrupts new_atts); e saved_atts)) ();
    val _ = Thread.setAttributes saved_atts;
  in Exn.release outcome end;
24208 | 67 |
|
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
68 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
69 |
(* portable wrappers *) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
70 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
71 |
(*interruptible f x: evaluate f x under public_interrupts; the attributes saved
  by with_attributes are not needed by the body and are ignored*)
fun interruptible f x =
  let fun body _ = f x
  in with_attributes public_interrupts body end;
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
72 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
73 |
(*uninterruptible f x: evaluate f restore x under no_interrupts.  The restore
  argument lets f run selected parts, restore g y, under the attributes that
  were saved when uninterruptible was entered.*)
fun uninterruptible f x =
  with_attributes no_interrupts (fn outer_atts =>
    let
      fun restore g y = with_attributes outer_atts (fn _ => g y);
    in f restore x end);
24668 | 76 |
|
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
77 |
|
54717 | 78 |
(* options *) |
79 |
||
80 |
(*constant flag, always true in this Poly/ML implementation*)
val available = true; |
|
81 |
||
82 |
(*Resolve a requested thread limit: a positive m is taken as is; otherwise the
  number of physical processors is used (falling back to Thread.numProcessors
  when that is unknown), clamped to the range 1..8.*)
fun max_threads_result m =
  if m <= 0 then
    let
      val cores =
        (case Thread.numPhysicalProcessors () of
          NONE => Thread.numProcessors ()
        | SOME k => k);
    in Int.min (Int.max (cores, 1), 8) end
  else m;
|
90 |
||
91 |
(*current thread limit, initially 1; written by max_threads_update and max_threads_setmp*)
val max_threads = ref 1; |
|
92 |
||
93 |
(*read the current thread limit*)
fun max_threads_value () = ! max_threads; |
|
94 |
||
95 |
(*store the thread limit that max_threads_result derives from the request m*)
fun max_threads_update m =
  let val limit = max_threads_result m
  in max_threads := limit end;
|
96 |
||
97 |
(*max_threads_setmp m f x: evaluate f x with the thread limit temporarily set
  from m.  The switch and the reset run uninterruptibly; only f x itself runs
  under the caller's attributes.  The previous limit is written back before the
  captured outcome of f x is released.*)
fun max_threads_setmp m f x =
  uninterruptible (fn restore_attributes => fn () =>
    let
      val previous = ! max_threads;
      val _ = max_threads_update m;
      val outcome = Exn.capture (restore_attributes f) x;
      val _ = max_threads := previous;
    in Exn.release outcome end) ();
|
105 |
||
106 |
(*multithreading counts as enabled when the thread limit exceeds 1*)
fun enabled () = 1 < max_threads_value ();
|
107 |
||
108 |
||
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
109 |
(* synchronous wait *) |
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
110 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
111 |
(*sync_wait opt_atts time cond lock: wait on condition variable cond with mutex
  lock, under the sync_interrupts version of opt_atts (or of the current thread
  attributes if opt_atts is NONE).

  With time = SOME t the result of ConditionVar.waitUntil is returned as
  Exn.Res; with time = NONE the wait is unbounded and yields Exn.Res true.
  Any exception raised while waiting is returned as Exn.Exn instead of being
  propagated.  Changeset note: interrupts are made synchronous so that the wait
  re-acquires its lock when interrupted.*)
fun sync_wait opt_atts time cond lock =
  let
    val base_atts =
      (case opt_atts of
        NONE => Thread.getAttributes ()
      | SOME atts => atts);
    fun wait _ =
      (case time of
        NONE => (ConditionVar.wait (cond, lock); Exn.Res true)
      | SOME deadline => Exn.Res (ConditionVar.waitUntil (cond, lock, deadline)))
      handle exn => Exn.Exn exn;
  in with_attributes (sync_interrupts base_atts) wait end;
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
119 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
120 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
121 |
(* tracing *) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
122 |
|
39616
8052101883c3
renamed setmp_noncritical to Unsynchronized.setmp to emphasize its meaning;
wenzelm
parents:
39583
diff
changeset
|
123 |
(*current trace level, initially 0; tracing emits a message only if its level does not exceed this value*)
val trace = ref 0; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
124 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
125 |
(*tracing level msg: if level does not exceed the current trace level, write
  ">>> " followed by msg () and a newline to stderr and flush it.  The message
  thunk is only forced when the line is actually emitted.  Output happens
  uninterruptibly, and every exception (from msg or from the output) is
  deliberately swallowed.*)
fun tracing level msg =
  if level <= ! trace then
    uninterruptible (fn _ => fn () =>
      let val line = ">>> " ^ msg () ^ "\n"
      in TextIO.output (TextIO.stdErr, line); TextIO.flushOut TextIO.stdErr end
      handle _ (*sic*) => ()) ()
  else ();
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
130 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
131 |
(*tracing_time detailed time: tracing at a level derived from a duration.
  Without detailed the level is 5; otherwise durations of at least 1.0, 0.1,
  0.01, 0.001 seconds map to levels 1, 2, 3, 4, and anything shorter to 5.
  The result still expects the message thunk, like tracing level.*)
fun tracing_time detailed time =
  let
    fun at_least secs = Time.>= (time, seconds secs);
    val level =
      if not detailed then 5
      else if at_least 1.0 then 1
      else if at_least 0.1 then 2
      else if at_least 0.01 then 3
      else if at_least 0.001 then 4
      else 5;
  in tracing level end;
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
138 |
|
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32286
diff
changeset
|
139 |
(*real_time f x: run f x (which must return unit) and return the wall-clock
  time it took, measured with a real timer*)
fun real_time f x =
  let
    val stopwatch = Timer.startRealTimer ();
    val () = f x;
  in Timer.checkRealTimer stopwatch end;
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
145 |
|
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
146 |
|
23961 | 147 |
(* critical section -- may be nested within the same thread *) |
148 |
||
149 |
local

(*global lock plus bookkeeping about its current holder*)
val critical_lock = Mutex.mutex ();
val critical_thread = ref (NONE: Thread.thread option);
val critical_name = ref "";

(*optional name fragments for trace messages; an empty name contributes nothing*)
fun show name = if name = "" then "" else " " ^ name;
fun show' name = if name = "" then "" else " [" ^ name ^ "]";

in

(*is the calling thread the one recorded as holder of the critical section?*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);

(*NAMED_CRITICAL name e: evaluate e () inside the global critical section.

  A thread that is already inside the section runs e () directly, so nesting
  within one thread is possible.  Otherwise the lock is taken uninterruptibly:
  first via trylock, and on contention via a blocking lock whose waiting time
  is traced.  The holder thread and section name are recorded, e () runs under
  the caller's attributes with its outcome captured, then the bookkeeping is
  cleared and the lock released before the outcome is passed to Exn.release.*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    Exn.release (uninterruptible (fn restore_attributes => fn () =>
      let
        (*name of the section observed before attempting to lock; trace output only*)
        val holder = ! critical_name;
        fun label () = "CRITICAL" ^ show name ^ show' holder;
        val _ =
          if Mutex.trylock critical_lock then ()
          else
            let
              val _ = tracing 5 (fn () => label () ^ ": waiting");
              val waited = real_time Mutex.lock critical_lock;
              val _ = tracing_time true waited (fn () =>
                label () ^ ": passed after " ^ Time.toString waited);
            in () end;
        val _ = critical_thread := SOME (Thread.self ());
        val _ = critical_name := name;
        val outcome = Exn.capture (restore_attributes e) ();
        val _ = critical_name := "";
        val _ = critical_thread := NONE;
        val _ = Mutex.unlock critical_lock;
      in outcome end) ());

(*anonymous critical section*)
fun CRITICAL e = NAMED_CRITICAL "" e;

end;
191 |
||
23973 | 192 |
|
25704 | 193 |
(* serial numbers *) |
194 |
||
195 |
local

(*counter state, only touched while serial_lock is held*)
val serial_lock = Mutex.mutex ();
val serial_count = ref 0;

in

(*serial (): increment the shared counter under its lock, uninterruptibly, and
  return the new value; the first call yields 1*)
val serial = uninterruptible (fn _ => fn () =>
  let
    val _ = Mutex.lock serial_lock;
    val next = ! serial_count + 1;
    val _ = serial_count := next;
    val _ = Mutex.unlock serial_lock;
  in next end);

end;
211 |
||
25704 | 212 |
end; |
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
213 |
|
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
214 |
(*view of Multithreading restricted to the BASIC_MULTITHREADING signature; opened at top level right after this binding*)
structure Basic_Multithreading: BASIC_MULTITHREADING = Multithreading; |
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32230
diff
changeset
|
215 |
open Basic_Multithreading; |