author | haftmann |
Tue, 23 Sep 2008 18:11:44 +0200 | |
changeset 28337 | 93964076e7b8 |
parent 28331 | 33d58fdc177d |
child 28380 | 0130201cc0e3 |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
28201 | 5 |
Future values. |
6 |
||
7 |
Notes: |
|
8 |
||
9 |
* Futures are similar to delayed evaluation, i.e. delay/force is |
|
10 |
generalized to fork/join (and variants). The idea is to model |
|
11 |
parallel value-oriented computations, but *not* communicating |
|
12 |
processes. |
|
13 |
||
14 |
* Futures are grouped; failure of one group member causes the whole |
|
15 |
group to be interrupted eventually. |
|
16 |
||
17 |
* Forked futures are evaluated spontaneously by a farm of worker |
|
18 |
threads in the background; join resynchronizes the computation and |
|
19 |
delivers results (values or exceptions). |
|
20 |
||
21 |
* The pool of worker threads is limited, usually in correlation with |
|
22 |
the number of physical cores on the machine. Note that allocation |
|
23 |
of runtime resources is distorted either if workers yield CPU time |
|
24 |
(e.g. via system sleep or wait operations), or if non-worker |
|
25 |
threads contend for significant runtime resources independently. |
|
28156 | 26 |
*) |
27 |
||
28 |
signature FUTURE = |
|
29 |
sig |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
30 |
type task = TaskQueue.task |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
31 |
type group = TaskQueue.group |
28156 | 32 |
type 'a T |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
33 |
val task_of: 'a T -> task |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
34 |
val group_of: 'a T -> group |
28320 | 35 |
val is_finished: 'a T -> bool |
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
36 |
val future: group option -> task list -> bool -> (unit -> 'a) -> 'a T |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
37 |
val fork: (unit -> 'a) -> 'a T |
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
38 |
val fork_irrelevant: (unit -> 'a) -> 'a T |
28193
7ed74d0ba607
replaced join_all by join_results, which returns Exn.results;
wenzelm
parents:
28192
diff
changeset
|
39 |
val join_results: 'a T list -> 'a Exn.result list |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
40 |
val join: 'a T -> 'a |
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
41 |
val focus: task list -> unit |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
42 |
val interrupt_task: string -> unit |
28197 | 43 |
val cancel: 'a T -> unit |
28203 | 44 |
val shutdown: unit -> unit |
28156 | 45 |
end; |
46 |
||
47 |
structure Future: FUTURE = |
|
48 |
struct |
|
49 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
50 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
51 |
|
28167 | 52 |
(* identifiers *) |
53 |
||
54 |
type task = TaskQueue.task; |
|
55 |
type group = TaskQueue.group; |
|
56 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
57 |
local val tag = Universal.tag () : (task * group) option Universal.tag in |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
58 |
fun thread_data () = the_default NONE (Thread.getLocal tag); |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
59 |
fun set_thread_data x = Thread.setLocal (tag, x); |
28167 | 60 |
end; |
61 |
||
62 |
||
63 |
(* datatype future *) |
|
64 |
||
65 |
datatype 'a T = Future of |
|
66 |
{task: task, |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
67 |
group: group, |
28167 | 68 |
result: 'a Exn.result option ref}; |
69 |
||
70 |
fun task_of (Future {task, ...}) = task; |
|
71 |
fun group_of (Future {group, ...}) = group; |
|
72 |
||
28320 | 73 |
fun is_finished (Future {result, ...}) = is_some (! result); |
74 |
||
28167 | 75 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
76 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
77 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
78 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
79 |
(* global state *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
80 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
81 |
val queue = ref TaskQueue.empty; |
28192 | 82 |
val workers = ref ([]: (Thread.thread * bool) list); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
83 |
val scheduler = ref (NONE: Thread.thread option); |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
84 |
val excessive = ref 0; |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
85 |
val canceled = ref ([]: TaskQueue.group list); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
86 |
val do_shutdown = ref false; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
87 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
88 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
89 |
(* synchronization *) |
28156 | 90 |
|
91 |
local |
|
92 |
val lock = Mutex.mutex (); |
|
93 |
val cond = ConditionVar.conditionVar (); |
|
94 |
in |
|
95 |
||
28192 | 96 |
fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () => |
28162 | 97 |
let |
28192 | 98 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": locking"); |
28162 | 99 |
val _ = Mutex.lock lock; |
28192 | 100 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": locked"); |
28162 | 101 |
val result = Exn.capture (restore_attributes e) (); |
102 |
val _ = Mutex.unlock lock; |
|
28192 | 103 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": unlocked"); |
28162 | 104 |
in Exn.release result end) (); |
28156 | 105 |
|
28167 | 106 |
fun wait name = (*requires SYNCHRONIZED*) |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
107 |
ConditionVar.wait (cond, lock); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
108 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
109 |
fun wait_timeout name timeout = (*requires SYNCHRONIZED*) |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
110 |
ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
111 |
|
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
112 |
fun notify_all () = (*requires SYNCHRONIZED*) |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
113 |
ConditionVar.broadcast cond; |
28156 | 114 |
|
115 |
end; |
|
116 |
||
117 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
118 |
(* execute *) |
28156 | 119 |
|
28167 | 120 |
fun execute name (task, group, run) = |
121 |
let |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
122 |
val _ = set_thread_data (SOME (task, group)); |
28167 | 123 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
124 |
val ok = run (); |
|
125 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
126 |
val _ = set_thread_data NONE; |
28192 | 127 |
val _ = SYNCHRONIZED "execute" (fn () => |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
128 |
(change queue (TaskQueue.finish task); |
28186 | 129 |
if ok then () |
28191 | 130 |
else if TaskQueue.cancel (! queue) group then () |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
131 |
else change canceled (cons group); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
132 |
notify_all ())); |
28167 | 133 |
in () end; |
134 |
||
135 |
||
136 |
(* worker threads *) |
|
137 |
||
28192 | 138 |
fun change_active active = (*requires SYNCHRONIZED*) |
28203 | 139 |
let |
140 |
val _ = change workers (AList.update Thread.equal (Thread.self (), active)); |
|
141 |
val ws = ! workers; |
|
142 |
val m = string_of_int (length ws); |
|
143 |
val n = string_of_int (length (filter #2 ws)); |
|
144 |
in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; |
|
145 |
||
28186 | 146 |
|
147 |
fun worker_wait name = (*requires SYNCHRONIZED*) |
|
148 |
(change_active false; wait name; change_active true); |
|
28162 | 149 |
|
28167 | 150 |
fun worker_next name = (*requires SYNCHRONIZED*) |
151 |
if ! excessive > 0 then |
|
152 |
(dec excessive; |
|
28192 | 153 |
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
28203 | 154 |
notify_all (); |
28167 | 155 |
NONE) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
156 |
else |
28186 | 157 |
(case change_result queue TaskQueue.dequeue of |
158 |
NONE => (worker_wait name; worker_next name) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
159 |
| some => some); |
28156 | 160 |
|
28167 | 161 |
fun worker_loop name = |
28192 | 162 |
(case SYNCHRONIZED name (fn () => worker_next name) of |
28203 | 163 |
NONE => Multithreading.tracing 4 (fn () => name ^ ": exit") |
28167 | 164 |
| SOME work => (execute name work; worker_loop name)); |
28156 | 165 |
|
28167 | 166 |
fun worker_start name = (*requires SYNCHRONIZED*) |
28242 | 167 |
change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true)); |
28156 | 168 |
|
169 |
||
170 |
(* scheduler *) |
|
171 |
||
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
172 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
28156 | 173 |
let |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
174 |
(*worker threads*) |
28191 | 175 |
val _ = |
28192 | 176 |
(case List.partition (Thread.isActive o #1) (! workers) of |
28191 | 177 |
(_, []) => () |
178 |
| (active, inactive) => |
|
179 |
(workers := active; Multithreading.tracing 0 (fn () => |
|
28192 | 180 |
"SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); |
28191 | 181 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
182 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
28167 | 183 |
val l = length (! workers); |
184 |
val _ = excessive := l - m; |
|
28203 | 185 |
val _ = |
186 |
if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) () |
|
187 |
else (); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
188 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
189 |
(*canceled groups*) |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
190 |
val _ = change canceled (filter_out (TaskQueue.cancel (! queue))); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
191 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
192 |
(*shutdown*) |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
193 |
val continue = not (! do_shutdown andalso null (! workers)); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
194 |
val _ = if continue then () else scheduler := NONE; |
28167 | 195 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
196 |
val _ = notify_all (); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
197 |
val _ = wait_timeout "scheduler" (Time.fromSeconds 1); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
198 |
in continue end; |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
199 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
200 |
fun scheduler_loop () = |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
201 |
(while SYNCHRONIZED "scheduler" scheduler_next do (); |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
202 |
Multithreading.tracing 4 (fn () => "scheduler: exit")); |
28156 | 203 |
|
28203 | 204 |
fun scheduler_active () = (*requires SYNCHRONIZED*) |
205 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
|
206 |
||
28192 | 207 |
fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () => |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
208 |
if not (scheduler_active ()) then |
28242 | 209 |
(do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
210 |
else if ! do_shutdown then error "Scheduler shutdown in progress" |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
211 |
else ()); |
28156 | 212 |
|
213 |
||
28191 | 214 |
(* future values: fork independent computation *) |
28156 | 215 |
|
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
216 |
fun future opt_group deps pri (e: unit -> 'a) = |
28156 | 217 |
let |
28191 | 218 |
val _ = scheduler_check (); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
219 |
|
28191 | 220 |
val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ()); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
221 |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
222 |
val result = ref (NONE: 'a Exn.result option); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
223 |
val run = Multithreading.with_attributes (Thread.getAttributes ()) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
224 |
(fn _ => fn ok => |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
225 |
let val res = if ok then Exn.capture e () else Exn.Exn Interrupt |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
226 |
in result := SOME res; is_some (Exn.get_result res) end); |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
227 |
|
28192 | 228 |
val task = SYNCHRONIZED "future" (fn () => |
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
229 |
change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ()); |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
230 |
in Future {task = task, group = group, result = result} end; |
28162 | 231 |
|
28304
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
232 |
fun fork e = future (Option.map #2 (thread_data ())) [] true e; |
4b0477452943
future tasks: support boolean priorities (true = high, false = low/irrelevant);
wenzelm
parents:
28276
diff
changeset
|
233 |
fun fork_irrelevant e = future (Option.map #2 (thread_data ())) [] false e; |
28186 | 234 |
|
235 |
||
28191 | 236 |
(* join: retrieve results *) |
28186 | 237 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
238 |
fun join_results [] = [] |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
239 |
| join_results xs = |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
240 |
let |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
241 |
val _ = scheduler_check (); |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
242 |
val _ = Multithreading.self_critical () andalso |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
243 |
error "Cannot join future values within critical section"; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
244 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
245 |
fun unfinished () = |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
246 |
xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); |
28186 | 247 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
248 |
(*alien thread -- refrain from contending for resources*) |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
249 |
fun passive_join () = (*requires SYNCHRONIZED*) |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
250 |
(case unfinished () of [] => () |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
251 |
| _ => (wait "passive_join"; passive_join ())); |
28186 | 252 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
253 |
(*proper worker thread -- actively work towards results*) |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
254 |
fun active_join () = (*requires SYNCHRONIZED*) |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
255 |
(case unfinished () of [] => () |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
256 |
| tasks => |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
257 |
(case change_result queue (TaskQueue.dequeue_towards tasks) of |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
258 |
NONE => (worker_wait "active_join"; active_join ()) |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
259 |
| SOME work => (execute "active_join" work; active_join ()))); |
28186 | 260 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
261 |
val _ = |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
262 |
(case thread_data () of |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
263 |
NONE => SYNCHRONIZED "passive_join" passive_join |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
264 |
| SOME (task, _) => SYNCHRONIZED "active_join" (fn () => |
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
265 |
(change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); |
28186 | 266 |
|
28331
33d58fdc177d
join_results: special case for empty list, works without multithreading;
wenzelm
parents:
28320
diff
changeset
|
267 |
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end; |
28186 | 268 |
|
28193
7ed74d0ba607
replaced join_all by join_results, which returns Exn.results;
wenzelm
parents:
28192
diff
changeset
|
269 |
fun join x = Exn.release (singleton join_results x); |
28156 | 270 |
|
28191 | 271 |
|
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
272 |
(* misc operations *) |
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
273 |
|
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
274 |
(*focus: collection of high-priority task*) |
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
275 |
fun focus tasks = SYNCHRONIZED "interrupt" (fn () => |
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
276 |
change queue (TaskQueue.focus tasks)); |
28191 | 277 |
|
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
278 |
(*interrupt: permissive signal, may get ignored*) |
28197 | 279 |
fun interrupt_task id = SYNCHRONIZED "interrupt" |
280 |
(fn () => TaskQueue.interrupt_external (! queue) id); |
|
28191 | 281 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
282 |
(*cancel: present and future group members will be interrupted eventually*) |
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
283 |
fun cancel x = |
28208 | 284 |
(scheduler_check (); |
285 |
SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ()))); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
286 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
287 |
|
28203 | 288 |
(*global join and shutdown*) |
289 |
fun shutdown () = |
|
28276 | 290 |
if Multithreading.available then |
291 |
(scheduler_check (); |
|
292 |
SYNCHRONIZED "shutdown" (fn () => |
|
293 |
(while not (scheduler_active ()) do wait "shutdown: scheduler inactive"; |
|
294 |
while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; |
|
295 |
do_shutdown := true; |
|
296 |
notify_all (); |
|
297 |
while not (null (! workers)) do wait "shutdown: workers"; |
|
298 |
while scheduler_active () do wait "shutdown: scheduler still active"))) |
|
299 |
else (); |
|
28203 | 300 |
|
28156 | 301 |
end; |