equal
deleted
inserted
replaced
1 (* Title: Pure/Concurrent/future.ML |
1 (* Title: Pure/Concurrent/future.ML |
2 Author: Makarius |
2 Author: Makarius |
3 |
3 |
4 Future values. |
4 Future values, see also |
|
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf |
5 |
6 |
6 Notes: |
7 Notes: |
7 |
8 |
8 * Futures are similar to delayed evaluation, i.e. delay/force is |
9 * Futures are similar to delayed evaluation, i.e. delay/force is |
9 generalized to fork/join (and variants). The idea is to model |
10 generalized to fork/join (and variants). The idea is to model |
142 ConditionVar.broadcast work_finished); |
143 ConditionVar.broadcast work_finished); |
143 |
144 |
144 end; |
145 end; |
145 |
146 |
146 |
147 |
147 (* worker activity *) |
|
148 |
|
149 fun count_active ws = |
|
150 fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0; |
|
151 |
|
152 fun change_active active = (*requires SYNCHRONIZED*) |
|
153 change workers (AList.update Thread.equal (Thread.self (), active)); |
|
154 |
|
155 fun overloaded () = |
|
156 count_active (! workers) > Multithreading.max_threads_value (); |
|
157 |
|
158 |
|
159 (* execute future jobs *) |
148 (* execute future jobs *) |
160 |
149 |
161 fun future_job group (e: unit -> 'a) = |
150 fun future_job group (e: unit -> 'a) = |
162 let |
151 let |
163 val result = ref (NONE: 'a Exn.result option); |
152 val result = ref (NONE: 'a Exn.result option); |
184 fun execute name (task, group, jobs) = |
173 fun execute name (task, group, jobs) = |
185 let |
174 let |
186 val valid = not (Task_Queue.is_canceled group); |
175 val valid = not (Task_Queue.is_canceled group); |
187 val ok = setmp_thread_data (name, task, group) (fn () => |
176 val ok = setmp_thread_data (name, task, group) (fn () => |
188 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
177 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
189 val _ = SYNCHRONIZED "execute" (fn () => |
178 val _ = SYNCHRONIZED "finish" (fn () => |
190 let |
179 let |
191 val maximal = change_result queue (Task_Queue.finish task); |
180 val maximal = change_result queue (Task_Queue.finish task); |
192 val _ = |
181 val _ = |
193 if ok then () |
182 if ok then () |
194 else if Task_Queue.cancel (! queue) group then () |
183 else if Task_Queue.cancel (! queue) group then () |
197 val _ = if maximal then () else broadcast work_available; |
186 val _ = if maximal then () else broadcast work_available; |
198 in () end); |
187 in () end); |
199 in () end; |
188 in () end; |
200 |
189 |
201 |
190 |
|
191 (* worker activity *) |
|
192 |
|
193 fun count_active () = (*requires SYNCHRONIZED*) |
|
194 fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; |
|
195 |
|
196 fun change_active active = (*requires SYNCHRONIZED*) |
|
197 change workers (AList.update Thread.equal (Thread.self (), active)); |
|
198 |
|
199 |
202 (* worker threads *) |
200 (* worker threads *) |
203 |
201 |
204 fun worker_wait cond = (*requires SYNCHRONIZED*) |
202 fun worker_wait cond = (*requires SYNCHRONIZED*) |
205 (change_active false; broadcast scheduler_event; |
203 (change_active false; broadcast scheduler_event; |
206 wait cond; |
204 wait cond; |
210 if ! excessive > 0 then |
208 if ! excessive > 0 then |
211 (dec excessive; |
209 (dec excessive; |
212 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
210 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
213 broadcast scheduler_event; |
211 broadcast scheduler_event; |
214 NONE) |
212 NONE) |
215 else if overloaded () then (worker_wait scheduler_event; worker_next ()) |
213 else if count_active () > Multithreading.max_threads_value () then |
|
214 (worker_wait scheduler_event; worker_next ()) |
216 else |
215 else |
217 (case change_result queue Task_Queue.dequeue of |
216 (case change_result queue Task_Queue.dequeue of |
218 NONE => (worker_wait work_available; worker_next ()) |
217 NONE => (worker_wait work_available; worker_next ()) |
219 | some => some); |
218 | some => some); |
220 |
219 |
241 else |
240 else |
242 (last_status := now; Multithreading.tracing 1 (fn () => |
241 (last_status := now; Multithreading.tracing 1 (fn () => |
243 let |
242 let |
244 val {ready, pending, running} = Task_Queue.status (! queue); |
243 val {ready, pending, running} = Task_Queue.status (! queue); |
245 val total = length (! workers); |
244 val total = length (! workers); |
246 val active = count_active (! workers); |
245 val active = count_active (); |
247 in |
246 in |
248 "SCHEDULE: " ^ |
247 "SCHEDULE: " ^ |
249 string_of_int ready ^ " ready, " ^ |
248 string_of_int ready ^ " ready, " ^ |
250 string_of_int pending ^ " pending, " ^ |
249 string_of_int pending ^ " pending, " ^ |
251 string_of_int running ^ " running; " ^ |
250 string_of_int running ^ " running; " ^ |
317 val group = |
316 val group = |
318 (case opt_group of |
317 (case opt_group of |
319 SOME group => group |
318 SOME group => group |
320 | NONE => Task_Queue.new_group (worker_group ())); |
319 | NONE => Task_Queue.new_group (worker_group ())); |
321 val (result, job) = future_job group e; |
320 val (result, job) = future_job group e; |
322 val task = SYNCHRONIZED "future" (fn () => |
321 val task = SYNCHRONIZED "enqueue" (fn () => |
323 let |
322 let |
324 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); |
323 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); |
325 val _ = if minimal then signal work_available else (); |
324 val _ = if minimal then signal work_available else (); |
326 val _ = scheduler_check (); |
325 val _ = scheduler_check (); |
327 in task end); |
326 in task end); |
364 |
363 |
365 in |
364 in |
366 |
365 |
367 fun join_results xs = |
366 fun join_results xs = |
368 if forall is_finished xs then map get_result xs |
367 if forall is_finished xs then map get_result xs |
|
368 else if Multithreading.self_critical () then |
|
369 error "Cannot join future values within critical section" |
369 else uninterruptible (fn _ => fn () => |
370 else uninterruptible (fn _ => fn () => |
370 let |
371 (if is_worker () |
371 val _ = Multithreading.self_critical () andalso |
372 then join_work (map task_of xs) |
372 error "Cannot join future values within critical section"; |
373 else List.app join_wait xs; |
373 val _ = |
374 map get_result xs)) (); |
374 if is_worker () then join_work (map task_of xs) |
|
375 else List.app join_wait xs; |
|
376 in map get_result xs end) (); |
|
377 |
375 |
378 end; |
376 end; |
379 |
377 |
380 fun join_result x = singleton join_results x; |
378 fun join_result x = singleton join_results x; |
381 fun join x = Exn.release (join_result x); |
379 fun join x = Exn.release (join_result x); |
387 let |
385 let |
388 val task = task_of x; |
386 val task = task_of x; |
389 val group = Task_Queue.new_group (SOME (group_of x)); |
387 val group = Task_Queue.new_group (SOME (group_of x)); |
390 val (result, job) = future_job group (fn () => f (join x)); |
388 val (result, job) = future_job group (fn () => f (join x)); |
391 |
389 |
392 val extended = SYNCHRONIZED "map_future" (fn () => |
390 val extended = SYNCHRONIZED "extend" (fn () => |
393 (case Task_Queue.extend task job (! queue) of |
391 (case Task_Queue.extend task job (! queue) of |
394 SOME queue' => (queue := queue'; true) |
392 SOME queue' => (queue := queue'; true) |
395 | NONE => false)); |
393 | NONE => false)); |
396 in |
394 in |
397 if extended then Future {task = task, group = group, result = result} |
395 if extended then Future {task = task, group = group, result = result} |