| author | wenzelm |
| Fri, 02 Jan 2009 23:28:47 +0100 | |
| changeset 29328 | eba7f9f3b06d |
| parent 29119 | 99941fd0cb0e |
| child 29341 | 6bb007a0f9f2 |
| permissions | -rw-r--r-- |
| 28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
Author: Makarius |
|
3 |
||
| 28201 | 4 |
Future values. |
5 |
||
6 |
Notes: |
|
7 |
||
8 |
* Futures are similar to delayed evaluation, i.e. delay/force is |
|
9 |
generalized to fork/join (and variants). The idea is to model |
|
10 |
parallel value-oriented computations, but *not* communicating |
|
11 |
processes. |
|
12 |
||
13 |
* Futures are grouped; failure of one group member causes the whole |
|
14 |
group to be interrupted eventually. |
|
15 |
||
16 |
* Forked futures are evaluated spontaneously by a farm of worker |
|
17 |
threads in the background; join resynchronizes the computation and |
|
18 |
delivers results (values or exceptions). |
|
19 |
||
20 |
* The pool of worker threads is limited, usually in correlation with |
|
21 |
the number of physical cores on the machine. Note that allocation |
|
22 |
of runtime resources is distorted either if workers yield CPU time |
|
23 |
(e.g. via system sleep or wait operations), or if non-worker |
|
24 |
threads contend for significant runtime resources independently. |
|
| 28156 | 25 |
*) |
26 |
||
27 |
signature FUTURE = |
|
28 |
sig |
|
| 28645 | 29 |
val enabled: unit -> bool |
| 29119 | 30 |
type task = Task_Queue.task |
31 |
type group = Task_Queue.group |
|
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
32 |
val thread_data: unit -> (string * task) option |
| 28972 | 33 |
type 'a future |
34 |
val task_of: 'a future -> task |
|
35 |
val group_of: 'a future -> group |
|
36 |
val peek: 'a future -> 'a Exn.result option |
|
37 |
val is_finished: 'a future -> bool |
|
| 28997 | 38 |
val value: 'a -> 'a future |
| 28972 | 39 |
val fork: (unit -> 'a) -> 'a future |
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
40 |
val fork_group: group -> (unit -> 'a) -> 'a future |
|
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
41 |
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
| 29119 | 42 |
val fork_pri: int -> (unit -> 'a) -> 'a future |
| 28972 | 43 |
val join_results: 'a future list -> 'a Exn.result list |
44 |
val join_result: 'a future -> 'a Exn.result |
|
45 |
val join: 'a future -> 'a |
|
46 |
val map: ('a -> 'b) -> 'a future -> 'b future
|
|
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
47 |
val interrupt_task: string -> unit |
| 28972 | 48 |
val cancel: 'a future -> unit |
| 28203 | 49 |
val shutdown: unit -> unit |
| 28156 | 50 |
end; |
51 |
||
52 |
structure Future: FUTURE = |
|
53 |
struct |
|
54 |
||
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
55 |
(** future values **) |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
56 |
|
| 28645 | 57 |
fun enabled () = |
| 29118 | 58 |
Multithreading.enabled () andalso |
| 28645 | 59 |
not (Multithreading.self_critical ()); |
60 |
||
61 |
||
| 28167 | 62 |
(* identifiers *) |
63 |
||
| 29119 | 64 |
type task = Task_Queue.task; |
65 |
type group = Task_Queue.group; |
|
| 28167 | 66 |
|
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
67 |
local val tag = Universal.tag () : (string * task) option Universal.tag in |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
68 |
fun thread_data () = the_default NONE (Thread.getLocal tag); |
|
28390
0b9fb63b8e1d
proper setmp_thread_data for nested execute (cf. join_loop);
wenzelm
parents:
28386
diff
changeset
|
69 |
fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
| 28167 | 70 |
end; |
71 |
||
72 |
||
73 |
(* datatype future *) |
|
74 |
||
| 28972 | 75 |
datatype 'a future = Future of |
| 28167 | 76 |
{task: task,
|
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
77 |
group: group, |
| 28167 | 78 |
result: 'a Exn.result option ref}; |
79 |
||
80 |
fun task_of (Future {task, ...}) = task;
|
|
81 |
fun group_of (Future {group, ...}) = group;
|
|
82 |
||
| 28558 | 83 |
fun peek (Future {result, ...}) = ! result;
|
84 |
fun is_finished x = is_some (peek x); |
|
| 28320 | 85 |
|
| 28997 | 86 |
fun value x = Future |
| 29119 | 87 |
{task = Task_Queue.new_task 0,
|
88 |
group = Task_Queue.new_group (), |
|
| 28997 | 89 |
result = ref (SOME (Exn.Result x))}; |
90 |
||
| 28167 | 91 |
|
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
92 |
|
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
93 |
(** scheduling **) |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
94 |
|
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
95 |
(* global state *) |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
96 |
|
| 29119 | 97 |
val queue = ref Task_Queue.empty; |
| 28468 | 98 |
val next = ref 0; |
| 28192 | 99 |
val workers = ref ([]: (Thread.thread * bool) list); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
100 |
val scheduler = ref (NONE: Thread.thread option); |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
101 |
val excessive = ref 0; |
| 29119 | 102 |
val canceled = ref ([]: Task_Queue.group list); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
103 |
val do_shutdown = ref false; |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
104 |
|
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
105 |
|
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
106 |
(* synchronization *) |
| 28156 | 107 |
|
108 |
local |
|
109 |
val lock = Mutex.mutex (); |
|
110 |
val cond = ConditionVar.conditionVar (); |
|
111 |
in |
|
112 |
||
| 28575 | 113 |
fun SYNCHRONIZED name = SimpleThread.synchronized name lock; |
| 28156 | 114 |
|
| 29119 | 115 |
fun wait () = (*requires SYNCHRONIZED*) |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
116 |
ConditionVar.wait (cond, lock); |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
117 |
|
| 29119 | 118 |
fun wait_timeout timeout = (*requires SYNCHRONIZED*) |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
119 |
ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
120 |
|
|
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
121 |
fun notify_all () = (*requires SYNCHRONIZED*) |
|
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
122 |
ConditionVar.broadcast cond; |
| 28156 | 123 |
|
124 |
end; |
|
125 |
||
126 |
||
| 28382 | 127 |
(* worker activity *) |
128 |
||
129 |
fun trace_active () = |
|
130 |
let |
|
131 |
val ws = ! workers; |
|
132 |
val m = string_of_int (length ws); |
|
133 |
val n = string_of_int (length (filter #2 ws)); |
|
134 |
in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; |
|
135 |
||
136 |
fun change_active active = (*requires SYNCHRONIZED*) |
|
137 |
change workers (AList.update Thread.equal (Thread.self (), active)); |
|
138 |
||
139 |
||
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
140 |
(* execute *) |
| 28156 | 141 |
|
| 28167 | 142 |
fun execute name (task, group, run) = |
143 |
let |
|
| 28382 | 144 |
val _ = trace_active (); |
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
145 |
val ok = setmp_thread_data (name, task) run (); |
| 28192 | 146 |
val _ = SYNCHRONIZED "execute" (fn () => |
| 29119 | 147 |
(change queue (Task_Queue.finish task); |
| 28186 | 148 |
if ok then () |
| 29119 | 149 |
else if Task_Queue.cancel (! queue) group then () |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
150 |
else change canceled (cons group); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
151 |
notify_all ())); |
| 28167 | 152 |
in () end; |
153 |
||
154 |
||
155 |
(* worker threads *) |
|
156 |
||
| 29119 | 157 |
fun worker_wait () = (*requires SYNCHRONIZED*) |
158 |
(change_active false; wait (); change_active true); |
|
| 28162 | 159 |
|
| 29119 | 160 |
fun worker_next () = (*requires SYNCHRONIZED*) |
| 28167 | 161 |
if ! excessive > 0 then |
162 |
(dec excessive; |
|
| 28192 | 163 |
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
| 28203 | 164 |
notify_all (); |
| 28167 | 165 |
NONE) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
166 |
else |
| 29119 | 167 |
(case change_result queue Task_Queue.dequeue of |
168 |
NONE => (worker_wait (); worker_next ()) |
|
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
169 |
| some => some); |
| 28156 | 170 |
|
| 28167 | 171 |
fun worker_loop name = |
| 29119 | 172 |
(case SYNCHRONIZED name worker_next of |
173 |
NONE => () |
|
| 28167 | 174 |
| SOME work => (execute name work; worker_loop name)); |
| 28156 | 175 |
|
| 28167 | 176 |
fun worker_start name = (*requires SYNCHRONIZED*) |
| 28242 | 177 |
change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true)); |
| 28156 | 178 |
|
179 |
||
180 |
(* scheduler *) |
|
181 |
||
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
182 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
| 28156 | 183 |
let |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
184 |
(*worker threads*) |
| 28191 | 185 |
val _ = |
| 28192 | 186 |
(case List.partition (Thread.isActive o #1) (! workers) of |
| 28191 | 187 |
(_, []) => () |
188 |
| (active, inactive) => |
|
189 |
(workers := active; Multithreading.tracing 0 (fn () => |
|
| 28192 | 190 |
"SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); |
| 28382 | 191 |
val _ = trace_active (); |
| 28191 | 192 |
|
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
193 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
| 28167 | 194 |
val l = length (! workers); |
195 |
val _ = excessive := l - m; |
|
| 28203 | 196 |
val _ = |
| 28468 | 197 |
if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
|
| 28203 | 198 |
else (); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
199 |
|
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
200 |
(*canceled groups*) |
| 29119 | 201 |
val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
202 |
|
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
203 |
(*shutdown*) |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
204 |
val continue = not (! do_shutdown andalso null (! workers)); |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
205 |
val _ = if continue then () else scheduler := NONE; |
| 28167 | 206 |
|
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
207 |
val _ = notify_all (); |
| 29119 | 208 |
val _ = wait_timeout (Time.fromSeconds 3); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
209 |
in continue end; |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
210 |
|
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
211 |
fun scheduler_loop () = |
| 29119 | 212 |
while SYNCHRONIZED "scheduler" scheduler_next do (); |
| 28156 | 213 |
|
| 28203 | 214 |
fun scheduler_active () = (*requires SYNCHRONIZED*) |
215 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
|
216 |
||
| 28464 | 217 |
fun scheduler_check name = SYNCHRONIZED name (fn () => |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
218 |
if not (scheduler_active ()) then |
| 29119 | 219 |
(do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
220 |
else if ! do_shutdown then error "Scheduler shutdown in progress" |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
221 |
else ()); |
| 28156 | 222 |
|
223 |
||
| 28191 | 224 |
(* future values: fork independent computation *) |
| 28156 | 225 |
|
|
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
226 |
fun future opt_group deps pri (e: unit -> 'a) = |
| 28156 | 227 |
let |
| 28464 | 228 |
val _ = scheduler_check "future check"; |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
229 |
|
| 29119 | 230 |
val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
231 |
|
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
232 |
val result = ref (NONE: 'a Exn.result option); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
233 |
val run = Multithreading.with_attributes (Thread.getAttributes ()) |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
234 |
(fn _ => fn ok => |
| 28532 | 235 |
let |
236 |
val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; |
|
|
28548
003f52c2bb8f
future result: Interrupt invalidates group, but pretends success otherwise;
wenzelm
parents:
28534
diff
changeset
|
237 |
val _ = result := SOME res; |
| 28532 | 238 |
val res_ok = |
239 |
(case res of |
|
240 |
Exn.Result _ => true |
|
| 29119 | 241 |
| Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) |
| 28532 | 242 |
| _ => false); |
|
28548
003f52c2bb8f
future result: Interrupt invalidates group, but pretends success otherwise;
wenzelm
parents:
28534
diff
changeset
|
243 |
in res_ok end); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
244 |
|
| 28192 | 245 |
val task = SYNCHRONIZED "future" (fn () => |
| 29119 | 246 |
change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ()); |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
247 |
in Future {task = task, group = group, result = result} end;
|
| 28162 | 248 |
|
| 29119 | 249 |
fun fork e = future NONE [] 0 e; |
250 |
fun fork_group group e = future (SOME group) [] 0 e; |
|
251 |
fun fork_deps deps e = future NONE (map task_of deps) 0 e; |
|
252 |
fun fork_pri pri e = future NONE [] pri e; |
|
| 28186 | 253 |
|
254 |
||
| 28191 | 255 |
(* join: retrieve results *) |
| 28186 | 256 |
|
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
257 |
fun join_results [] = [] |
| 28532 | 258 |
| join_results xs = uninterruptible (fn _ => fn () => |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
259 |
let |
| 28464 | 260 |
val _ = scheduler_check "join check"; |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
261 |
val _ = Multithreading.self_critical () andalso |
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
262 |
exists (not o is_finished) xs andalso |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
263 |
error "Cannot join future values within critical section"; |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
264 |
|
| 28386 | 265 |
fun join_loop _ [] = () |
266 |
| join_loop name tasks = |
|
267 |
(case SYNCHRONIZED name (fn () => |
|
| 29119 | 268 |
change_result queue (Task_Queue.dequeue_towards tasks)) of |
| 28382 | 269 |
NONE => () |
| 28386 | 270 |
| SOME (work, tasks') => (execute name work; join_loop name tasks')); |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
271 |
val _ = |
|
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
272 |
(case thread_data () of |
| 28382 | 273 |
NONE => |
274 |
(*alien thread -- refrain from contending for resources*) |
|
275 |
while exists (not o is_finished) xs |
|
| 29119 | 276 |
do SYNCHRONIZED "join_thread" (fn () => wait ()) |
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
277 |
| SOME (name, task) => |
| 28382 | 278 |
(*proper task -- actively work towards results*) |
279 |
let |
|
280 |
val unfinished = xs |> map_filter |
|
281 |
(fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
|
|
282 |
val _ = SYNCHRONIZED "join" (fn () => |
|
| 29119 | 283 |
(change queue (Task_Queue.depend unfinished task); notify_all ())); |
| 28386 | 284 |
val _ = join_loop ("join_loop: " ^ name) unfinished;
|
| 28382 | 285 |
val _ = |
286 |
while exists (not o is_finished) xs |
|
| 29119 | 287 |
do SYNCHRONIZED "join_task" (fn () => worker_wait ()); |
| 28382 | 288 |
in () end); |
| 28186 | 289 |
|
| 28532 | 290 |
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
|
| 28186 | 291 |
|
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
292 |
fun join_result x = singleton join_results x; |
|
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
293 |
fun join x = Exn.release (join_result x); |
| 28156 | 294 |
|
| 29119 | 295 |
fun map f x = |
296 |
let val task = task_of x |
|
297 |
in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; |
|
|
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
298 |
|
| 28191 | 299 |
|
|
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
300 |
(* misc operations *) |
|
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
301 |
|
|
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
302 |
(*interrupt: permissive signal, may get ignored*) |
| 28197 | 303 |
fun interrupt_task id = SYNCHRONIZED "interrupt" |
| 29119 | 304 |
(fn () => Task_Queue.interrupt_external (! queue) id); |
| 28191 | 305 |
|
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
306 |
(*cancel: present and future group members will be interrupted eventually*) |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
307 |
fun cancel x = |
| 28464 | 308 |
(scheduler_check "cancel check"; |
| 28208 | 309 |
SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ()))); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
310 |
|
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
311 |
|
| 28203 | 312 |
(*global join and shutdown*) |
313 |
fun shutdown () = |
|
| 28276 | 314 |
if Multithreading.available then |
| 28464 | 315 |
(scheduler_check "shutdown check"; |
| 28276 | 316 |
SYNCHRONIZED "shutdown" (fn () => |
| 29119 | 317 |
(while not (scheduler_active ()) do wait (); |
318 |
while not (Task_Queue.is_empty (! queue)) do wait (); |
|
| 28276 | 319 |
do_shutdown := true; |
320 |
notify_all (); |
|
| 29119 | 321 |
while not (null (! workers)) do wait (); |
322 |
while scheduler_active () do wait (); |
|
| 28470 | 323 |
OS.Process.sleep (Time.fromMilliseconds 300)))) |
| 28276 | 324 |
else (); |
| 28203 | 325 |
|
| 28156 | 326 |
end; |
| 28972 | 327 |
|
328 |
type 'a future = 'a Future.future; |
|
329 |