author | wenzelm |
Tue, 11 Mar 2008 20:30:46 +0100 | |
changeset 26254 | 3def1a1fea4e |
parent 26221 | e557c20158e2 |
child 26390 | 99d4cbb1f941 |
permissions | -rw-r--r-- |
23961 | 1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
25704 | 5 |
Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml). |
23961 | 6 |
*) |
7 |
||
8 |
open Thread; |
|
9 |
||
25704 | 10 |
signature MULTITHREADING_POLYML = |
11 |
sig |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
12 |
val interruptible: ('a -> 'b) -> 'a -> 'b |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
13 |
val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b |
26098
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
14 |
val system_out: string -> string * int |
25704 | 15 |
structure TimeLimit: TIME_LIMIT |
16 |
end; |
|
17 |
||
18 |
signature BASIC_MULTITHREADING = |
|
19 |
sig |
|
20 |
include BASIC_MULTITHREADING |
|
21 |
include MULTITHREADING_POLYML |
|
22 |
end; |
|
23 |
||
24208 | 24 |
signature MULTITHREADING = |
25 |
sig |
|
26 |
include MULTITHREADING |
|
25704 | 27 |
include MULTITHREADING_POLYML |
24208 | 28 |
end; |
29 |
||
23961 | 30 |
structure Multithreading: MULTITHREADING = |
31 |
struct |
|
32 |
||
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
33 |
(* options *) |
24069 | 34 |
|
24119 | 35 |
val trace = ref 0; |
36 |
fun tracing level msg = |
|
37 |
if level <= ! trace |
|
23981 | 38 |
then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr) |
39 |
else (); |
|
23961 | 40 |
|
23981 | 41 |
val available = true; |
25775
90525e67ede7
added Multithreading.max_threads_value, which maps a value of 0 to number of CPUs;
wenzelm
parents:
25735
diff
changeset
|
42 |
|
23973 | 43 |
val max_threads = ref 1; |
44 |
||
25775
90525e67ede7
added Multithreading.max_threads_value, which maps a value of 0 to number of CPUs;
wenzelm
parents:
25735
diff
changeset
|
45 |
fun max_threads_value () = |
90525e67ede7
added Multithreading.max_threads_value, which maps a value of 0 to number of CPUs;
wenzelm
parents:
25735
diff
changeset
|
46 |
let val m = ! max_threads |
90525e67ede7
added Multithreading.max_threads_value, which maps a value of 0 to number of CPUs;
wenzelm
parents:
25735
diff
changeset
|
47 |
in if m <= 0 then Thread.numProcessors () else m end; |
90525e67ede7
added Multithreading.max_threads_value, which maps a value of 0 to number of CPUs;
wenzelm
parents:
25735
diff
changeset
|
48 |
|
23973 | 49 |
|
24069 | 50 |
(* misc utils *) |
51 |
||
24208 | 52 |
fun cons x xs = x :: xs; |
24069 | 53 |
|
24208 | 54 |
fun change r f = r := f (! r); |
24069 | 55 |
|
56 |
fun inc i = (i := ! i + 1; ! i); |
|
57 |
fun dec i = (i := ! i - 1; ! i); |
|
58 |
||
24208 | 59 |
fun show "" = "" | show name = " " ^ name; |
60 |
fun show' "" = "" | show' name = " [" ^ name ^ "]"; |
|
61 |
||
26098
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
62 |
fun read_file name = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
63 |
let val is = TextIO.openIn name |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
64 |
in TextIO.inputAll is before TextIO.closeIn is end; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
65 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
66 |
fun write_file name txt = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
67 |
let val os = TextIO.openOut name |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
68 |
in TextIO.output (os, txt) before TextIO.closeOut os end; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
69 |
|
24208 | 70 |
|
71 |
(* thread attributes *) |
|
72 |
||
73 |
fun with_attributes new_atts f x = |
|
74 |
let |
|
75 |
val orig_atts = Thread.getAttributes (); |
|
76 |
fun restore () = Thread.setAttributes orig_atts; |
|
77 |
in |
|
78 |
Exn.release |
|
24214
0482ecc4ef11
(un)interruptible: pass-through original thread attributes;
wenzelm
parents:
24208
diff
changeset
|
79 |
(*RACE for fully asynchronous interrupts!*) |
0482ecc4ef11
(un)interruptible: pass-through original thread attributes;
wenzelm
parents:
24208
diff
changeset
|
80 |
(let |
24208 | 81 |
val _ = Thread.setAttributes new_atts; |
24214
0482ecc4ef11
(un)interruptible: pass-through original thread attributes;
wenzelm
parents:
24208
diff
changeset
|
82 |
val result = Exn.capture (f orig_atts) x; |
24208 | 83 |
val _ = restore (); |
84 |
in result end |
|
85 |
handle Interrupt => (restore (); Exn.Exn Interrupt)) |
|
86 |
end; |
|
87 |
||
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
88 |
fun interruptible f = |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
89 |
with_attributes |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
90 |
[Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
91 |
(fn _ => f); |
24208 | 92 |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
93 |
fun uninterruptible f = |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
94 |
with_attributes |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
95 |
[Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
96 |
(fn atts => f (fn g => with_attributes atts (fn _ => g))); |
24668 | 97 |
|
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
98 |
|
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
99 |
(* execution with time limit *) |
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
100 |
|
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
101 |
structure TimeLimit = |
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
102 |
struct |
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
103 |
|
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
104 |
exception TimeOut; |
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
105 |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
106 |
fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () => |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
107 |
let |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
108 |
val worker = Thread.self (); |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
109 |
val timeout = ref false; |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
110 |
val watchdog = Thread.fork (fn () => |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
111 |
(OS.Process.sleep time; timeout := true; Thread.interrupt worker), []); |
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
112 |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
113 |
(*RACE! timeout signal vs. external Interrupt*) |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
114 |
val result = Exn.capture (restore_attributes f) x; |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
115 |
val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false); |
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
116 |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
117 |
val _ = Thread.interrupt watchdog handle Thread _ => (); |
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
118 |
in if was_timeout then raise TimeOut else Exn.release result end) (); |
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
119 |
|
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
120 |
end; |
24668 | 121 |
|
24069 | 122 |
|
26098
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
123 |
(* system shell processes, with propagation of interrupts *) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
124 |
|
26221
e557c20158e2
system_out: threaded version does not work for 5.1;
wenzelm
parents:
26098
diff
changeset
|
125 |
fun system_out_threaded script = uninterruptible (fn restore_attributes => fn () => |
26098
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
126 |
let |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
127 |
val script_name = OS.FileSys.tmpName (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
128 |
val _ = write_file script_name script; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
129 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
130 |
val pid_name = OS.FileSys.tmpName (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
131 |
val output_name = OS.FileSys.tmpName (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
132 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
133 |
(*result state*) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
134 |
datatype result = Wait | Signal | Result of int; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
135 |
val result = ref Wait; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
136 |
val result_mutex = Mutex.mutex (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
137 |
val result_cond = ConditionVar.conditionVar (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
138 |
fun set_result res = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
139 |
(Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
140 |
ConditionVar.signal result_cond); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
141 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
142 |
val _ = Mutex.lock result_mutex; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
143 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
144 |
(*system thread*) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
145 |
val system_thread = Thread.fork (fn () => |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
146 |
let |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
147 |
val status = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
148 |
OS.Process.system ("perl -w \"$ISABELLE_HOME/lib/scripts/system.pl\" group " ^ |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
149 |
script_name ^ " " ^ pid_name ^ " " ^ output_name); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
150 |
val res = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
151 |
(case Posix.Process.fromStatus status of |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
152 |
Posix.Process.W_EXITED => Result 0 |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
153 |
| Posix.Process.W_EXITSTATUS 0wx82 => Signal |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
154 |
| Posix.Process.W_EXITSTATUS w => Result (Word8.toInt w) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
155 |
| Posix.Process.W_SIGNALED s => |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
156 |
if s = Posix.Signal.int then Signal |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
157 |
else Result (256 + LargeWord.toInt (Posix.Signal.toWord s)) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
158 |
| Posix.Process.W_STOPPED s => Result (512 + LargeWord.toInt (Posix.Signal.toWord s))); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
159 |
in set_result res end handle _ => set_result (Result 2), []); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
160 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
161 |
(*main thread -- proxy for interrupts*) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
162 |
fun kill n = |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
163 |
(case Int.fromString (read_file pid_name) of |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
164 |
SOME pid => |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
165 |
Posix.Process.kill |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
166 |
(Posix.Process.K_GROUP (Posix.Process.wordToPid (LargeWord.fromInt pid)), |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
167 |
Posix.Signal.int) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
168 |
| NONE => ()) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
169 |
handle OS.SysErr _ => () | IO.Io _ => |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
170 |
(OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ()); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
171 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
172 |
val _ = while ! result = Wait do |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
173 |
restore_attributes (fn () => |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
174 |
(ConditionVar.waitUntil (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ()) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
175 |
handle Interrupt => kill 10) (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
176 |
|
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
177 |
(*cleanup*) |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
178 |
val output = read_file output_name handle IO.Io _ => ""; |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
179 |
val _ = OS.FileSys.remove script_name handle OS.SysErr _ => (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
180 |
val _ = OS.FileSys.remove pid_name handle OS.SysErr _ => (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
181 |
val _ = OS.FileSys.remove output_name handle OS.SysErr _ => (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
182 |
val _ = Thread.interrupt system_thread handle Thread _ => (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
183 |
val rc = (case ! result of Signal => raise Interrupt | Result rc => rc); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
184 |
in (output, rc) end) (); |
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
185 |
|
26221
e557c20158e2
system_out: threaded version does not work for 5.1;
wenzelm
parents:
26098
diff
changeset
|
186 |
val system_out = |
e557c20158e2
system_out: threaded version does not work for 5.1;
wenzelm
parents:
26098
diff
changeset
|
187 |
if ml_system = "polyml-5.1" then system_out (*signals not propagated from root thread!*) |
e557c20158e2
system_out: threaded version does not work for 5.1;
wenzelm
parents:
26098
diff
changeset
|
188 |
else system_out_threaded; |
e557c20158e2
system_out: threaded version does not work for 5.1;
wenzelm
parents:
26098
diff
changeset
|
189 |
|
26098
b59d33f73aed
added system_out (back to multithreaded version -- still suffers from non-interruptible wait in Poly/ML 5.1);
wenzelm
parents:
26083
diff
changeset
|
190 |
|
23961 | 191 |
(* critical section -- may be nested within the same thread *) |
192 |
||
193 |
local |
|
194 |
||
24063 | 195 |
val critical_lock = Mutex.mutex (); |
196 |
val critical_thread = ref (NONE: Thread.thread option); |
|
197 |
val critical_name = ref ""; |
|
198 |
||
23961 | 199 |
in |
200 |
||
201 |
fun self_critical () = |
|
202 |
(case ! critical_thread of |
|
203 |
NONE => false |
|
204 |
| SOME id => Thread.equal (id, Thread.self ())); |
|
205 |
||
23991 | 206 |
fun NAMED_CRITICAL name e = |
23961 | 207 |
if self_critical () then e () |
208 |
else |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
209 |
uninterruptible (fn restore_attributes => fn () => |
24208 | 210 |
let |
211 |
val name' = ! critical_name; |
|
212 |
val _ = |
|
213 |
if Mutex.trylock critical_lock then () |
|
214 |
else |
|
215 |
let |
|
216 |
val timer = Timer.startRealTimer (); |
|
217 |
val _ = tracing 4 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting"); |
|
218 |
val _ = Mutex.lock critical_lock; |
|
219 |
val time = Timer.checkRealTimer timer; |
|
220 |
val _ = tracing (if Time.> (time, Time.fromMilliseconds 10) then 3 else 4) (fn () => |
|
221 |
"CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time); |
|
222 |
in () end; |
|
223 |
val _ = critical_thread := SOME (Thread.self ()); |
|
224 |
val _ = critical_name := name; |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
225 |
val result = Exn.capture (restore_attributes e) (); |
24208 | 226 |
val _ = critical_name := ""; |
227 |
val _ = critical_thread := NONE; |
|
228 |
val _ = Mutex.unlock critical_lock; |
|
229 |
in Exn.release result end) (); |
|
23961 | 230 |
|
23991 | 231 |
fun CRITICAL e = NAMED_CRITICAL "" e; |
23981 | 232 |
|
23961 | 233 |
end; |
234 |
||
23973 | 235 |
|
24208 | 236 |
(* scheduling -- multiple threads working on a queue of tasks *) |
237 |
||
238 |
datatype 'a task = |
|
239 |
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate; |
|
23973 | 240 |
|
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
241 |
fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks => |
23973 | 242 |
let |
243 |
(*protected execution*) |
|
244 |
val lock = Mutex.mutex (); |
|
24672 | 245 |
val protected_name = ref ""; |
24063 | 246 |
fun PROTECTED name e = |
23973 | 247 |
let |
24144 | 248 |
val name' = ! protected_name; |
23981 | 249 |
val _ = |
250 |
if Mutex.trylock lock then () |
|
251 |
else |
|
24144 | 252 |
let |
253 |
val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting"); |
|
254 |
val _ = Mutex.lock lock; |
|
255 |
val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed"); |
|
256 |
in () end; |
|
24069 | 257 |
val _ = protected_name := name; |
23973 | 258 |
val res = Exn.capture e (); |
24069 | 259 |
val _ = protected_name := ""; |
23973 | 260 |
val _ = Mutex.unlock lock; |
261 |
in Exn.release res end; |
|
262 |
||
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
263 |
(*wakeup condition*) |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
264 |
val wakeup = ConditionVar.conditionVar (); |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
265 |
fun wakeup_all () = ConditionVar.broadcast wakeup; |
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
266 |
fun wait () = ConditionVar.wait (wakeup, lock); |
24291 | 267 |
fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1); |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
268 |
|
24214
0482ecc4ef11
(un)interruptible: pass-through original thread attributes;
wenzelm
parents:
24208
diff
changeset
|
269 |
(*queue of tasks*) |
23973 | 270 |
val queue = ref tasks; |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
271 |
val active = ref 0; |
24119 | 272 |
fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active"); |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
273 |
fun dequeue () = |
23973 | 274 |
let |
23981 | 275 |
val (next, tasks') = next_task (! queue); |
23973 | 276 |
val _ = queue := tasks'; |
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
277 |
in |
24208 | 278 |
(case next of Wait => |
279 |
(dec active; trace_active (); |
|
280 |
wait (); |
|
281 |
inc active; trace_active (); |
|
282 |
dequeue ()) |
|
283 |
| _ => next) |
|
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
284 |
end; |
23973 | 285 |
|
24208 | 286 |
(*pool of running threads*) |
23973 | 287 |
val status = ref ([]: exn list); |
24208 | 288 |
val running = ref ([]: Thread.thread list); |
289 |
fun start f = |
|
290 |
(inc active; |
|
291 |
change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer])))); |
|
292 |
fun stop () = |
|
293 |
(dec active; |
|
294 |
change running (List.filter (fn t => not (Thread.equal (t, Thread.self ()))))); |
|
295 |
||
296 |
(*worker thread*) |
|
297 |
fun worker () = |
|
24072
8b9e5d776ef3
dequeue: wait loop while PROTECTED -- avoids race condition;
wenzelm
parents:
24069
diff
changeset
|
298 |
(case PROTECTED "dequeue" dequeue of |
24208 | 299 |
Task {body, cont, fail} => |
26083
abb3f8dd66dc
removed managed_process (cf. General/shell_process.ML);
wenzelm
parents:
26074
diff
changeset
|
300 |
(case Exn.capture (restore_attributes body) () of |
24208 | 301 |
Exn.Result () => |
302 |
(PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ()) |
|
23981 | 303 |
| Exn.Exn exn => |
24208 | 304 |
PROTECTED "fail" (fn () => |
305 |
(change status (cons exn); change queue fail; stop (); wakeup_all ()))) |
|
306 |
| Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ()))); |
|
23973 | 307 |
|
308 |
(*main control: fork and wait*) |
|
309 |
fun fork 0 = () |
|
24208 | 310 |
| fork k = (start worker; fork (k - 1)); |
24063 | 311 |
val _ = PROTECTED "main" (fn () => |
312 |
(fork (Int.max (n, 1)); |
|
24208 | 313 |
while not (List.null (! running)) do |
314 |
(trace_active (); |
|
26254
3def1a1fea4e
schedule main control: more robust interrupting of potentially running threads;
wenzelm
parents:
26221
diff
changeset
|
315 |
if not (List.null (! status)) |
3def1a1fea4e
schedule main control: more robust interrupting of potentially running threads;
wenzelm
parents:
26221
diff
changeset
|
316 |
then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running)) |
3def1a1fea4e
schedule main control: more robust interrupting of potentially running threads;
wenzelm
parents:
26221
diff
changeset
|
317 |
else (); |
24291 | 318 |
wait_timeout ()))); |
23973 | 319 |
|
24208 | 320 |
in ! status end); |
23973 | 321 |
|
25704 | 322 |
|
323 |
(* serial numbers *) |
|
324 |
||
325 |
local |
|
326 |
||
327 |
val serial_lock = Mutex.mutex (); |
|
328 |
val serial_count = ref 0; |
|
329 |
||
330 |
in |
|
331 |
||
332 |
val serial = uninterruptible (fn _ => fn () => |
|
333 |
let |
|
334 |
val _ = Mutex.lock serial_lock; |
|
335 |
val res = inc serial_count; |
|
336 |
val _ = Mutex.unlock serial_lock; |
|
337 |
in res end); |
|
338 |
||
23961 | 339 |
end; |
340 |
||
25735 | 341 |
|
342 |
(* thread data *) |
|
343 |
||
344 |
val get_data = Thread.getLocal; |
|
345 |
val put_data = Thread.setLocal; |
|
346 |
||
25704 | 347 |
end; |
24688
a5754ca5c510
replaced interrupt_timeout by TimeLimit.timeLimit (available on SML/NJ and Poly/ML 5.1);
wenzelm
parents:
24672
diff
changeset
|
348 |
|
25704 | 349 |
structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading; |
350 |
open BasicMultithreading; |