# HG changeset patch # User wenzelm # Date 1222532286 -7200 # Node ID a97fa342540d341ba5c1370d62c2fae2491fb970 # Parent 0b8237df37bd09b3732c7a34cb106e9e67fdad65 added release_results (formerly in par_list.ML); more informative trace_active; join_results: avoid deadlock via nested SYNCHRONOIZED, more robust active join, less rechecking of tasks; diff -r 0b8237df37bd -r a97fa342540d src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Sat Sep 27 18:18:05 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Sat Sep 27 18:18:06 2008 +0200 @@ -38,6 +38,7 @@ val fork_irrelevant: (unit -> 'a) -> 'a T val join_results: 'a T list -> 'a Exn.result list val join: 'a T -> 'a + val release_results: 'a Exn.result list -> 'a list val focus: task list -> unit val interrupt_task: string -> unit val cancel: 'a T -> unit @@ -115,10 +116,24 @@ end; +(* worker activity *) + +fun trace_active () = + let + val ws = ! workers; + val m = string_of_int (length ws); + val n = string_of_int (length (filter #2 ws)); + in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; + +fun change_active active = (*requires SYNCHRONIZED*) + change workers (AList.update Thread.equal (Thread.self (), active)); + + (* execute *) fun execute name (task, group, run) = let + val _ = trace_active (); val _ = set_thread_data (SOME (task, group)); val _ = Multithreading.tracing 3 (fn () => name ^ ": running"); val ok = run (); @@ -135,15 +150,6 @@ (* worker threads *) -fun change_active active = (*requires SYNCHRONIZED*) - let - val _ = change workers (AList.update Thread.equal (Thread.self (), active)); - val ws = ! workers; - val m = string_of_int (length ws); - val n = string_of_int (length (filter #2 ws)); - in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; - - fun worker_wait name = (*requires SYNCHRONIZED*) (change_active false; wait name; change_active true); @@ -178,6 +184,7 @@ | (active, inactive) => (workers := active; Multithreading.tracing 0 (fn () => "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); + val _ = trace_active (); val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); val l = length (! workers); @@ -242,33 +249,48 @@ val _ = Multithreading.self_critical () andalso error "Cannot join future values within critical section"; - fun unfinished () = - xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); - - (*alien thread -- refrain from contending for resources*) - fun passive_join () = (*requires SYNCHRONIZED*) - (case unfinished () of [] => () - | _ => (wait "passive_join"; passive_join ())); - - (*proper worker thread -- actively work towards results*) - fun active_join () = (*requires SYNCHRONIZED*) - (case unfinished () of [] => () - | tasks => - (case change_result queue (TaskQueue.dequeue_towards tasks) of - NONE => (worker_wait "active_join"; active_join ()) - | SOME work => (execute "active_join" work; active_join ()))); - + fun join_loop [] = () + | join_loop tasks = + (case SYNCHRONIZED "join_loop" (fn () => + change_result queue (TaskQueue.dequeue_towards tasks)) of + NONE => () + | SOME (work, tasks') => (execute "join_loop" work; join_loop tasks')); val _ = (case thread_data () of - NONE => SYNCHRONIZED "passive_join" passive_join - | SOME (task, _) => SYNCHRONIZED "active_join" (fn () => - (change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); + NONE => + (*alien thread -- refrain from contending for resources*) + while exists (not o is_finished) xs + do SYNCHRONIZED "join_thread" (fn () => wait "join_thread") + | SOME (task, _) => + (*proper task -- actively work towards results*) + let + val unfinished = xs |> map_filter + (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); + val _ = SYNCHRONIZED "join" (fn () => + (change queue (TaskQueue.depend unfinished task); notify_all ())); + val _ = join_loop unfinished; + val _ = + while exists (not o is_finished) xs + do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task"); + in () end); in xs |> map (fn Future {result = ref (SOME res), ...} => res) end; fun join x = Exn.release (singleton join_results x); +(* release results *) + +fun proper_exn (Exn.Result _) = NONE + | proper_exn (Exn.Exn Interrupt) = NONE + | proper_exn (Exn.Exn exn) = SOME exn; + +fun release_results results = + (case get_first proper_exn results of + SOME exn => raise exn + | NONE => List.map Exn.release results); + + (* misc operations *) (*focus: collection of high-priority task*)