diff -r b8f8488704e2 -r 061599cb6eb0 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Fri Aug 19 14:01:20 2011 +0200 +++ b/src/Pure/Concurrent/future.ML Fri Aug 19 15:56:26 2011 +0200 @@ -31,6 +31,9 @@ that lack regular result information, will pick up parallel exceptions from the cumulative group context (as Par_Exn). + * Future task groups may be canceled: present and future group + members will be interrupted eventually. + * Promised "passive" futures are fulfilled by external means. There is no associated evaluation task, but other futures can depend on them via regular join operations. @@ -46,9 +49,6 @@ val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool val get_finished: 'a future -> 'a - val interruptible_task: ('a -> 'b) -> 'a -> 'b - val cancel_group: Task_Queue.group -> unit - val cancel: 'a future -> unit type fork_params = {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int, interrupts: bool} @@ -61,6 +61,9 @@ val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future val map: ('a -> 'b) -> 'a future -> 'b future + val cancel_group: Task_Queue.group -> unit future + val cancel: 'a future -> unit future + val interruptible_task: ('a -> 'b) -> 'a -> 'b val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future val promise: (unit -> unit) -> 'a future @@ -173,16 +176,6 @@ (* cancellation primitives *) -fun interruptible_task f x = - (if Multithreading.available then - Multithreading.with_attributes - (if is_some (worker_task ()) - then Multithreading.private_interrupts - else Multithreading.public_interrupts) - (fn _ => f x) - else interruptible f x) - before Multithreading.interrupted (); - fun cancel_now group = (*requires SYNCHRONIZED*) Task_Queue.cancel (! queue) group; @@ -213,7 +206,7 @@ val test = Exn.capture Multithreading.interrupted (); val _ = if ok andalso not (Exn.is_interrupt_exn test) then () - else if cancel_now group then () + else if null (cancel_now group) then () else cancel_later group; val _ = broadcast work_finished; val _ = if maximal then () else signal work_available; @@ -347,7 +340,7 @@ else (Multithreading.tracing 1 (fn () => string_of_int (length (! canceled)) ^ " canceled groups"); - Unsynchronized.change canceled (filter_out cancel_now); + Unsynchronized.change canceled (filter_out (null o cancel_now)); broadcast_work ()); @@ -386,20 +379,18 @@ if scheduler_active () then () else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); +fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () => + let + val running = cancel_now group; + val _ = + if null running then () + else (cancel_later group; signal work_available; scheduler_check ()); + in running end); + (** futures **) -(* cancellation *) - -(*cancel: present and future group members will be interrupted eventually*) -fun cancel_group group = SYNCHRONIZED "cancel" (fn () => - (if cancel_now group then () else cancel_later group; - signal work_available; scheduler_check ())); - -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); - - (* future jobs *) fun assign_result group result raw_res = @@ -559,6 +550,29 @@ else map (fn e => value_result (Exn.interruptible_capture e ())) es; +(* cancel *) + +fun cancel_group group = + (case scheduler_cancel group of + [] => value () + | running => + singleton + (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE), + deps = running, pri = 0, interrupts = false}) I); + +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); + +fun interruptible_task f x = + (if Multithreading.available then + Multithreading.with_attributes + (if is_some (worker_task ()) + then Multithreading.private_interrupts + else Multithreading.public_interrupts) + (fn _ => f x) + else interruptible f x) + before Multithreading.interrupted (); + + (* promised futures -- fulfilled by external means *) fun promise_group group abort : 'a future =