src/Pure/Concurrent/future.ML
changeset 44299 061599cb6eb0
parent 44298 b8f8488704e2
child 44300 349cc426d929
--- a/src/Pure/Concurrent/future.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/future.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -31,6 +31,9 @@
     that lack regular result information, will pick up parallel
     exceptions from the cumulative group context (as Par_Exn).
 
+  * Future task groups may be canceled: present and future group
+    members will be interrupted eventually.
+
   * Promised "passive" futures are fulfilled by external means.  There
     is no associated evaluation task, but other futures can depend on
     them via regular join operations.
@@ -46,9 +49,6 @@
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
   val get_finished: 'a future -> 'a
-  val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val cancel_group: Task_Queue.group -> unit
-  val cancel: 'a future -> unit
   type fork_params =
    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
     pri: int, interrupts: bool}
@@ -61,6 +61,9 @@
   val value_result: 'a Exn.result -> 'a future
   val value: 'a -> 'a future
   val map: ('a -> 'b) -> 'a future -> 'b future
+  val cancel_group: Task_Queue.group -> unit future
+  val cancel: 'a future -> unit future
+  val interruptible_task: ('a -> 'b) -> 'a -> 'b
   val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
   val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
   val promise: (unit -> unit) -> 'a future
@@ -173,16 +176,6 @@
 
 (* cancellation primitives *)
 
-fun interruptible_task f x =
-  (if Multithreading.available then
-    Multithreading.with_attributes
-      (if is_some (worker_task ())
-       then Multithreading.private_interrupts
-       else Multithreading.public_interrupts)
-      (fn _ => f x)
-   else interruptible f x)
-  before Multithreading.interrupted ();
-
 fun cancel_now group = (*requires SYNCHRONIZED*)
   Task_Queue.cancel (! queue) group;
 
@@ -213,7 +206,7 @@
         val test = Exn.capture Multithreading.interrupted ();
         val _ =
           if ok andalso not (Exn.is_interrupt_exn test) then ()
-          else if cancel_now group then ()
+          else if null (cancel_now group) then ()
           else cancel_later group;
         val _ = broadcast work_finished;
         val _ = if maximal then () else signal work_available;
@@ -347,7 +340,7 @@
       else
        (Multithreading.tracing 1 (fn () =>
           string_of_int (length (! canceled)) ^ " canceled groups");
-        Unsynchronized.change canceled (filter_out cancel_now);
+        Unsynchronized.change canceled (filter_out (null o cancel_now));
         broadcast_work ());
 
 
@@ -386,20 +379,18 @@
   if scheduler_active () then ()
   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
 
+fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
+  let
+    val running = cancel_now group;
+    val _ =
+      if null running then ()
+      else (cancel_later group; signal work_available; scheduler_check ());
+  in running end);
+
 
 
 (** futures **)
 
-(* cancellation *)
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
-  signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
 (* future jobs *)
 
 fun assign_result group result raw_res =
@@ -559,6 +550,29 @@
   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
 
 
+(* cancel *)
+
+fun cancel_group group =
+  (case scheduler_cancel group of
+    [] => value ()
+  | running =>
+      singleton
+        (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
+          deps = running, pri = 0, interrupts = false}) I);
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+fun interruptible_task f x =
+  (if Multithreading.available then
+    Multithreading.with_attributes
+      (if is_some (worker_task ())
+       then Multithreading.private_interrupts
+       else Multithreading.public_interrupts)
+      (fn _ => f x)
+   else interruptible f x)
+  before Multithreading.interrupted ();
+
+
 (* promised futures -- fulfilled by external means *)
 
 fun promise_group group abort : 'a future =