refined Future.cancel: explicit future allows to join actual cancellation;
Document.cancel_execution: join nested future groups as well;
--- a/src/Pure/Concurrent/future.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/future.ML Fri Aug 19 15:56:26 2011 +0200
@@ -31,6 +31,9 @@
that lack regular result information, will pick up parallel
exceptions from the cumulative group context (as Par_Exn).
+ * Future task groups may be canceled: present and future group
+ members will be interrupted eventually.
+
* Promised "passive" futures are fulfilled by external means. There
is no associated evaluation task, but other futures can depend on
them via regular join operations.
@@ -46,9 +49,6 @@
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
val get_finished: 'a future -> 'a
- val interruptible_task: ('a -> 'b) -> 'a -> 'b
- val cancel_group: Task_Queue.group -> unit
- val cancel: 'a future -> unit
type fork_params =
{name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
pri: int, interrupts: bool}
@@ -61,6 +61,9 @@
val value_result: 'a Exn.result -> 'a future
val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
+ val cancel_group: Task_Queue.group -> unit future
+ val cancel: 'a future -> unit future
+ val interruptible_task: ('a -> 'b) -> 'a -> 'b
val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
val promise: (unit -> unit) -> 'a future
@@ -173,16 +176,6 @@
(* cancellation primitives *)
-fun interruptible_task f x =
- (if Multithreading.available then
- Multithreading.with_attributes
- (if is_some (worker_task ())
- then Multithreading.private_interrupts
- else Multithreading.public_interrupts)
- (fn _ => f x)
- else interruptible f x)
- before Multithreading.interrupted ();
-
fun cancel_now group = (*requires SYNCHRONIZED*)
Task_Queue.cancel (! queue) group;
@@ -213,7 +206,7 @@
val test = Exn.capture Multithreading.interrupted ();
val _ =
if ok andalso not (Exn.is_interrupt_exn test) then ()
- else if cancel_now group then ()
+ else if null (cancel_now group) then ()
else cancel_later group;
val _ = broadcast work_finished;
val _ = if maximal then () else signal work_available;
@@ -347,7 +340,7 @@
else
(Multithreading.tracing 1 (fn () =>
string_of_int (length (! canceled)) ^ " canceled groups");
- Unsynchronized.change canceled (filter_out cancel_now);
+ Unsynchronized.change canceled (filter_out (null o cancel_now));
broadcast_work ());
@@ -386,20 +379,18 @@
if scheduler_active () then ()
else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
+fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
+ let
+ val running = cancel_now group;
+ val _ =
+ if null running then ()
+ else (cancel_later group; signal work_available; scheduler_check ());
+ in running end);
+
(** futures **)
-(* cancellation *)
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
- signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
(* future jobs *)
fun assign_result group result raw_res =
@@ -559,6 +550,29 @@
else map (fn e => value_result (Exn.interruptible_capture e ())) es;
+(* cancel *)
+
+fun cancel_group group =
+ (case scheduler_cancel group of
+ [] => value ()
+ | running =>
+ singleton
+ (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
+ deps = running, pri = 0, interrupts = false}) I);
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+fun interruptible_task f x =
+ (if Multithreading.available then
+ Multithreading.with_attributes
+ (if is_some (worker_task ())
+ then Multithreading.private_interrupts
+ else Multithreading.public_interrupts)
+ (fn _ => f x)
+ else interruptible f x)
+ before Multithreading.interrupted ();
+
+
(* promised futures -- fulfilled by external means *)
fun promise_group group abort : 'a future =
--- a/src/Pure/Concurrent/par_list.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/par_list.ML Fri Aug 19 15:56:26 2011 +0200
@@ -39,7 +39,8 @@
Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
(map (fn x => fn () => f x) xs);
val results = Future.join_results futures
- handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
+ handle exn =>
+ (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
in results end;
fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
--- a/src/Pure/Concurrent/task_queue.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Fri Aug 19 15:56:26 2011 +0200
@@ -31,7 +31,7 @@
val known_task: queue -> task -> bool
val all_passive: queue -> bool
val status: queue -> {ready: int, pending: int, running: int, passive: int}
- val cancel: queue -> group -> bool
+ val cancel: queue -> group -> task list
val cancel_all: queue -> group list
val finish: task -> queue -> bool * queue
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
@@ -248,10 +248,12 @@
let
val _ = cancel_group group Exn.Interrupt;
val running =
- Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
+ Tasks.fold (fn (task, _) =>
+ (case get_job jobs task of Running thread => cons (task, thread) | _ => I))
(get_tasks groups (group_id group)) [];
- val _ = List.app Simple_Thread.interrupt_unsynchronized running;
- in null running end;
+ val threads = fold (insert Thread.equal o #2) running [];
+ val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
+ in map #1 running end;
fun cancel_all (Queue {jobs, ...}) =
let
--- a/src/Pure/PIDE/document.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/PIDE/document.ML Fri Aug 19 15:56:26 2011 +0200
@@ -24,7 +24,7 @@
type state
val init_state: state
val join_commands: state -> unit
- val cancel_execution: state -> unit -> unit
+ val cancel_execution: state -> unit future
val define_command: command_id -> string -> state -> state
val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state
val execute: version_id -> state -> state
@@ -164,7 +164,7 @@
{versions: version Inttab.table, (*version_id -> document content*)
commands: Toplevel.transition future Inttab.table, (*command_id -> transition (future parsing)*)
execs: Toplevel.state lazy Inttab.table, (*exec_id -> execution process*)
- execution: unit future list} (*global execution process*)
+ execution: Task_Queue.group} (*global execution process*)
with
fun make_state (versions, commands, execs, execution) =
@@ -177,7 +177,7 @@
make_state (Inttab.make [(no_id, empty_version)],
Inttab.make [(no_id, Future.value Toplevel.empty)],
Inttab.make [(no_id, empty_exec)],
- []);
+ Task_Queue.new_group NONE);
(* document versions *)
@@ -233,9 +233,7 @@
(* document execution *)
-fun cancel_execution (State {execution, ...}) =
- (List.app Future.cancel execution;
- fn () => ignore (Future.join_results execution));
+fun cancel_execution (State {execution, ...}) = Future.cancel_group execution;
end;
@@ -393,17 +391,18 @@
fun force_exec NONE = ()
| force_exec (SOME exec_id) = ignore (Lazy.force (the_exec state exec_id));
- val execution' =
+ val execution = Task_Queue.new_group NONE;
+ val _ =
nodes_of version |> Graph.schedule
(fn deps => fn (name, node) =>
singleton
(Future.forks
- {name = "theory:" ^ name, group = NONE,
+ {name = "theory:" ^ name, group = SOME (Task_Queue.new_group (SOME execution)),
deps = map (Future.task_of o #2) deps,
pri = 1, interrupts = true})
(fold_entries NONE (fn (_, exec) => fn () => force_exec exec) node));
- in (versions, commands, execs, execution') end);
+ in (versions, commands, execs, execution) end);
--- a/src/Pure/PIDE/isar_document.ML Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/PIDE/isar_document.ML Fri Aug 19 15:56:26 2011 +0200
@@ -30,9 +30,9 @@
fn ([a], []) => Document.Header (Exn.Exn (ERROR a))]))
end;
- val await_cancellation = Document.cancel_execution state;
+ val cancellation = Document.cancel_execution state;
val (updates, state') = Document.edit old_id new_id edits state;
- val _ = await_cancellation ();
+ val _ = Future.join cancellation;
val _ = Document.join_commands state';
val _ =
Output.status (Markup.markup (Markup.assign new_id)