# HG changeset patch # User wenzelm # Date 1220995805 -7200 # Node ID 6a8417f368371ce08a359b3315ada8a3cc685289 # Parent 0f20cbce493592436860cb75028275896e0cc0a3 cancel: check_scheduler; adapted to simplified TaskQueue.cancel; improved join/join_all: actively work towards results, i.e. do not yield unnecessarily; misc tuning; diff -r 0f20cbce4935 -r 6a8417f36837 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Sep 09 23:30:00 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Sep 09 23:30:05 2008 +0200 @@ -13,9 +13,10 @@ val task_of: 'a T -> task val group_of: 'a T -> group val shutdown_request: unit -> unit - val cancel: 'a T -> unit val future: bool -> task list -> (unit -> 'a) -> 'a T val fork: (unit -> 'a) -> 'a T + val cancel: 'a T -> unit + val join_all: 'a T list -> 'a list val join: 'a T -> 'a end; @@ -69,7 +70,6 @@ fun shutdown_request () = Mailbox.send requests Shutdown; fun cancel_request group = Mailbox.send requests (Cancel group); -fun cancel x = cancel_request (group_of x); (* synchronization *) @@ -101,11 +101,6 @@ (* execute *) -fun cancel_group group = (*requires SYNCHRONIZED*) - (case change_result queue (TaskQueue.cancel group) of - [] => true - | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false)); - fun execute name (task, group, run) = let val _ = set_thread_data (SOME (task, group)); @@ -115,7 +110,9 @@ val _ = set_thread_data NONE; val _ = SYNCHRONIZED (fn () => (change queue (TaskQueue.finish task); - if ok then () else if cancel_group group then () else cancel_request group; + if ok then () + else if change_result queue (TaskQueue.cancel group) then () + else cancel_request group; notify_all ())); in () end; @@ -123,7 +120,10 @@ (* worker threads *) fun change_active b = (*requires SYNCHRONIZED*) - (change active (fn n => if b then n + 1 else n - 1); trace_active ()); + (change active (fn n => if b then n + 1 else n - 1); trace_active ()); + +fun worker_wait name = (*requires SYNCHRONIZED*) + (change_active false; wait name; change_active true); fun worker_next name = (*requires SYNCHRONIZED*) if ! excessive > 0 then @@ -132,8 +132,8 @@ change workers (remove Thread.equal (Thread.self ())); NONE) else - (case change_result queue (TaskQueue.dequeue (Thread.self ())) of - NONE => (change_active false; wait name; change_active true; worker_next name) + (case change_result queue TaskQueue.dequeue of + NONE => (worker_wait name; worker_next name) | some => some); fun worker_loop name = @@ -158,7 +158,8 @@ fun scheduler_loop canceled = let - val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled); + val canceled' = SYNCHRONIZED (fn () => + filter_out (change_result queue o TaskQueue.cancel) canceled); val _ = scheduler_fork (); in (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of @@ -195,14 +196,44 @@ fun fork e = future false [] e; -fun join (Future {result, ...}) = +fun cancel x = (check_scheduler (); cancel_request (group_of x)); + + +(* join *) + +fun join_all xs = let val _ = check_scheduler (); - fun passive_loop () = - (case ! result of - NONE => (wait "join"; passive_loop ()) - | SOME res => res); - in Exn.release (SYNCHRONIZED passive_loop) end; + fun unfinished () = + xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); + + (*alien thread -- refrain from contending for resources*) + fun passive_join () = (*requires SYNCHRONIZED*) + (case unfinished () of [] => () + | _ => (wait "join"; passive_join ())); + + (*proper worker thread -- actively work towards results*) + fun active_join () = (*requires SYNCHRONIZED*) + (case unfinished () of [] => () + | tasks => + (case change_result queue (TaskQueue.dequeue_towards tasks) of + NONE => (worker_wait "join"; active_join ()) + | SOME work => (execute "join" work; active_join ()))); + + val _ = + (case thread_data () of + NONE => SYNCHRONIZED passive_join + | SOME (task, _) => SYNCHRONIZED (fn () => + (change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); + + val res = xs |> map (fn Future {result = ref (SOME res), ...} => res); + in + (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of + NONE => map Exn.release res + | SOME e => raise e) + end; + +fun join x = singleton join_all x; end;