author | wenzelm |
Tue, 21 Jul 2009 11:30:12 +0200 | |
changeset 32095 | ad4be204fdfe |
parent 32058 | c76fd93b3b99 |
child 32096 | cb9adb13f892 |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
Author: Makarius |
|
3 |
||
28201 | 4 |
Future values. |
5 |
||
6 |
Notes: |
|
7 |
||
8 |
* Futures are similar to delayed evaluation, i.e. delay/force is |
|
9 |
generalized to fork/join (and variants). The idea is to model |
|
10 |
parallel value-oriented computations, but *not* communicating |
|
11 |
processes. |
|
12 |
||
13 |
* Futures are grouped; failure of one group member causes the whole |
|
14 |
group to be interrupted eventually. |
|
15 |
||
16 |
* Forked futures are evaluated spontaneously by a farm of worker |
|
17 |
threads in the background; join resynchronizes the computation and |
|
18 |
delivers results (values or exceptions). |
|
19 |
||
20 |
* The pool of worker threads is limited, usually in correlation with |
|
21 |
the number of physical cores on the machine. Note that allocation |
|
22 |
of runtime resources is distorted either if workers yield CPU time |
|
23 |
(e.g. via system sleep or wait operations), or if non-worker |
|
24 |
threads contend for significant runtime resources independently. |
|
28156 | 25 |
*) |
26 |
||
27 |
signature FUTURE = |
|
28 |
sig |
|
28645 | 29 |
val enabled: unit -> bool |
29119 | 30 |
type task = Task_Queue.task |
31 |
type group = Task_Queue.group |
|
32058 | 32 |
val is_worker: unit -> bool |
28972 | 33 |
type 'a future |
34 |
val task_of: 'a future -> task |
|
35 |
val group_of: 'a future -> group |
|
36 |
val peek: 'a future -> 'a Exn.result option |
|
37 |
val is_finished: 'a future -> bool |
|
28997 | 38 |
val value: 'a -> 'a future |
28972 | 39 |
val fork: (unit -> 'a) -> 'a future |
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
40 |
val fork_group: group -> (unit -> 'a) -> 'a future |
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
41 |
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
29119 | 42 |
val fork_pri: int -> (unit -> 'a) -> 'a future |
32058 | 43 |
val fork_local: int -> (unit -> 'a) -> 'a future |
28972 | 44 |
val join_results: 'a future list -> 'a Exn.result list |
45 |
val join_result: 'a future -> 'a Exn.result |
|
46 |
val join: 'a future -> 'a |
|
47 |
val map: ('a -> 'b) -> 'a future -> 'b future |
|
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
48 |
val interruptible_task: ('a -> 'b) -> 'a -> 'b |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
49 |
val interrupt_task: string -> unit |
29431 | 50 |
val cancel_group: group -> unit |
28972 | 51 |
val cancel: 'a future -> unit |
28203 | 52 |
val shutdown: unit -> unit |
28156 | 53 |
end; |
54 |
||
55 |
structure Future: FUTURE = |
|
56 |
struct |
|
57 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
58 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
59 |
|
28645 | 60 |
fun enabled () = |
29118 | 61 |
Multithreading.enabled () andalso |
28645 | 62 |
not (Multithreading.self_critical ()); |
63 |
||
64 |
||
28167 | 65 |
(* identifiers *) |
66 |
||
29119 | 67 |
type task = Task_Queue.task; |
68 |
type group = Task_Queue.group; |
|
28167 | 69 |
|
32058 | 70 |
local |
71 |
val tag = Universal.tag () : (string * task * group) option Universal.tag; |
|
72 |
in |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
73 |
fun thread_data () = the_default NONE (Thread.getLocal tag); |
32058 | 74 |
fun setmp_thread_data data f x = |
75 |
Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
|
28167 | 76 |
end; |
77 |
||
32058 | 78 |
val is_worker = is_some o thread_data; |
79 |
||
28167 | 80 |
|
81 |
(* datatype future *) |
|
82 |
||
28972 | 83 |
datatype 'a future = Future of |
28167 | 84 |
{task: task, |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
85 |
group: group, |
28167 | 86 |
result: 'a Exn.result option ref}; |
87 |
||
88 |
fun task_of (Future {task, ...}) = task; |
|
89 |
fun group_of (Future {group, ...}) = group; |
|
90 |
||
28558 | 91 |
fun peek (Future {result, ...}) = ! result; |
92 |
fun is_finished x = is_some (peek x); |
|
28320 | 93 |
|
28997 | 94 |
fun value x = Future |
29119 | 95 |
{task = Task_Queue.new_task 0, |
96 |
group = Task_Queue.new_group (), |
|
28997 | 97 |
result = ref (SOME (Exn.Result x))}; |
98 |
||
28167 | 99 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
100 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
101 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
102 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
103 |
(* global state *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
104 |
|
29119 | 105 |
val queue = ref Task_Queue.empty; |
28468 | 106 |
val next = ref 0; |
28192 | 107 |
val workers = ref ([]: (Thread.thread * bool) list); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
108 |
val scheduler = ref (NONE: Thread.thread option); |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
109 |
val excessive = ref 0; |
29119 | 110 |
val canceled = ref ([]: Task_Queue.group list); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
111 |
val do_shutdown = ref false; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
112 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
113 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
114 |
(* synchronization *) |
28156 | 115 |
|
116 |
local |
|
117 |
val lock = Mutex.mutex (); |
|
118 |
val cond = ConditionVar.conditionVar (); |
|
119 |
in |
|
120 |
||
28575 | 121 |
fun SYNCHRONIZED name = SimpleThread.synchronized name lock; |
28156 | 122 |
|
29119 | 123 |
fun wait () = (*requires SYNCHRONIZED*) |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
124 |
ConditionVar.wait (cond, lock); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
125 |
|
29119 | 126 |
fun wait_timeout timeout = (*requires SYNCHRONIZED*) |
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
127 |
ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout))); |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
128 |
|
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
129 |
fun notify_all () = (*requires SYNCHRONIZED*) |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
130 |
ConditionVar.broadcast cond; |
28156 | 131 |
|
132 |
end; |
|
133 |
||
134 |
||
28382 | 135 |
(* worker activity *) |
136 |
||
32095 | 137 |
fun count_active ws = |
138 |
fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0; |
|
139 |
||
140 |
fun trace_active () = Multithreading.tracing 1 (fn () => |
|
28382 | 141 |
let |
142 |
val ws = ! workers; |
|
143 |
val m = string_of_int (length ws); |
|
32095 | 144 |
val n = string_of_int (count_active ws); |
145 |
in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end); |
|
28382 | 146 |
|
147 |
fun change_active active = (*requires SYNCHRONIZED*) |
|
148 |
change workers (AList.update Thread.equal (Thread.self (), active)); |
|
149 |
||
32095 | 150 |
fun overloaded () = |
151 |
count_active (! workers) > Multithreading.max_threads_value (); |
|
152 |
||
28382 | 153 |
|
29366 | 154 |
(* execute jobs *) |
28156 | 155 |
|
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
156 |
fun do_cancel group = (*requires SYNCHRONIZED*) |
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
157 |
change canceled (insert Task_Queue.eq_group group); |
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
158 |
|
29366 | 159 |
fun execute name (task, group, jobs) = |
28167 | 160 |
let |
28382 | 161 |
val _ = trace_active (); |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
162 |
val valid = Task_Queue.is_valid group; |
32058 | 163 |
val ok = setmp_thread_data (name, task, group) (fn () => |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
164 |
fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
28192 | 165 |
val _ = SYNCHRONIZED "execute" (fn () => |
29119 | 166 |
(change queue (Task_Queue.finish task); |
28186 | 167 |
if ok then () |
29119 | 168 |
else if Task_Queue.cancel (! queue) group then () |
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
169 |
else do_cancel group; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
170 |
notify_all ())); |
28167 | 171 |
in () end; |
172 |
||
173 |
||
174 |
(* worker threads *) |
|
175 |
||
29119 | 176 |
fun worker_wait () = (*requires SYNCHRONIZED*) |
177 |
(change_active false; wait (); change_active true); |
|
28162 | 178 |
|
29119 | 179 |
fun worker_next () = (*requires SYNCHRONIZED*) |
28167 | 180 |
if ! excessive > 0 then |
181 |
(dec excessive; |
|
28192 | 182 |
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
28203 | 183 |
notify_all (); |
28167 | 184 |
NONE) |
32095 | 185 |
else if overloaded () then (worker_wait (); worker_next ()) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
186 |
else |
29119 | 187 |
(case change_result queue Task_Queue.dequeue of |
188 |
NONE => (worker_wait (); worker_next ()) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
189 |
| some => some); |
28156 | 190 |
|
28167 | 191 |
fun worker_loop name = |
29119 | 192 |
(case SYNCHRONIZED name worker_next of |
193 |
NONE => () |
|
28167 | 194 |
| SOME work => (execute name work; worker_loop name)); |
28156 | 195 |
|
28167 | 196 |
fun worker_start name = (*requires SYNCHRONIZED*) |
28242 | 197 |
change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true)); |
28156 | 198 |
|
199 |
||
200 |
(* scheduler *) |
|
201 |
||
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
202 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
28156 | 203 |
let |
32053 | 204 |
(*queue status*) |
205 |
val _ = Multithreading.tracing 1 (fn () => |
|
206 |
let val {ready, pending, running} = Task_Queue.status (! queue) in |
|
207 |
"SCHEDULE: " ^ |
|
208 |
string_of_int ready ^ " ready, " ^ |
|
209 |
string_of_int pending ^ " pending, " ^ |
|
210 |
string_of_int running ^ " running" |
|
211 |
end); |
|
212 |
||
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
213 |
(*worker threads*) |
32095 | 214 |
val ws = ! workers; |
28191 | 215 |
val _ = |
32095 | 216 |
if forall (Thread.isActive o #1) ws then () |
217 |
else |
|
218 |
(case List.partition (Thread.isActive o #1) ws of |
|
219 |
(_, []) => () |
|
220 |
| (active, inactive) => |
|
221 |
(workers := active; Multithreading.tracing 0 (fn () => |
|
222 |
"SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); |
|
28382 | 223 |
val _ = trace_active (); |
28191 | 224 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
225 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
32095 | 226 |
val mm = (m * 3) div 2; |
227 |
val l = length ws; |
|
228 |
val _ = excessive := l - mm; |
|
28203 | 229 |
val _ = |
32095 | 230 |
if mm > l then |
231 |
funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () |
|
28203 | 232 |
else (); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
233 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
234 |
(*canceled groups*) |
32095 | 235 |
val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
236 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
237 |
(*shutdown*) |
32095 | 238 |
val continue = not (! do_shutdown andalso null ws); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
239 |
val _ = if continue then () else scheduler := NONE; |
28167 | 240 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
241 |
val _ = notify_all (); |
30666
d6248d4508d5
future scheduler: reduced wait timeout if tasks need to be canceled -- to improve reactivity of interrupts;
wenzelm
parents:
30618
diff
changeset
|
242 |
val _ = interruptible (fn () => |
d6248d4508d5
future scheduler: reduced wait timeout if tasks need to be canceled -- to improve reactivity of interrupts;
wenzelm
parents:
30618
diff
changeset
|
243 |
wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) () |
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
244 |
handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue)); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
245 |
in continue end; |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
246 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
247 |
fun scheduler_loop () = |
29119 | 248 |
while SYNCHRONIZED "scheduler" scheduler_next do (); |
28156 | 249 |
|
28203 | 250 |
fun scheduler_active () = (*requires SYNCHRONIZED*) |
251 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
|
252 |
||
28464 | 253 |
fun scheduler_check name = SYNCHRONIZED name (fn () => |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
254 |
if not (scheduler_active ()) then |
29119 | 255 |
(do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
256 |
else if ! do_shutdown then error "Scheduler shutdown in progress" |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
257 |
else ()); |
28156 | 258 |
|
259 |
||
29366 | 260 |
|
261 |
(** futures **) |
|
28156 | 262 |
|
29366 | 263 |
(* future job: fill result *) |
264 |
||
265 |
fun future_job group (e: unit -> 'a) = |
|
28156 | 266 |
let |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
267 |
val result = ref (NONE: 'a Exn.result option); |
30612
cb6421b6a18f
future_job: do not inherit attributes, but enforce restricted interrupts -- attempt to prevent interrupt race conditions;
wenzelm
parents:
29551
diff
changeset
|
268 |
val job = Multithreading.with_attributes Multithreading.restricted_interrupts |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
269 |
(fn _ => fn ok => |
28532 | 270 |
let |
271 |
val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; |
|
28548
003f52c2bb8f
future result: Interrupt invalidates group, but pretends success otherwise;
wenzelm
parents:
28534
diff
changeset
|
272 |
val _ = result := SOME res; |
28532 | 273 |
val res_ok = |
274 |
(case res of |
|
275 |
Exn.Result _ => true |
|
29119 | 276 |
| Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) |
28532 | 277 |
| _ => false); |
28548
003f52c2bb8f
future result: Interrupt invalidates group, but pretends success otherwise;
wenzelm
parents:
28534
diff
changeset
|
278 |
in res_ok end); |
29366 | 279 |
in (result, job) end; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
280 |
|
29366 | 281 |
|
282 |
(* fork *) |
|
283 |
||
284 |
fun fork_future opt_group deps pri e = |
|
285 |
let |
|
286 |
val _ = scheduler_check "future check"; |
|
287 |
||
288 |
val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); |
|
289 |
val (result, job) = future_job group e; |
|
28192 | 290 |
val task = SYNCHRONIZED "future" (fn () => |
29366 | 291 |
change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
292 |
in Future {task = task, group = group, result = result} end; |
28162 | 293 |
|
29366 | 294 |
fun fork e = fork_future NONE [] 0 e; |
295 |
fun fork_group group e = fork_future (SOME group) [] 0 e; |
|
296 |
fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; |
|
297 |
fun fork_pri pri e = fork_future NONE [] pri e; |
|
32058 | 298 |
fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e; |
28186 | 299 |
|
300 |
||
29366 | 301 |
(* join *) |
302 |
||
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
303 |
local |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
304 |
|
29366 | 305 |
fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); |
28186 | 306 |
|
32095 | 307 |
fun join_next deps = (*requires SYNCHRONIZED*) |
308 |
if overloaded () then (worker_wait (); join_next deps) |
|
309 |
else change_result queue (Task_Queue.dequeue_towards deps); |
|
310 |
||
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
311 |
fun join_deps deps = |
32095 | 312 |
(case SYNCHRONIZED "join" (fn () => join_next deps) of |
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
313 |
NONE => () |
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
314 |
| SOME (work, deps') => (execute "join" work; join_deps deps')); |
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
315 |
|
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
316 |
in |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
317 |
|
29366 | 318 |
fun join_results xs = |
319 |
if forall is_finished xs then map get_result xs |
|
320 |
else uninterruptible (fn _ => fn () => |
|
321 |
let |
|
322 |
val _ = scheduler_check "join check"; |
|
323 |
val _ = Multithreading.self_critical () andalso |
|
324 |
error "Cannot join future values within critical section"; |
|
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
325 |
|
32058 | 326 |
val worker = is_worker (); |
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
327 |
fun join_wait x = |
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
328 |
if SYNCHRONIZED "join_wait" (fn () => |
32058 | 329 |
is_finished x orelse (if worker then worker_wait () else wait (); false)) |
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
330 |
then () else join_wait x; |
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
331 |
|
32058 | 332 |
val _ = if worker then join_deps (map task_of xs) else (); |
32055
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
333 |
val _ = List.app join_wait xs; |
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
wenzelm
parents:
32053
diff
changeset
|
334 |
|
29366 | 335 |
in map get_result xs end) (); |
28186 | 336 |
|
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
337 |
end; |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
338 |
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
339 |
fun join_result x = singleton join_results x; |
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
340 |
fun join x = Exn.release (join_result x); |
28156 | 341 |
|
29366 | 342 |
|
343 |
(* map *) |
|
344 |
||
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
345 |
fun map_future f x = |
29366 | 346 |
let |
347 |
val _ = scheduler_check "map_future check"; |
|
348 |
||
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
349 |
val task = task_of x; |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
350 |
val group = Task_Queue.new_group (); |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
351 |
val (result, job) = future_job group (fn () => f (join x)); |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
352 |
|
29366 | 353 |
val extended = SYNCHRONIZED "map_future" (fn () => |
354 |
(case Task_Queue.extend task job (! queue) of |
|
355 |
SOME queue' => (queue := queue'; true) |
|
356 |
| NONE => false)); |
|
357 |
in |
|
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
358 |
if extended then Future {task = task, group = group, result = result} |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
359 |
else fork_future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) |
29366 | 360 |
end; |
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
361 |
|
28191 | 362 |
|
29431 | 363 |
(* cancellation *) |
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
364 |
|
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
365 |
fun interruptible_task f x = |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
366 |
if Multithreading.available then |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
367 |
Multithreading.with_attributes |
32058 | 368 |
(if is_worker () |
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
369 |
then Multithreading.restricted_interrupts |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
370 |
else Multithreading.regular_interrupts) |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
371 |
(fn _ => f) x |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
372 |
else interruptible f x; |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
373 |
|
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
374 |
(*interrupt: permissive signal, may get ignored*) |
28197 | 375 |
fun interrupt_task id = SYNCHRONIZED "interrupt" |
29119 | 376 |
(fn () => Task_Queue.interrupt_external (! queue) id); |
28191 | 377 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
378 |
(*cancel: present and future group members will be interrupted eventually*) |
29431 | 379 |
fun cancel_group group = |
28464 | 380 |
(scheduler_check "cancel check"; |
29431 | 381 |
SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ()))); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
382 |
|
29431 | 383 |
fun cancel x = cancel_group (group_of x); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
384 |
|
29366 | 385 |
|
386 |
(** global join and shutdown **) |
|
387 |
||
28203 | 388 |
fun shutdown () = |
28276 | 389 |
if Multithreading.available then |
28464 | 390 |
(scheduler_check "shutdown check"; |
28276 | 391 |
SYNCHRONIZED "shutdown" (fn () => |
29119 | 392 |
(while not (scheduler_active ()) do wait (); |
393 |
while not (Task_Queue.is_empty (! queue)) do wait (); |
|
28276 | 394 |
do_shutdown := true; |
395 |
notify_all (); |
|
29119 | 396 |
while not (null (! workers)) do wait (); |
397 |
while scheduler_active () do wait (); |
|
28470 | 398 |
OS.Process.sleep (Time.fromMilliseconds 300)))) |
28276 | 399 |
else (); |
28203 | 400 |
|
29366 | 401 |
|
402 |
(*final declarations of this structure!*) |
|
403 |
val map = map_future; |
|
404 |
||
28156 | 405 |
end; |
28972 | 406 |
|
407 |
type 'a future = 'a Future.future; |
|
408 |