# HG changeset patch # User wenzelm # Date 1231246008 -3600 # Node ID 1ffc8cbf39ecc33ea44a7b56d302975427939ef7 # Parent 5c5bc17d9135dd2e908b63315b1443188f48eef0 tuned map: reduced overhead due to bulk jobs; tuned join_results: reduced overhead for finished futures; tuned; diff -r 5c5bc17d9135 -r 1ffc8cbf39ec src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jan 06 13:43:17 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Tue Jan 06 13:46:48 2009 +0100 @@ -137,15 +137,16 @@ change workers (AList.update Thread.equal (Thread.self (), active)); -(* execute *) +(* execute jobs *) fun do_cancel group = (*requires SYNCHRONIZED*) change canceled (insert Task_Queue.eq_group group); -fun execute name (task, group, run) = +fun execute name (task, group, jobs) = let val _ = trace_active (); - val ok = setmp_thread_data (name, task) run (); + val ok = setmp_thread_data (name, task) (fn () => + fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) (); val _ = SYNCHRONIZED "execute" (fn () => (change queue (Task_Queue.finish task); if ok then () @@ -225,16 +226,15 @@ else ()); -(* future values: fork independent computation *) + +(** futures **) -fun future opt_group deps pri (e: unit -> 'a) = +(* future job: fill result *) + +fun future_job group (e: unit -> 'a) = let - val _ = scheduler_check "future check"; - - val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); - val result = ref (NONE: 'a Exn.result option); - val run = Multithreading.with_attributes (Thread.getAttributes ()) + val job = Multithreading.with_attributes (Thread.getAttributes ()) (fn _ => fn ok => let val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; @@ -245,63 +245,88 @@ | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) | _ => false); in res_ok end); + in (result, job) end; + +(* fork *) + +fun fork_future opt_group deps pri e = + let + val _ = scheduler_check "future check"; + + val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); + val (result, job) = future_job group e; val task = SYNCHRONIZED "future" (fn () => - change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ()); + change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); in Future {task = task, group = group, result = result} end; -fun fork e = future NONE [] 0 e; -fun fork_group group e = future (SOME group) [] 0 e; -fun fork_deps deps e = future NONE (map task_of deps) 0 e; -fun fork_pri pri e = future NONE [] pri e; +fun fork e = fork_future NONE [] 0 e; +fun fork_group group e = fork_future (SOME group) [] 0 e; +fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; +fun fork_pri pri e = fork_future NONE [] pri e; -(* join: retrieve results *) +(* join *) + +fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); -fun join_results [] = [] - | join_results xs = uninterruptible (fn _ => fn () => - let - val _ = scheduler_check "join check"; - val _ = Multithreading.self_critical () andalso - exists (not o is_finished) xs andalso - error "Cannot join future values within critical section"; +fun join_results xs = + if forall is_finished xs then map get_result xs + else uninterruptible (fn _ => fn () => + let + val _ = scheduler_check "join check"; + val _ = Multithreading.self_critical () andalso + error "Cannot join future values within critical section"; - fun join_loop _ [] = () - | join_loop name tasks = - (case SYNCHRONIZED name (fn () => - change_result queue (Task_Queue.dequeue_towards tasks)) of - NONE => () - | SOME (work, tasks') => (execute name work; join_loop name tasks')); - val _ = - (case thread_data () of - NONE => - (*alien thread -- refrain from contending for resources*) - while exists (not o is_finished) xs - do SYNCHRONIZED "join_thread" (fn () => wait ()) - | SOME (name, task) => - (*proper task -- actively work towards results*) - let - val unfinished = xs |> map_filter - (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); - val _ = SYNCHRONIZED "join" (fn () => - (change queue (Task_Queue.depend unfinished task); notify_all ())); - val _ = join_loop ("join_loop: " ^ name) unfinished; - val _ = - while exists (not o is_finished) xs - do SYNCHRONIZED "join_task" (fn () => worker_wait ()); - in () end); + fun join_loop _ [] = () + | join_loop name deps = + (case SYNCHRONIZED name (fn () => + change_result queue (Task_Queue.dequeue_towards deps)) of + NONE => () + | SOME (work, deps') => (execute name work; join_loop name deps')); + val _ = + (case thread_data () of + NONE => + (*alien thread -- refrain from contending for resources*) + while not (forall is_finished xs) + do SYNCHRONIZED "join_thread" (fn () => wait ()) + | SOME (name, task) => + (*proper task -- actively work towards results*) + let + val pending = filter_out is_finished xs; + val deps = map task_of pending; + val _ = SYNCHRONIZED "join" (fn () => + (change queue (Task_Queue.depend deps task); notify_all ())); + val _ = join_loop ("join_loop: " ^ name) deps; + val _ = + while not (forall is_finished pending) + do SYNCHRONIZED "join_task" (fn () => worker_wait ()); + in () end); - in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) (); + in map get_result xs end) (); fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x); -fun map f x = - let val task = task_of x - in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; + +(* map *) + +fun map_future f (x as Future {task, group, ...}) = + let + val _ = scheduler_check "map_future check"; + + val (result', job) = future_job group (fn () => f (join x)); + val extended = SYNCHRONIZED "map_future" (fn () => + (case Task_Queue.extend task job (! queue) of + SOME queue' => (queue := queue'; true) + | NONE => false)); + in + if extended then Future {task = task, group = group, result = result'} + else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) + end; -(* misc operations *) +(* cancel *) (*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" @@ -313,7 +338,9 @@ SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ()))); -(*global join and shutdown*) + +(** global join and shutdown **) + fun shutdown () = if Multithreading.available then (scheduler_check "shutdown check"; @@ -327,6 +354,10 @@ OS.Process.sleep (Time.fromMilliseconds 300)))) else (); + +(*final declarations of this structure!*) +val map = map_future; + end; type 'a future = 'a Future.future;