--- a/src/Pure/Concurrent/future.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/future.ML Fri Aug 19 15:56:26 2011 +0200
@@ -31,6 +31,9 @@
that lack regular result information, will pick up parallel
exceptions from the cumulative group context (as Par_Exn).
+ * Future task groups may be canceled: present and future group
+ members will be interrupted eventually.
+
* Promised "passive" futures are fulfilled by external means. There
is no associated evaluation task, but other futures can depend on
them via regular join operations.
@@ -46,9 +49,6 @@
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
val get_finished: 'a future -> 'a
- val interruptible_task: ('a -> 'b) -> 'a -> 'b
- val cancel_group: Task_Queue.group -> unit
- val cancel: 'a future -> unit
type fork_params =
{name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
pri: int, interrupts: bool}
@@ -61,6 +61,9 @@
val value_result: 'a Exn.result -> 'a future
val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
+ val cancel_group: Task_Queue.group -> unit future
+ val cancel: 'a future -> unit future
+ val interruptible_task: ('a -> 'b) -> 'a -> 'b
val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
val promise: (unit -> unit) -> 'a future
@@ -173,16 +176,6 @@
(* cancellation primitives *)
-fun interruptible_task f x =
- (if Multithreading.available then
- Multithreading.with_attributes
- (if is_some (worker_task ())
- then Multithreading.private_interrupts
- else Multithreading.public_interrupts)
- (fn _ => f x)
- else interruptible f x)
- before Multithreading.interrupted ();
-
fun cancel_now group = (*requires SYNCHRONIZED*)
Task_Queue.cancel (! queue) group;
@@ -213,7 +206,7 @@
val test = Exn.capture Multithreading.interrupted ();
val _ =
if ok andalso not (Exn.is_interrupt_exn test) then ()
- else if cancel_now group then ()
+ else if null (cancel_now group) then ()
else cancel_later group;
val _ = broadcast work_finished;
val _ = if maximal then () else signal work_available;
@@ -347,7 +340,7 @@
else
(Multithreading.tracing 1 (fn () =>
string_of_int (length (! canceled)) ^ " canceled groups");
- Unsynchronized.change canceled (filter_out cancel_now);
+ Unsynchronized.change canceled (filter_out (null o cancel_now));
broadcast_work ());
@@ -386,20 +379,18 @@
if scheduler_active () then ()
else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
+fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
+ let
+ val running = cancel_now group;
+ val _ =
+ if null running then ()
+ else (cancel_later group; signal work_available; scheduler_check ());
+ in running end);
+
(** futures **)
-(* cancellation *)
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
- signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
(* future jobs *)
fun assign_result group result raw_res =
@@ -559,6 +550,29 @@
else map (fn e => value_result (Exn.interruptible_capture e ())) es;
+(* cancel *)
+
+fun cancel_group group =
+ (case scheduler_cancel group of
+ [] => value ()
+ | running =>
+ singleton
+ (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
+ deps = running, pri = 0, interrupts = false}) I);
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+fun interruptible_task f x =
+ (if Multithreading.available then
+ Multithreading.with_attributes
+ (if is_some (worker_task ())
+ then Multithreading.private_interrupts
+ else Multithreading.public_interrupts)
+ (fn _ => f x)
+ else interruptible f x)
+ before Multithreading.interrupted ();
+
+
(* promised futures -- fulfilled by external means *)
fun promise_group group abort : 'a future =