author | wenzelm |
Sun, 20 Sep 2009 19:17:33 +0200 | |
changeset 32617 | bfbdeddc03bc |
parent 32616 | 8ef1aa1cfcc7 |
child 32644 | e4511a1b4c1b |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
Author: Makarius |
|
3 |
||
32246 | 4 |
Future values, see also |
5 |
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf |
|
28201 | 6 |
|
7 |
Notes: |
|
8 |
||
9 |
* Futures are similar to delayed evaluation, i.e. delay/force is |
|
10 |
generalized to fork/join (and variants). The idea is to model |
|
11 |
parallel value-oriented computations, but *not* communicating |
|
12 |
processes. |
|
13 |
||
14 |
* Futures are grouped; failure of one group member causes the whole |
|
32220 | 15 |
group to be interrupted eventually. Groups are block-structured. |
28201 | 16 |
|
17 |
* Forked futures are evaluated spontaneously by a farm of worker |
|
18 |
threads in the background; join resynchronizes the computation and |
|
19 |
delivers results (values or exceptions). |
|
20 |
||
21 |
* The pool of worker threads is limited, usually in correlation with |
|
22 |
the number of physical cores on the machine. Note that allocation |
|
23 |
of runtime resources is distorted either if workers yield CPU time |
|
24 |
(e.g. via system sleep or wait operations), or if non-worker |
|
25 |
threads contend for significant runtime resources independently. |
|
28156 | 26 |
*) |
27 |
||
28 |
signature FUTURE = |
|
29 |
sig |
|
29119 | 30 |
type task = Task_Queue.task |
31 |
type group = Task_Queue.group |
|
32058 | 32 |
val is_worker: unit -> bool |
32102 | 33 |
val worker_group: unit -> Task_Queue.group option |
28972 | 34 |
type 'a future |
35 |
val task_of: 'a future -> task |
|
36 |
val group_of: 'a future -> group |
|
37 |
val peek: 'a future -> 'a Exn.result option |
|
38 |
val is_finished: 'a future -> bool |
|
28997 | 39 |
val value: 'a -> 'a future |
28972 | 40 |
val fork: (unit -> 'a) -> 'a future |
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
41 |
val fork_group: group -> (unit -> 'a) -> 'a future |
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
42 |
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future |
29119 | 43 |
val fork_pri: int -> (unit -> 'a) -> 'a future |
28972 | 44 |
val join_results: 'a future list -> 'a Exn.result list |
45 |
val join_result: 'a future -> 'a Exn.result |
|
46 |
val join: 'a future -> 'a |
|
47 |
val map: ('a -> 'b) -> 'a future -> 'b future |
|
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
48 |
val interruptible_task: ('a -> 'b) -> 'a -> 'b |
29431 | 49 |
val cancel_group: group -> unit |
28972 | 50 |
val cancel: 'a future -> unit |
28203 | 51 |
val shutdown: unit -> unit |
28156 | 52 |
end; |
53 |
||
54 |
structure Future: FUTURE = |
|
55 |
struct |
|
56 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
57 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
58 |
|
28167 | 59 |
(* identifiers *) |
60 |
||
29119 | 61 |
type task = Task_Queue.task; |
62 |
type group = Task_Queue.group; |
|
28167 | 63 |
|
32058 | 64 |
local |
65 |
val tag = Universal.tag () : (string * task * group) option Universal.tag; |
|
66 |
in |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
67 |
fun thread_data () = the_default NONE (Thread.getLocal tag); |
32058 | 68 |
fun setmp_thread_data data f x = |
69 |
Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
|
28167 | 70 |
end; |
71 |
||
32058 | 72 |
val is_worker = is_some o thread_data; |
32102 | 73 |
val worker_group = Option.map #3 o thread_data; |
32058 | 74 |
|
28167 | 75 |
|
76 |
(* datatype future *) |
|
77 |
||
28972 | 78 |
datatype 'a future = Future of |
28167 | 79 |
{task: task, |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
80 |
group: group, |
32253 | 81 |
result: 'a Exn.result option Synchronized.var}; |
28167 | 82 |
|
83 |
fun task_of (Future {task, ...}) = task; |
|
84 |
fun group_of (Future {group, ...}) = group; |
|
32253 | 85 |
fun result_of (Future {result, ...}) = result; |
28167 | 86 |
|
32592
e29c0b7dcf66
Synchronized.value does not require locking, since assigments are atomic;
wenzelm
parents:
32420
diff
changeset
|
87 |
fun peek x = Synchronized.value (result_of x); |
28558 | 88 |
fun is_finished x = is_some (peek x); |
28320 | 89 |
|
28997 | 90 |
fun value x = Future |
29119 | 91 |
{task = Task_Queue.new_task 0, |
32102 | 92 |
group = Task_Queue.new_group NONE, |
32253 | 93 |
result = Synchronized.var "future" (SOME (Exn.Result x))}; |
28997 | 94 |
|
28167 | 95 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
96 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
97 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
98 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
99 |
(* global state *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
100 |
|
29119 | 101 |
val queue = ref Task_Queue.empty; |
28468 | 102 |
val next = ref 0; |
28192 | 103 |
val workers = ref ([]: (Thread.thread * bool) list); |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
104 |
val scheduler = ref (NONE: Thread.thread option); |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
105 |
val excessive = ref 0; |
29119 | 106 |
val canceled = ref ([]: Task_Queue.group list); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
107 |
val do_shutdown = ref false; |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
108 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
109 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
110 |
(* synchronization *) |
28156 | 111 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
112 |
val scheduler_event = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
113 |
val work_available = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
114 |
val work_finished = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
115 |
|
28156 | 116 |
local |
117 |
val lock = Mutex.mutex (); |
|
118 |
in |
|
119 |
||
28575 | 120 |
fun SYNCHRONIZED name = SimpleThread.synchronized name lock; |
28156 | 121 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
122 |
fun wait cond = (*requires SYNCHRONIZED*) |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
123 |
Multithreading.sync_wait NONE NONE cond lock; |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
124 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
125 |
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
126 |
Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock; |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
127 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
128 |
fun signal cond = (*requires SYNCHRONIZED*) |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
129 |
ConditionVar.signal cond; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
130 |
|
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
131 |
fun broadcast cond = (*requires SYNCHRONIZED*) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
132 |
ConditionVar.broadcast cond; |
28156 | 133 |
|
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
134 |
fun broadcast_work () = (*requires SYNCHRONIZED*) |
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
135 |
(ConditionVar.broadcast work_available; |
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
136 |
ConditionVar.broadcast work_finished); |
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
137 |
|
28156 | 138 |
end; |
139 |
||
140 |
||
32099 | 141 |
(* execute future jobs *) |
142 |
||
143 |
fun future_job group (e: unit -> 'a) = |
|
144 |
let |
|
32253 | 145 |
val result = Synchronized.var "future" (NONE: 'a Exn.result option); |
32107
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
146 |
fun job ok = |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
147 |
let |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
148 |
val res = |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
149 |
if ok then |
32230
9f6461b1c9cc
interruptible: Thread.testInterrupt before changing thread attributes;
wenzelm
parents:
32229
diff
changeset
|
150 |
Exn.capture (fn () => |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
151 |
Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) () |
32107
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
152 |
else Exn.Exn Exn.Interrupt; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
153 |
val _ = Synchronized.change result |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
154 |
(fn NONE => SOME res |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
155 |
| SOME _ => raise Fail "Duplicate assignment of future value"); |
32107
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
156 |
in |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
157 |
(case res of |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
158 |
Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
159 |
| Exn.Result _ => true) |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
160 |
end; |
32099 | 161 |
in (result, job) end; |
28156 | 162 |
|
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
163 |
fun do_cancel group = (*requires SYNCHRONIZED*) |
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
164 |
(change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); |
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
165 |
|
29366 | 166 |
fun execute name (task, group, jobs) = |
28167 | 167 |
let |
32102 | 168 |
val valid = not (Task_Queue.is_canceled group); |
32058 | 169 |
val ok = setmp_thread_data (name, task, group) (fn () => |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
170 |
fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
32246 | 171 |
val _ = SYNCHRONIZED "finish" (fn () => |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
172 |
let |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
173 |
val maximal = change_result queue (Task_Queue.finish task); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
174 |
val _ = |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
175 |
if ok then () |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
176 |
else if Task_Queue.cancel (! queue) group then () |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
177 |
else do_cancel group; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
178 |
val _ = broadcast work_finished; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
179 |
val _ = if maximal then () else broadcast work_available; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
180 |
in () end); |
28167 | 181 |
in () end; |
182 |
||
183 |
||
32246 | 184 |
(* worker activity *) |
185 |
||
186 |
fun count_active () = (*requires SYNCHRONIZED*) |
|
187 |
fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; |
|
188 |
||
189 |
fun change_active active = (*requires SYNCHRONIZED*) |
|
190 |
change workers (AList.update Thread.equal (Thread.self (), active)); |
|
191 |
||
192 |
||
28167 | 193 |
(* worker threads *) |
194 |
||
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
195 |
fun worker_wait cond = (*requires SYNCHRONIZED*) |
32286
1fb5db48002d
added Multithreading.sync_wait, which turns enabled interrupts to sync ones, to ensure that wait will reaquire its lock when interrupted;
wenzelm
parents:
32255
diff
changeset
|
196 |
(change_active false; wait cond; change_active true); |
28162 | 197 |
|
29119 | 198 |
fun worker_next () = (*requires SYNCHRONIZED*) |
28167 | 199 |
if ! excessive > 0 then |
200 |
(dec excessive; |
|
28192 | 201 |
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
202 |
broadcast scheduler_event; |
28167 | 203 |
NONE) |
32246 | 204 |
else if count_active () > Multithreading.max_threads_value () then |
205 |
(worker_wait scheduler_event; worker_next ()) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
206 |
else |
32249 | 207 |
(case change_result queue (Task_Queue.dequeue (Thread.self ())) of |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
208 |
NONE => (worker_wait work_available; worker_next ()) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
209 |
| some => some); |
28156 | 210 |
|
28167 | 211 |
fun worker_loop name = |
32107
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
212 |
(case SYNCHRONIZED name (fn () => worker_next ()) of |
29119 | 213 |
NONE => () |
28167 | 214 |
| SOME work => (execute name work; worker_loop name)); |
28156 | 215 |
|
28167 | 216 |
fun worker_start name = (*requires SYNCHRONIZED*) |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
217 |
change workers (cons (SimpleThread.fork false (fn () => |
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
218 |
(broadcast scheduler_event; worker_loop name)), true)); |
28156 | 219 |
|
220 |
||
221 |
(* scheduler *) |
|
222 |
||
32226 | 223 |
val last_status = ref Time.zeroTime; |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
224 |
val next_status = Time.fromMilliseconds 500; |
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
225 |
val next_round = Time.fromMilliseconds 50; |
32226 | 226 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
227 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
28156 | 228 |
let |
32226 | 229 |
(*queue and worker status*) |
230 |
val _ = |
|
231 |
let val now = Time.now () in |
|
232 |
if Time.> (Time.+ (! last_status, next_status), now) then () |
|
233 |
else |
|
234 |
(last_status := now; Multithreading.tracing 1 (fn () => |
|
235 |
let |
|
236 |
val {ready, pending, running} = Task_Queue.status (! queue); |
|
237 |
val total = length (! workers); |
|
32246 | 238 |
val active = count_active (); |
32226 | 239 |
in |
32617 | 240 |
"SCHEDULE " ^ Time.toString now ^ ": " ^ |
32226 | 241 |
string_of_int ready ^ " ready, " ^ |
242 |
string_of_int pending ^ " pending, " ^ |
|
243 |
string_of_int running ^ " running; " ^ |
|
244 |
string_of_int total ^ " workers, " ^ |
|
245 |
string_of_int active ^ " active" |
|
246 |
end)) |
|
247 |
end; |
|
32053 | 248 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
249 |
(*worker threads*) |
28191 | 250 |
val _ = |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
251 |
if forall (Thread.isActive o #1) (! workers) then () |
32095 | 252 |
else |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
253 |
(case List.partition (Thread.isActive o #1) (! workers) of |
32095 | 254 |
(_, []) => () |
32220 | 255 |
| (alive, dead) => |
256 |
(workers := alive; Multithreading.tracing 0 (fn () => |
|
257 |
"SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))); |
|
28191 | 258 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
259 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
32616 | 260 |
val mm = if m = 9999 then 1 else (m * 3) div 2; |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
261 |
val l = length (! workers); |
32095 | 262 |
val _ = excessive := l - mm; |
28203 | 263 |
val _ = |
32095 | 264 |
if mm > l then |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
265 |
funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () |
28203 | 266 |
else (); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
267 |
|
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
268 |
(*canceled groups*) |
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
269 |
val _ = |
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
270 |
if null (! canceled) then () |
32293 | 271 |
else |
272 |
(Multithreading.tracing 1 (fn () => |
|
273 |
string_of_int (length (! canceled)) ^ " canceled groups"); |
|
274 |
change canceled (filter_out (Task_Queue.cancel (! queue))); |
|
275 |
broadcast_work ()); |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
276 |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
277 |
(*delay loop*) |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
278 |
val _ = Exn.release (wait_timeout next_round scheduler_event); |
28167 | 279 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
280 |
(*shutdown*) |
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
281 |
val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else (); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
282 |
val continue = not (! do_shutdown andalso null (! workers)); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
283 |
val _ = if continue then () else scheduler := NONE; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
284 |
val _ = broadcast scheduler_event; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
285 |
in continue end |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
286 |
handle Exn.Interrupt => |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
287 |
(Multithreading.tracing 1 (fn () => "Interrupt"); |
32296 | 288 |
uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) (); |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
289 |
scheduler_next ()); |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
290 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
291 |
fun scheduler_loop () = |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
292 |
Multithreading.with_attributes |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
293 |
(Multithreading.sync_interrupts Multithreading.public_interrupts) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
294 |
(fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ()); |
28156 | 295 |
|
28203 | 296 |
fun scheduler_active () = (*requires SYNCHRONIZED*) |
297 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
|
298 |
||
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
299 |
fun scheduler_check () = (*requires SYNCHRONIZED*) |
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
300 |
(do_shutdown := false; |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
301 |
if scheduler_active () then () |
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
302 |
else scheduler := SOME (SimpleThread.fork false scheduler_loop)); |
28156 | 303 |
|
304 |
||
29366 | 305 |
|
306 |
(** futures **) |
|
28156 | 307 |
|
29366 | 308 |
(* fork *) |
309 |
||
310 |
fun fork_future opt_group deps pri e = |
|
311 |
let |
|
32102 | 312 |
val group = |
313 |
(case opt_group of |
|
314 |
SOME group => group |
|
315 |
| NONE => Task_Queue.new_group (worker_group ())); |
|
29366 | 316 |
val (result, job) = future_job group e; |
32246 | 317 |
val task = SYNCHRONIZED "enqueue" (fn () => |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
318 |
let |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
319 |
val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
320 |
val _ = if minimal then signal work_available else (); |
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
321 |
val _ = scheduler_check (); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
322 |
in task end); |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
323 |
in Future {task = task, group = group, result = result} end; |
28162 | 324 |
|
29366 | 325 |
fun fork e = fork_future NONE [] 0 e; |
326 |
fun fork_group group e = fork_future (SOME group) [] 0 e; |
|
327 |
fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; |
|
328 |
fun fork_pri pri e = fork_future NONE [] pri e; |
|
28186 | 329 |
|
330 |
||
29366 | 331 |
(* join *) |
332 |
||
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
333 |
local |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
334 |
|
32099 | 335 |
fun get_result x = |
336 |
(case peek x of |
|
32102 | 337 |
NONE => Exn.Exn (SYS_ERROR "unfinished future") |
338 |
| SOME (Exn.Exn Exn.Interrupt) => |
|
339 |
Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
|
340 |
| SOME res => res); |
|
28186 | 341 |
|
32224 | 342 |
fun join_wait x = |
32253 | 343 |
Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some)); |
32224 | 344 |
|
32095 | 345 |
fun join_next deps = (*requires SYNCHRONIZED*) |
32224 | 346 |
if null deps then NONE |
347 |
else |
|
32249 | 348 |
(case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
32224 | 349 |
(NONE, []) => NONE |
350 |
| (NONE, deps') => (worker_wait work_finished; join_next deps') |
|
351 |
| (SOME work, deps') => SOME (work, deps')); |
|
32095 | 352 |
|
32224 | 353 |
fun join_work deps = |
32095 | 354 |
(case SYNCHRONIZED "join" (fn () => join_next deps) of |
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
355 |
NONE => () |
32224 | 356 |
| SOME (work, deps') => (execute "join" work; join_work deps')); |
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
357 |
|
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
358 |
in |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
359 |
|
29366 | 360 |
fun join_results xs = |
361 |
if forall is_finished xs then map get_result xs |
|
32246 | 362 |
else if Multithreading.self_critical () then |
363 |
error "Cannot join future values within critical section" |
|
29366 | 364 |
else uninterruptible (fn _ => fn () => |
32246 | 365 |
(if is_worker () |
366 |
then join_work (map task_of xs) |
|
367 |
else List.app join_wait xs; |
|
368 |
map get_result xs)) (); |
|
28186 | 369 |
|
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
370 |
end; |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
371 |
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
372 |
fun join_result x = singleton join_results x; |
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
373 |
fun join x = Exn.release (join_result x); |
28156 | 374 |
|
29366 | 375 |
|
376 |
(* map *) |
|
377 |
||
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
378 |
fun map_future f x = |
29366 | 379 |
let |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
380 |
val task = task_of x; |
32102 | 381 |
val group = Task_Queue.new_group (SOME (group_of x)); |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
382 |
val (result, job) = future_job group (fn () => f (join x)); |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
383 |
|
32246 | 384 |
val extended = SYNCHRONIZED "extend" (fn () => |
29366 | 385 |
(case Task_Queue.extend task job (! queue) of |
386 |
SOME queue' => (queue := queue'; true) |
|
387 |
| NONE => false)); |
|
388 |
in |
|
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
389 |
if extended then Future {task = task, group = group, result = result} |
32099 | 390 |
else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) |
29366 | 391 |
end; |
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
392 |
|
28191 | 393 |
|
29431 | 394 |
(* cancellation *) |
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
395 |
|
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
396 |
fun interruptible_task f x = |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
397 |
if Multithreading.available then |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
398 |
Multithreading.with_attributes |
32058 | 399 |
(if is_worker () |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
400 |
then Multithreading.private_interrupts |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
401 |
else Multithreading.public_interrupts) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
402 |
(fn _ => f x) |
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
403 |
else interruptible f x; |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
404 |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
405 |
(*cancel: present and future group members will be interrupted eventually*) |
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
406 |
fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group); |
29431 | 407 |
fun cancel x = cancel_group (group_of x); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
408 |
|
29366 | 409 |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
410 |
(* shutdown *) |
29366 | 411 |
|
28203 | 412 |
fun shutdown () = |
28276 | 413 |
if Multithreading.available then |
414 |
SYNCHRONIZED "shutdown" (fn () => |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
415 |
while scheduler_active () do |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
416 |
(wait scheduler_event; broadcast_work ())) |
28276 | 417 |
else (); |
28203 | 418 |
|
29366 | 419 |
|
420 |
(*final declarations of this structure!*) |
|
421 |
val map = map_future; |
|
422 |
||
28156 | 423 |
end; |
28972 | 424 |
|
425 |
type 'a future = 'a Future.future; |
|
426 |