wenzelm@28156
|
1 |
(* Title: Pure/Concurrent/future.ML
|
wenzelm@28156
|
2 |
ID: $Id$
|
wenzelm@28156
|
3 |
Author: Makarius
|
wenzelm@28156
|
4 |
|
wenzelm@28201
|
5 |
Future values.
|
wenzelm@28201
|
6 |
|
wenzelm@28201
|
7 |
Notes:
|
wenzelm@28201
|
8 |
|
wenzelm@28201
|
9 |
* Futures are similar to delayed evaluation, i.e. delay/force is
|
wenzelm@28201
|
10 |
generalized to fork/join (and variants). The idea is to model
|
wenzelm@28201
|
11 |
parallel value-oriented computations, but *not* communicating
|
wenzelm@28201
|
12 |
processes.
|
wenzelm@28201
|
13 |
|
wenzelm@28201
|
14 |
* Futures are grouped; failure of one group member causes the whole
|
wenzelm@28201
|
15 |
group to be interrupted eventually.
|
wenzelm@28201
|
16 |
|
wenzelm@28201
|
17 |
* Forked futures are evaluated spontaneously by a farm of worker
|
wenzelm@28201
|
18 |
threads in the background; join resynchronizes the computation and
|
wenzelm@28201
|
19 |
delivers results (values or exceptions).
|
wenzelm@28201
|
20 |
|
wenzelm@28201
|
21 |
* The pool of worker threads is limited, usually in correlation with
|
wenzelm@28201
|
22 |
the number of physical cores on the machine. Note that allocation
|
wenzelm@28201
|
23 |
of runtime resources is distorted either if workers yield CPU time
|
wenzelm@28201
|
24 |
(e.g. via system sleep or wait operations), or if non-worker
|
wenzelm@28201
|
25 |
threads contend for significant runtime resources independently.
|
wenzelm@28156
|
26 |
*)
|
wenzelm@28156
|
27 |
|
wenzelm@28156
|
28 |
signature FUTURE =
|
wenzelm@28156
|
29 |
sig
|
wenzelm@28166
|
30 |
type task = TaskQueue.task
|
wenzelm@28166
|
31 |
type group = TaskQueue.group
|
wenzelm@28156
|
32 |
type 'a T
|
wenzelm@28166
|
33 |
val task_of: 'a T -> task
|
wenzelm@28177
|
34 |
val group_of: 'a T -> group
|
wenzelm@28191
|
35 |
val future: group option -> task list -> (unit -> 'a) -> 'a T
|
wenzelm@28166
|
36 |
val fork: (unit -> 'a) -> 'a T
|
wenzelm@28193
|
37 |
val join_results: 'a T list -> 'a Exn.result list
|
wenzelm@28166
|
38 |
val join: 'a T -> 'a
|
wenzelm@28202
|
39 |
val focus: task list -> unit
|
wenzelm@28206
|
40 |
val interrupt_task: string -> unit
|
wenzelm@28197
|
41 |
val cancel: 'a T -> unit
|
wenzelm@28203
|
42 |
val shutdown: unit -> unit
|
wenzelm@28156
|
43 |
end;
|
wenzelm@28156
|
44 |
|
wenzelm@28156
|
45 |
structure Future: FUTURE =
|
wenzelm@28156
|
46 |
struct
|
wenzelm@28156
|
47 |
|
wenzelm@28177
|
48 |
(** future values **)
|
wenzelm@28177
|
49 |
|
wenzelm@28167
|
50 |
(* identifiers *)
|
wenzelm@28167
|
51 |
|
wenzelm@28167
|
52 |
type task = TaskQueue.task;
|
wenzelm@28167
|
53 |
type group = TaskQueue.group;
|
wenzelm@28167
|
54 |
|
wenzelm@28177
|
55 |
local val tag = Universal.tag () : (task * group) option Universal.tag in
|
wenzelm@28177
|
56 |
fun thread_data () = the_default NONE (Thread.getLocal tag);
|
wenzelm@28177
|
57 |
fun set_thread_data x = Thread.setLocal (tag, x);
|
wenzelm@28167
|
58 |
end;
|
wenzelm@28167
|
59 |
|
wenzelm@28167
|
60 |
|
wenzelm@28167
|
61 |
(* datatype future *)
|
wenzelm@28167
|
62 |
|
wenzelm@28167
|
63 |
datatype 'a T = Future of
|
wenzelm@28167
|
64 |
{task: task,
|
wenzelm@28177
|
65 |
group: group,
|
wenzelm@28167
|
66 |
result: 'a Exn.result option ref};
|
wenzelm@28167
|
67 |
|
wenzelm@28167
|
68 |
fun task_of (Future {task, ...}) = task;
|
wenzelm@28167
|
69 |
fun group_of (Future {group, ...}) = group;
|
wenzelm@28167
|
70 |
|
wenzelm@28167
|
71 |
|
wenzelm@28177
|
72 |
|
wenzelm@28177
|
73 |
(** scheduling **)
|
wenzelm@28177
|
74 |
|
wenzelm@28177
|
75 |
(* global state *)
|
wenzelm@28177
|
76 |
|
wenzelm@28177
|
77 |
val queue = ref TaskQueue.empty;
|
wenzelm@28192
|
78 |
val workers = ref ([]: (Thread.thread * bool) list);
|
wenzelm@28177
|
79 |
val scheduler = ref (NONE: Thread.thread option);
|
wenzelm@28177
|
80 |
val excessive = ref 0;
|
wenzelm@28206
|
81 |
val canceled = ref ([]: TaskQueue.group list);
|
wenzelm@28206
|
82 |
val do_shutdown = ref false;
|
wenzelm@28177
|
83 |
|
wenzelm@28177
|
84 |
|
wenzelm@28177
|
85 |
(* synchronization *)
|
wenzelm@28156
|
86 |
|
wenzelm@28156
|
87 |
local
|
wenzelm@28156
|
88 |
val lock = Mutex.mutex ();
|
wenzelm@28156
|
89 |
val cond = ConditionVar.conditionVar ();
|
wenzelm@28156
|
90 |
in
|
wenzelm@28156
|
91 |
|
wenzelm@28192
|
92 |
fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () =>
|
wenzelm@28162
|
93 |
let
|
wenzelm@28192
|
94 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": locking");
|
wenzelm@28162
|
95 |
val _ = Mutex.lock lock;
|
wenzelm@28192
|
96 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": locked");
|
wenzelm@28162
|
97 |
val result = Exn.capture (restore_attributes e) ();
|
wenzelm@28162
|
98 |
val _ = Mutex.unlock lock;
|
wenzelm@28192
|
99 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": unlocked");
|
wenzelm@28162
|
100 |
in Exn.release result end) ();
|
wenzelm@28156
|
101 |
|
wenzelm@28167
|
102 |
fun wait name = (*requires SYNCHRONIZED*)
|
wenzelm@28206
|
103 |
ConditionVar.wait (cond, lock);
|
wenzelm@28206
|
104 |
|
wenzelm@28206
|
105 |
fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
|
wenzelm@28206
|
106 |
ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
|
wenzelm@28166
|
107 |
|
wenzelm@28166
|
108 |
fun notify_all () = (*requires SYNCHRONIZED*)
|
wenzelm@28166
|
109 |
ConditionVar.broadcast cond;
|
wenzelm@28156
|
110 |
|
wenzelm@28156
|
111 |
end;
|
wenzelm@28156
|
112 |
|
wenzelm@28156
|
113 |
|
wenzelm@28177
|
114 |
(* execute *)
|
wenzelm@28156
|
115 |
|
wenzelm@28167
|
116 |
fun execute name (task, group, run) =
|
wenzelm@28167
|
117 |
let
|
wenzelm@28177
|
118 |
val _ = set_thread_data (SOME (task, group));
|
wenzelm@28167
|
119 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
|
wenzelm@28167
|
120 |
val ok = run ();
|
wenzelm@28167
|
121 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
|
wenzelm@28177
|
122 |
val _ = set_thread_data NONE;
|
wenzelm@28192
|
123 |
val _ = SYNCHRONIZED "execute" (fn () =>
|
wenzelm@28177
|
124 |
(change queue (TaskQueue.finish task);
|
wenzelm@28186
|
125 |
if ok then ()
|
wenzelm@28191
|
126 |
else if TaskQueue.cancel (! queue) group then ()
|
wenzelm@28206
|
127 |
else change canceled (cons group);
|
wenzelm@28177
|
128 |
notify_all ()));
|
wenzelm@28167
|
129 |
in () end;
|
wenzelm@28167
|
130 |
|
wenzelm@28167
|
131 |
|
wenzelm@28167
|
132 |
(* worker threads *)
|
wenzelm@28167
|
133 |
|
wenzelm@28192
|
134 |
fun change_active active = (*requires SYNCHRONIZED*)
|
wenzelm@28203
|
135 |
let
|
wenzelm@28203
|
136 |
val _ = change workers (AList.update Thread.equal (Thread.self (), active));
|
wenzelm@28203
|
137 |
val ws = ! workers;
|
wenzelm@28203
|
138 |
val m = string_of_int (length ws);
|
wenzelm@28203
|
139 |
val n = string_of_int (length (filter #2 ws));
|
wenzelm@28203
|
140 |
in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
|
wenzelm@28203
|
141 |
|
wenzelm@28186
|
142 |
|
wenzelm@28186
|
143 |
fun worker_wait name = (*requires SYNCHRONIZED*)
|
wenzelm@28186
|
144 |
(change_active false; wait name; change_active true);
|
wenzelm@28162
|
145 |
|
wenzelm@28167
|
146 |
fun worker_next name = (*requires SYNCHRONIZED*)
|
wenzelm@28167
|
147 |
if ! excessive > 0 then
|
wenzelm@28167
|
148 |
(dec excessive;
|
wenzelm@28192
|
149 |
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
|
wenzelm@28203
|
150 |
notify_all ();
|
wenzelm@28167
|
151 |
NONE)
|
wenzelm@28166
|
152 |
else
|
wenzelm@28186
|
153 |
(case change_result queue TaskQueue.dequeue of
|
wenzelm@28186
|
154 |
NONE => (worker_wait name; worker_next name)
|
wenzelm@28166
|
155 |
| some => some);
|
wenzelm@28156
|
156 |
|
wenzelm@28167
|
157 |
fun worker_loop name =
|
wenzelm@28192
|
158 |
(case SYNCHRONIZED name (fn () => worker_next name) of
|
wenzelm@28203
|
159 |
NONE => Multithreading.tracing 4 (fn () => name ^ ": exit")
|
wenzelm@28167
|
160 |
| SOME work => (execute name work; worker_loop name));
|
wenzelm@28156
|
161 |
|
wenzelm@28167
|
162 |
fun worker_start name = (*requires SYNCHRONIZED*)
|
wenzelm@28242
|
163 |
change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
|
wenzelm@28156
|
164 |
|
wenzelm@28156
|
165 |
|
wenzelm@28156
|
166 |
(* scheduler *)
|
wenzelm@28156
|
167 |
|
wenzelm@28206
|
168 |
fun scheduler_next () = (*requires SYNCHRONIZED*)
|
wenzelm@28156
|
169 |
let
|
wenzelm@28206
|
170 |
(*worker threads*)
|
wenzelm@28191
|
171 |
val _ =
|
wenzelm@28192
|
172 |
(case List.partition (Thread.isActive o #1) (! workers) of
|
wenzelm@28191
|
173 |
(_, []) => ()
|
wenzelm@28191
|
174 |
| (active, inactive) =>
|
wenzelm@28191
|
175 |
(workers := active; Multithreading.tracing 0 (fn () =>
|
wenzelm@28192
|
176 |
"SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
|
wenzelm@28191
|
177 |
|
wenzelm@28206
|
178 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
|
wenzelm@28167
|
179 |
val l = length (! workers);
|
wenzelm@28167
|
180 |
val _ = excessive := l - m;
|
wenzelm@28203
|
181 |
val _ =
|
wenzelm@28203
|
182 |
if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
|
wenzelm@28203
|
183 |
else ();
|
wenzelm@28206
|
184 |
|
wenzelm@28206
|
185 |
(*canceled groups*)
|
wenzelm@28206
|
186 |
val _ = change canceled (filter_out (TaskQueue.cancel (! queue)));
|
wenzelm@28206
|
187 |
|
wenzelm@28206
|
188 |
(*shutdown*)
|
wenzelm@28206
|
189 |
val continue = not (! do_shutdown andalso null (! workers));
|
wenzelm@28206
|
190 |
val _ = if continue then () else scheduler := NONE;
|
wenzelm@28167
|
191 |
|
wenzelm@28206
|
192 |
val _ = notify_all ();
|
wenzelm@28206
|
193 |
val _ = wait_timeout "scheduler" (Time.fromSeconds 1);
|
wenzelm@28206
|
194 |
in continue end;
|
wenzelm@28206
|
195 |
|
wenzelm@28206
|
196 |
fun scheduler_loop () =
|
wenzelm@28206
|
197 |
(while SYNCHRONIZED "scheduler" scheduler_next do ();
|
wenzelm@28206
|
198 |
Multithreading.tracing 4 (fn () => "scheduler: exit"));
|
wenzelm@28156
|
199 |
|
wenzelm@28203
|
200 |
fun scheduler_active () = (*requires SYNCHRONIZED*)
|
wenzelm@28203
|
201 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
|
wenzelm@28203
|
202 |
|
wenzelm@28192
|
203 |
fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
|
wenzelm@28206
|
204 |
if not (scheduler_active ()) then
|
wenzelm@28242
|
205 |
(do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
|
wenzelm@28206
|
206 |
else if ! do_shutdown then error "Scheduler shutdown in progress"
|
wenzelm@28206
|
207 |
else ());
|
wenzelm@28156
|
208 |
|
wenzelm@28156
|
209 |
|
wenzelm@28191
|
210 |
(* future values: fork independent computation *)
|
wenzelm@28156
|
211 |
|
wenzelm@28191
|
212 |
fun future opt_group deps (e: unit -> 'a) =
|
wenzelm@28156
|
213 |
let
|
wenzelm@28191
|
214 |
val _ = scheduler_check ();
|
wenzelm@28177
|
215 |
|
wenzelm@28191
|
216 |
val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
|
wenzelm@28177
|
217 |
|
wenzelm@28166
|
218 |
val result = ref (NONE: 'a Exn.result option);
|
wenzelm@28177
|
219 |
val run = Multithreading.with_attributes (Thread.getAttributes ())
|
wenzelm@28177
|
220 |
(fn _ => fn ok =>
|
wenzelm@28177
|
221 |
let val res = if ok then Exn.capture e () else Exn.Exn Interrupt
|
wenzelm@28177
|
222 |
in result := SOME res; is_some (Exn.get_result res) end);
|
wenzelm@28177
|
223 |
|
wenzelm@28192
|
224 |
val task = SYNCHRONIZED "future" (fn () =>
|
wenzelm@28166
|
225 |
change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
|
wenzelm@28166
|
226 |
in Future {task = task, group = group, result = result} end;
|
wenzelm@28162
|
227 |
|
wenzelm@28191
|
228 |
fun fork e = future (Option.map #2 (thread_data ())) [] e;
|
wenzelm@28186
|
229 |
|
wenzelm@28186
|
230 |
|
wenzelm@28191
|
231 |
(* join: retrieve results *)
|
wenzelm@28186
|
232 |
|
wenzelm@28193
|
233 |
fun join_results xs =
|
wenzelm@28156
|
234 |
let
|
wenzelm@28206
|
235 |
val _ = scheduler_check ();
|
wenzelm@28193
|
236 |
val _ = Multithreading.self_critical () andalso
|
wenzelm@28193
|
237 |
error "Cannot join future values within critical section";
|
wenzelm@28177
|
238 |
|
wenzelm@28186
|
239 |
fun unfinished () =
|
wenzelm@28186
|
240 |
xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
|
wenzelm@28186
|
241 |
|
wenzelm@28186
|
242 |
(*alien thread -- refrain from contending for resources*)
|
wenzelm@28186
|
243 |
fun passive_join () = (*requires SYNCHRONIZED*)
|
wenzelm@28186
|
244 |
(case unfinished () of [] => ()
|
wenzelm@28203
|
245 |
| _ => (wait "passive_join"; passive_join ()));
|
wenzelm@28186
|
246 |
|
wenzelm@28186
|
247 |
(*proper worker thread -- actively work towards results*)
|
wenzelm@28186
|
248 |
fun active_join () = (*requires SYNCHRONIZED*)
|
wenzelm@28186
|
249 |
(case unfinished () of [] => ()
|
wenzelm@28186
|
250 |
| tasks =>
|
wenzelm@28186
|
251 |
(case change_result queue (TaskQueue.dequeue_towards tasks) of
|
wenzelm@28203
|
252 |
NONE => (worker_wait "active_join"; active_join ())
|
wenzelm@28203
|
253 |
| SOME work => (execute "active_join" work; active_join ())));
|
wenzelm@28186
|
254 |
|
wenzelm@28186
|
255 |
val _ =
|
wenzelm@28186
|
256 |
(case thread_data () of
|
wenzelm@28203
|
257 |
NONE => SYNCHRONIZED "passive_join" passive_join
|
wenzelm@28203
|
258 |
| SOME (task, _) => SYNCHRONIZED "active_join" (fn () =>
|
wenzelm@28186
|
259 |
(change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
|
wenzelm@28186
|
260 |
|
wenzelm@28193
|
261 |
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end;
|
wenzelm@28186
|
262 |
|
wenzelm@28193
|
263 |
fun join x = Exn.release (singleton join_results x);
|
wenzelm@28156
|
264 |
|
wenzelm@28191
|
265 |
|
wenzelm@28202
|
266 |
(* misc operations *)
|
wenzelm@28202
|
267 |
|
wenzelm@28202
|
268 |
(*focus: collection of high-priority task*)
|
wenzelm@28202
|
269 |
fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
|
wenzelm@28202
|
270 |
change queue (TaskQueue.focus tasks));
|
wenzelm@28191
|
271 |
|
wenzelm@28202
|
272 |
(*interrupt: permissive signal, may get ignored*)
|
wenzelm@28197
|
273 |
fun interrupt_task id = SYNCHRONIZED "interrupt"
|
wenzelm@28197
|
274 |
(fn () => TaskQueue.interrupt_external (! queue) id);
|
wenzelm@28191
|
275 |
|
wenzelm@28206
|
276 |
(*cancel: present and future group members will be interrupted eventually*)
|
wenzelm@28206
|
277 |
fun cancel x =
|
wenzelm@28208
|
278 |
(scheduler_check ();
|
wenzelm@28208
|
279 |
SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
|
wenzelm@28206
|
280 |
|
wenzelm@28206
|
281 |
|
wenzelm@28203
|
282 |
(*global join and shutdown*)
|
wenzelm@28203
|
283 |
fun shutdown () =
|
wenzelm@28203
|
284 |
(scheduler_check ();
|
wenzelm@28203
|
285 |
SYNCHRONIZED "shutdown" (fn () =>
|
wenzelm@28206
|
286 |
(while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
|
wenzelm@28206
|
287 |
while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
|
wenzelm@28206
|
288 |
do_shutdown := true;
|
wenzelm@28208
|
289 |
notify_all ();
|
wenzelm@28203
|
290 |
while not (null (! workers)) do wait "shutdown: workers";
|
wenzelm@28206
|
291 |
while scheduler_active () do wait "shutdown: scheduler still active")));
|
wenzelm@28203
|
292 |
|
wenzelm@28156
|
293 |
end;
|