326 NONE => Exn.Exn (SYS_ERROR "unfinished future") |
326 NONE => Exn.Exn (SYS_ERROR "unfinished future") |
327 | SOME (Exn.Exn Exn.Interrupt) => |
327 | SOME (Exn.Exn Exn.Interrupt) => |
328 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
328 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
329 | SOME res => res); |
329 | SOME res => res); |
330 |
330 |
|
(*Wait until future x is finished.  Each round runs under
  SYNCHRONIZED "join_wait": if x is already finished the round yields true
  and the function returns (); otherwise it calls wait on work_finished,
  yields false, and the check is repeated.*)
fun join_wait x =
  if SYNCHRONIZED "join_wait" (fn () =>
    is_finished x orelse (wait work_finished; false))
  then () else join_wait x;
|
335 |
(*Pick the next piece of work towards the given dependencies.
  Result:
    NONE                -- deps is empty, or the dequeue yields neither work
                           nor remaining dependencies;
    SOME (work, deps')  -- work to execute, plus the dependencies that
                           Task_Queue.dequeue_towards returned.
  While overloaded () holds, the function calls worker_wait on
  scheduler_event and retries with the same deps.  When the dequeue returns
  no work but some remaining deps', it calls worker_wait on work_finished
  and retries with deps'.*)
fun join_next deps = (*requires SYNCHRONIZED*)
  if null deps then NONE
  else if overloaded () then (worker_wait scheduler_event; join_next deps)
  else
    (case change_result queue (Task_Queue.dequeue_towards deps) of
      (NONE, []) => NONE
    | (NONE, deps') => (worker_wait work_finished; join_next deps')
    | (SOME work, deps') => SOME (work, deps'));
|
344 |
|
(*Repeatedly obtain work via join_next (called under SYNCHRONIZED "join")
  and run it with execute "join", continuing with the remaining
  dependencies deps' until join_next returns NONE.*)
fun join_work deps =
  (case SYNCHRONIZED "join" (fn () => join_next deps) of
    NONE => ()
  | SOME (work, deps') => (execute "join" work; join_work deps'));
339 |
349 |
340 in |
350 in |
341 |
351 |
(*Join a list of futures and return their results via get_result, in the
  order of xs.

  Fast path: if every future is already finished, the results are read
  directly.  Otherwise, inside uninterruptible:
    - scheduler_check "join check" is invoked;
    - joining from within a critical section is rejected with an error;
    - if is_worker () holds, join_work is run on the tasks of xs;
      otherwise join_wait is applied to each future in turn;
    - finally the results are collected.*)
fun join_results xs =
  if forall is_finished xs then map get_result xs
  else uninterruptible (fn _ => fn () =>
    let
      val _ = scheduler_check "join check";
      val _ = Multithreading.self_critical () andalso
        error "Cannot join future values within critical section";

      val _ =
        if is_worker () then join_work (map task_of xs)
        else List.app join_wait xs;
    in map get_result xs end) ();
364 |
363 |
365 end; |
364 end; |
366 |
365 |
(*Join a single future: join_results lifted to one element via singleton.*)
fun join_result x = singleton join_results x;