future: allow explicit group;
cancel: invalidate group identifier for all future members;
tuned comments;
tuned;
--- a/src/Pure/Concurrent/future.ML Wed Sep 10 19:44:28 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Wed Sep 10 19:44:29 2008 +0200
@@ -13,7 +13,7 @@
val task_of: 'a T -> task
val group_of: 'a T -> group
val shutdown_request: unit -> unit
- val future: bool -> task list -> (unit -> 'a) -> 'a T
+ val future: group option -> task list -> (unit -> 'a) -> 'a T
val fork: (unit -> 'a) -> 'a T
val cancel: 'a T -> unit
val join_all: 'a T list -> 'a list
@@ -111,7 +111,7 @@
val _ = SYNCHRONIZED (fn () =>
(change queue (TaskQueue.finish task);
if ok then ()
- else if change_result queue (TaskQueue.cancel group) then ()
+ else if TaskQueue.cancel (! queue) group then ()
else cancel_request group;
notify_all ()));
in () end;
@@ -148,41 +148,45 @@
(* scheduler *)
-fun scheduler_fork () = SYNCHRONIZED (fn () =>
+fun scheduler_fork shutdown = SYNCHRONIZED (fn () =>
let
val _ = trace_active ();
- val m = Multithreading.max_threads_value ();
+ val _ =
+ (case List.partition Thread.isActive (! workers) of
+ (_, []) => ()
+ | (active, inactive) =>
+ (workers := active; Multithreading.tracing 0 (fn () =>
+ "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads")));
+
+ val m = if shutdown then 0 else Multithreading.max_threads_value ();
val l = length (! workers);
val _ = excessive := l - m;
- in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end);
+ val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
+ in null (! workers) end);
-fun scheduler_loop canceled =
- let
- val canceled' = SYNCHRONIZED (fn () =>
- filter_out (change_result queue o TaskQueue.cancel) canceled);
- val _ = scheduler_fork ();
- in
- (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
- SOME Shutdown => () (* FIXME proper worker shutdown *)
- | SOME (Cancel group) => scheduler_loop (group :: canceled')
- | NONE => scheduler_loop canceled')
- end;
+fun scheduler_loop (shutdown, canceled) =
+ if scheduler_fork shutdown then ()
+ else
+ let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in
+ (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
+ SOME Shutdown => scheduler_loop (true, canceled')
+ | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
+ | NONE => scheduler_loop (shutdown, canceled'))
+ end;
-fun check_scheduler () = SYNCHRONIZED (fn () =>
+fun scheduler_check () = SYNCHRONIZED (fn () =>
if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
- else scheduler := SOME (Thread.fork (fn () => scheduler_loop [], Multithreading.no_interrupts)));
+ else scheduler :=
+ SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
-(* future values *)
+(* future values: fork independent computation *)
-fun future new_group deps (e: unit -> 'a) =
+fun future opt_group deps (e: unit -> 'a) =
let
- val _ = check_scheduler ();
+ val _ = scheduler_check ();
- val group =
- (case (new_group, thread_data ()) of
- (false, SOME (_, group)) => group
- | _ => TaskQueue.new_group ());
+ val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
val result = ref (NONE: 'a Exn.result option);
val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -194,16 +198,14 @@
change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork e = future false [] e;
-
-fun cancel x = (check_scheduler (); cancel_request (group_of x));
+fun fork e = future (Option.map #2 (thread_data ())) [] e;
-(* join *)
+(* join: retrieve results *)
fun join_all xs =
let
- val _ = check_scheduler ();
+ val _ = scheduler_check ();
fun unfinished () =
xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
@@ -236,4 +238,13 @@
fun join x = singleton join_all x;
+
+(* termination *)
+
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel x = (scheduler_check (); cancel_request (group_of x));
+
+(*interrupt: adhoc signal, permissive, may get ignored*)
+fun interrupt_task id = SYNCHRONIZED (fn () => TaskQueue.interrupt (! queue) id);
+
end;