cancel: check_scheduler;
adapted to simplified TaskQueue.cancel;
improved join/join_all: actively work towards results, i.e. do not yield unnecessarily;
misc tuning;
--- a/src/Pure/Concurrent/future.ML Tue Sep 09 23:30:00 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Tue Sep 09 23:30:05 2008 +0200
@@ -13,9 +13,10 @@
val task_of: 'a T -> task
val group_of: 'a T -> group
val shutdown_request: unit -> unit
- val cancel: 'a T -> unit
val future: bool -> task list -> (unit -> 'a) -> 'a T
val fork: (unit -> 'a) -> 'a T
+ val cancel: 'a T -> unit
+ val join_all: 'a T list -> 'a list
val join: 'a T -> 'a
end;
@@ -69,7 +70,6 @@
fun shutdown_request () = Mailbox.send requests Shutdown;
fun cancel_request group = Mailbox.send requests (Cancel group);
-fun cancel x = cancel_request (group_of x);
(* synchronization *)
@@ -101,11 +101,6 @@
(* execute *)
-fun cancel_group group = (*requires SYNCHRONIZED*)
- (case change_result queue (TaskQueue.cancel group) of
- [] => true
- | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false));
-
fun execute name (task, group, run) =
let
val _ = set_thread_data (SOME (task, group));
@@ -115,7 +110,9 @@
val _ = set_thread_data NONE;
val _ = SYNCHRONIZED (fn () =>
(change queue (TaskQueue.finish task);
- if ok then () else if cancel_group group then () else cancel_request group;
+ if ok then ()
+ else if change_result queue (TaskQueue.cancel group) then ()
+ else cancel_request group;
notify_all ()));
in () end;
@@ -123,7 +120,10 @@
(* worker threads *)
fun change_active b = (*requires SYNCHRONIZED*)
- (change active (fn n => if b then n + 1 else n - 1); trace_active ());
+ (change active (fn n => if b then n + 1 else n - 1); trace_active ());
+
+fun worker_wait name = (*requires SYNCHRONIZED*)
+ (change_active false; wait name; change_active true);
fun worker_next name = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
@@ -132,8 +132,8 @@
change workers (remove Thread.equal (Thread.self ()));
NONE)
else
- (case change_result queue (TaskQueue.dequeue (Thread.self ())) of
- NONE => (change_active false; wait name; change_active true; worker_next name)
+ (case change_result queue TaskQueue.dequeue of
+ NONE => (worker_wait name; worker_next name)
| some => some);
fun worker_loop name =
@@ -158,7 +158,8 @@
fun scheduler_loop canceled =
let
- val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled);
+ val canceled' = SYNCHRONIZED (fn () =>
+ filter_out (change_result queue o TaskQueue.cancel) canceled);
val _ = scheduler_fork ();
in
(case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
@@ -195,14 +196,44 @@
fun fork e = future false [] e;
-fun join (Future {result, ...}) =
+fun cancel x = (check_scheduler (); cancel_request (group_of x));
+
+
+(* join *)
+
+fun join_all xs =
let
val _ = check_scheduler ();
- fun passive_loop () =
- (case ! result of
- NONE => (wait "join"; passive_loop ())
- | SOME res => res);
- in Exn.release (SYNCHRONIZED passive_loop) end;
+ fun unfinished () =
+ xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
+
+ (*alien thread -- refrain from contending for resources*)
+ fun passive_join () = (*requires SYNCHRONIZED*)
+ (case unfinished () of [] => ()
+ | _ => (wait "join"; passive_join ()));
+
+ (*proper worker thread -- actively work towards results*)
+ fun active_join () = (*requires SYNCHRONIZED*)
+ (case unfinished () of [] => ()
+ | tasks =>
+ (case change_result queue (TaskQueue.dequeue_towards tasks) of
+ NONE => (worker_wait "join"; active_join ())
+ | SOME work => (execute "join" work; active_join ())));
+
+ val _ =
+ (case thread_data () of
+ NONE => SYNCHRONIZED passive_join
+ | SOME (task, _) => SYNCHRONIZED (fn () =>
+ (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
+
+ val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
+ in
+ (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
+ NONE => map Exn.release res
+ | SOME e => raise e)
+ end;
+
+fun join x = singleton join_all x;
end;