simplified Future.cancel/cancel_group (again) -- running threads only;
more robust update/cancel_execution: full join_tasks of group before exec state assignment;
tuned signature;
--- a/src/Pure/Concurrent/future.ML Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/future.ML Mon Apr 09 17:22:23 2012 +0200
@@ -52,8 +52,8 @@
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
val interruptible_task: ('a -> 'b) -> 'a -> 'b
- val cancel_group: group -> task list
- val cancel: 'a future -> task list
+ val cancel_group: group -> unit
+ val cancel: 'a future -> unit
type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
val default_params: params
val forks: params -> (unit -> 'a) list -> 'a future list
@@ -74,6 +74,8 @@
val fulfill: 'a future -> 'a -> unit
val shutdown: unit -> unit
val status: (unit -> 'a) -> 'a
+ val group_tasks: group -> task list
+ val queue_status: unit -> {ready: int, pending: int, running: int, passive: int}
end;
structure Future: FUTURE =
@@ -181,9 +183,9 @@
fun cancel_now group = (*requires SYNCHRONIZED*)
let
- val (tasks, threads) = Task_Queue.cancel (! queue) group;
- val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
- in tasks end;
+ val running = Task_Queue.cancel (! queue) group;
+ val _ = List.app Simple_Thread.interrupt_unsynchronized running;
+ in running end;
fun cancel_all () = (*requires SYNCHRONIZED*)
let
@@ -413,7 +415,7 @@
val _ =
if null running then ()
else (cancel_later group; signal work_available; scheduler_check ());
- in running end);
+ in () end);
fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
@@ -650,6 +652,13 @@
in x end;
+(* queue status *)
+
+fun group_tasks group = Task_Queue.group_tasks (! queue) group;
+
+fun queue_status () = Task_Queue.status (! queue);
+
+
(*final declarations of this structure!*)
val map = map_future;
--- a/src/Pure/Concurrent/par_list.ML Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/par_list.ML Mon Apr 09 17:22:23 2012 +0200
@@ -39,8 +39,7 @@
Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
(map (fn x => fn () => f x) xs);
val results = Future.join_results futures
- handle exn =>
- (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
+ handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
in results end;
fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
--- a/src/Pure/Concurrent/task_queue.ML Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Mon Apr 09 17:22:23 2012 +0200
@@ -28,10 +28,11 @@
val waiting: task -> task list -> (unit -> 'a) -> 'a
type queue
val empty: queue
+ val group_tasks: queue -> group -> task list
val known_task: queue -> task -> bool
val all_passive: queue -> bool
val status: queue -> {ready: int, pending: int, running: int, passive: int}
- val cancel: queue -> group -> task list * Thread.thread list
+ val cancel: queue -> group -> Thread.thread list
val cancel_all: queue -> group list * Thread.thread list
val finish: task -> queue -> bool * queue
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
@@ -210,6 +211,7 @@
fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
val empty = make_queue Inttab.empty Task_Graph.empty;
+fun group_tasks (Queue {groups, ...}) group = Tasks.keys (get_tasks groups (group_id group));
fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
@@ -254,12 +256,9 @@
let
val _ = cancel_group group Exn.Interrupt;
val running =
- Tasks.fold (fn (task, _) => fn (tasks, threads) =>
- (case get_job jobs task of
- Running thread => (task :: tasks, insert Thread.equal thread threads)
- | Passive _ => (task :: tasks, threads)
- | _ => (tasks, threads)))
- (get_tasks groups (group_id group)) ([], []);
+ Tasks.fold (fn (task, _) =>
+ (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I))
+ (get_tasks groups (group_id group)) [];
in running end;
fun cancel_all (Queue {jobs, ...}) =
--- a/src/Pure/PIDE/document.ML Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/PIDE/document.ML Mon Apr 09 17:22:23 2012 +0200
@@ -26,7 +26,7 @@
val init_state: state
val define_command: command_id -> string -> string -> state -> state
val discontinue_execution: state -> unit
- val cancel_execution: state -> Future.task list
+ val cancel_execution: state -> unit
val execute: version_id -> state -> state
val update: version_id -> version_id -> edit list -> state ->
(command_id * exec_id option) list * state
@@ -301,6 +301,7 @@
fun discontinue_execution (State {execution = (_, _, running), ...}) = running := false;
fun cancel_execution (State {execution = (_, group, _), ...}) = Future.cancel_group group;
+fun execution_tasks (State {execution = (_, group, _), ...}) = Future.group_tasks group;
end;
@@ -476,6 +477,7 @@
val nodes = nodes_of new_version;
val is_required = make_required nodes;
+ val _ = Future.join_tasks (execution_tasks state);
val updated =
nodes |> Graph.schedule
(fn deps => fn (name, node) =>
--- a/src/Pure/PIDE/protocol.ML Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/PIDE/protocol.ML Mon Apr 09 17:22:23 2012 +0200
@@ -18,12 +18,14 @@
val _ =
Isabelle_Process.protocol_command "Document.cancel_execution"
- (fn [] => ignore (Document.cancel_execution (Document.state ())));
+ (fn [] => Document.cancel_execution (Document.state ()));
val _ =
Isabelle_Process.protocol_command "Document.update"
(fn [old_id_string, new_id_string, edits_yxml] => Document.change_state (fn state =>
let
+ val _ = Document.cancel_execution state;
+
val old_id = Document.parse_id old_id_string;
val new_id = Document.parse_id new_id_string;
val edits =
@@ -48,9 +50,7 @@
fn (a, []) => Document.Perspective (map int_atom a)]))
end;
- val running = Document.cancel_execution state;
val (assignment, state1) = Document.update old_id new_id edits state;
- val _ = Future.join_tasks running;
val _ =
Output.protocol_message Isabelle_Markup.assign_execs
((new_id, assignment) |>