283 |
283 |
284 local |
284 local |
285 |
285 |
286 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); |
286 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); |
287 |
287 |
288 fun join_wait x = |
288 fun join_deps deps = |
289 if SYNCHRONIZED "join_wait" (fn () => is_finished x orelse (wait (); false)) |
289 (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of |
290 then () else join_wait x; |
|
291 |
|
292 fun join_next x = (*requires SYNCHRONIZED*) |
|
293 if is_finished x then NONE |
|
294 else |
|
295 (case change_result queue Task_Queue.dequeue of |
|
296 NONE => (worker_wait (); join_next x) |
|
297 | some => some); |
|
298 |
|
299 fun join_loop x = |
|
300 (case SYNCHRONIZED "join" (fn () => join_next x) of |
|
301 NONE => () |
290 NONE => () |
302 | SOME work => (execute "join" work; join_loop x)); |
291 | SOME (work, deps') => (execute "join" work; join_deps deps')); |
303 |
292 |
304 in |
293 in |
305 |
294 |
306 fun join_results xs = |
295 fun join_results xs = |
307 if forall is_finished xs then map get_result xs |
296 if forall is_finished xs then map get_result xs |
308 else uninterruptible (fn _ => fn () => |
297 else uninterruptible (fn _ => fn () => |
309 let |
298 let |
310 val _ = scheduler_check "join check"; |
299 val _ = scheduler_check "join check"; |
311 val _ = Multithreading.self_critical () andalso |
300 val _ = Multithreading.self_critical () andalso |
312 error "Cannot join future values within critical section"; |
301 error "Cannot join future values within critical section"; |
313 val _ = |
302 |
314 if is_some (thread_data ()) |
303 val is_worker = is_some (thread_data ()); |
315 then List.app join_loop xs (*proper task -- continue work*) |
304 fun join_wait x = |
316 else List.app join_wait xs; (*alien thread -- refrain from contending for resources*) |
305 if SYNCHRONIZED "join_wait" (fn () => |
|
306 is_finished x orelse (if is_worker then worker_wait () else wait (); false)) |
|
307 then () else join_wait x; |
|
308 |
|
309 val _ = if is_worker then join_deps (map task_of xs) else (); |
|
310 val _ = List.app join_wait xs; |
|
311 |
317 in map get_result xs end) (); |
312 in map get_result xs end) (); |
318 |
313 |
319 end; |
314 end; |
320 |
315 |
321 fun join_result x = singleton join_results x; |
316 fun join_result x = singleton join_results x; |