268 fun fork_pri pri e = fork_future NONE [] pri e; |
268 fun fork_pri pri e = fork_future NONE [] pri e; |
269 |
269 |
270 |
270 |
271 (* join *) |
271 (* join *) |
272 |
272 |
|
273 local |
|
274 |
273 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); |
275 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); |
|
276 |
|
277 fun join_next pending = (*requires SYNCHRONIZED*) |
|
278 if forall is_finished pending then NONE |
|
279 else |
|
280 (case change_result queue Task_Queue.dequeue of |
|
281 NONE => (worker_wait (); join_next pending) |
|
282 | some => some); |
|
283 |
|
284 fun join_loop name pending = |
|
285 (case SYNCHRONIZED name (fn () => join_next pending) of |
|
286 NONE => () |
|
287 | SOME work => (execute name work; join_loop name pending)); |
|
288 |
|
289 in |
274 |
290 |
275 fun join_results xs = |
291 fun join_results xs = |
276 if forall is_finished xs then map get_result xs |
292 if forall is_finished xs then map get_result xs |
277 else uninterruptible (fn _ => fn () => |
293 else uninterruptible (fn _ => fn () => |
278 let |
294 let |
279 val _ = scheduler_check "join check"; |
295 val _ = scheduler_check "join check"; |
280 val _ = Multithreading.self_critical () andalso |
296 val _ = Multithreading.self_critical () andalso |
281 error "Cannot join future values within critical section"; |
297 error "Cannot join future values within critical section"; |
282 |
298 |
283 fun join_loop _ [] = () |
299 fun join_deps _ [] = () |
284 | join_loop name deps = |
300 | join_deps name deps = |
285 (case SYNCHRONIZED name (fn () => |
301 (case SYNCHRONIZED name (fn () => |
286 change_result queue (Task_Queue.dequeue_towards deps)) of |
302 change_result queue (Task_Queue.dequeue_towards deps)) of |
287 NONE => () |
303 NONE => () |
288 | SOME (work, deps') => (execute name work; join_loop name deps')); |
304 | SOME (work, deps') => (execute name work; join_deps name deps')); |
|
305 |
289 val _ = |
306 val _ = |
290 (case thread_data () of |
307 (case thread_data () of |
291 NONE => |
308 NONE => |
292 (*alien thread -- refrain from contending for resources*) |
309 (*alien thread -- refrain from contending for resources*) |
293 while not (forall is_finished xs) |
310 while not (forall is_finished xs) |
297 let |
314 let |
298 val pending = filter_out is_finished xs; |
315 val pending = filter_out is_finished xs; |
299 val deps = map task_of pending; |
316 val deps = map task_of pending; |
300 val _ = SYNCHRONIZED "join" (fn () => |
317 val _ = SYNCHRONIZED "join" (fn () => |
301 (change queue (Task_Queue.depend deps task); notify_all ())); |
318 (change queue (Task_Queue.depend deps task); notify_all ())); |
302 val _ = join_loop ("join_loop: " ^ name) deps; |
319 val _ = join_deps ("join_deps: " ^ name) deps; |
303 val _ = |
320 val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending); |
304 while not (forall is_finished pending) |
|
305 do SYNCHRONIZED "join_task" (fn () => worker_wait ()); |
|
306 in () end); |
321 in () end); |
307 |
322 |
308 in map get_result xs end) (); |
323 in map get_result xs end) (); |
|
324 |
|
325 end; |
309 |
326 |
310 fun join_result x = singleton join_results x; |
327 fun join_result x = singleton join_results x; |
311 fun join x = Exn.release (join_result x); |
328 fun join x = Exn.release (join_result x); |
312 |
329 |
313 |
330 |