simplified Future.cancel/cancel_group (again) -- running threads only;
authorwenzelm
Mon, 09 Apr 2012 17:22:23 +0200
changeset 47404 e6e5750f1311
parent 47403 296b478df14e
child 47405 559b44b5164c
simplified Future.cancel/cancel_group (again) -- running threads only; more robust update/cancel_execution: full join_tasks of group before exec state assignment; tuned signature;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/protocol.ML
--- a/src/Pure/Concurrent/future.ML	Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/future.ML	Mon Apr 09 17:22:23 2012 +0200
@@ -52,8 +52,8 @@
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
   val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val cancel_group: group -> task list
-  val cancel: 'a future -> task list
+  val cancel_group: group -> unit
+  val cancel: 'a future -> unit
   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
   val default_params: params
   val forks: params -> (unit -> 'a) list -> 'a future list
@@ -74,6 +74,8 @@
   val fulfill: 'a future -> 'a -> unit
   val shutdown: unit -> unit
   val status: (unit -> 'a) -> 'a
+  val group_tasks: group -> task list
+  val queue_status: unit -> {ready: int, pending: int, running: int, passive: int}
 end;
 
 structure Future: FUTURE =
@@ -181,9 +183,9 @@
 
 fun cancel_now group = (*requires SYNCHRONIZED*)
   let
-    val (tasks, threads) = Task_Queue.cancel (! queue) group;
-    val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
-  in tasks end;
+    val running = Task_Queue.cancel (! queue) group;
+    val _ = List.app Simple_Thread.interrupt_unsynchronized running;
+  in running end;
 
 fun cancel_all () = (*requires SYNCHRONIZED*)
   let
@@ -413,7 +415,7 @@
     val _ =
       if null running then ()
       else (cancel_later group; signal work_available; scheduler_check ());
-  in running end);
+  in () end);
 
 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
 
@@ -650,6 +652,13 @@
   in x end;
 
 
+(* queue status *)
+
+fun group_tasks group = Task_Queue.group_tasks (! queue) group;
+
+fun queue_status () = Task_Queue.status (! queue);
+
+
 (*final declarations of this structure!*)
 val map = map_future;
 
--- a/src/Pure/Concurrent/par_list.ML	Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/par_list.ML	Mon Apr 09 17:22:23 2012 +0200
@@ -39,8 +39,7 @@
         Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
           (map (fn x => fn () => f x) xs);
       val results = Future.join_results futures
-        handle exn =>
-          (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
+        handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
     in results end;
 
 fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
--- a/src/Pure/Concurrent/task_queue.ML	Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Mon Apr 09 17:22:23 2012 +0200
@@ -28,10 +28,11 @@
   val waiting: task -> task list -> (unit -> 'a) -> 'a
   type queue
   val empty: queue
+  val group_tasks: queue -> group -> task list
   val known_task: queue -> task -> bool
   val all_passive: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
-  val cancel: queue -> group -> task list * Thread.thread list
+  val cancel: queue -> group -> Thread.thread list
   val cancel_all: queue -> group list * Thread.thread list
   val finish: task -> queue -> bool * queue
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
@@ -210,6 +211,7 @@
 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
 val empty = make_queue Inttab.empty Task_Graph.empty;
 
+fun group_tasks (Queue {groups, ...}) group = Tasks.keys (get_tasks groups (group_id group));
 fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
 
 
@@ -254,12 +256,9 @@
   let
     val _ = cancel_group group Exn.Interrupt;
     val running =
-      Tasks.fold (fn (task, _) => fn (tasks, threads) =>
-          (case get_job jobs task of
-            Running thread => (task :: tasks, insert Thread.equal thread threads)
-          | Passive _ => (task :: tasks, threads)
-          | _ => (tasks, threads)))
-        (get_tasks groups (group_id group)) ([], []);
+      Tasks.fold (fn (task, _) =>
+          (case get_job jobs task of Running thread => insert Thread.equal thread | _ => I))
+        (get_tasks groups (group_id group)) [];
   in running end;
 
 fun cancel_all (Queue {jobs, ...}) =
--- a/src/Pure/PIDE/document.ML	Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/PIDE/document.ML	Mon Apr 09 17:22:23 2012 +0200
@@ -26,7 +26,7 @@
   val init_state: state
   val define_command: command_id -> string -> string -> state -> state
   val discontinue_execution: state -> unit
-  val cancel_execution: state -> Future.task list
+  val cancel_execution: state -> unit
   val execute: version_id -> state -> state
   val update: version_id -> version_id -> edit list -> state ->
     (command_id * exec_id option) list * state
@@ -301,6 +301,7 @@
 fun discontinue_execution (State {execution = (_, _, running), ...}) = running := false;
 
 fun cancel_execution (State {execution = (_, group, _), ...}) = Future.cancel_group group;
+fun execution_tasks (State {execution = (_, group, _), ...}) = Future.group_tasks group;
 
 end;
 
@@ -476,6 +477,7 @@
     val nodes = nodes_of new_version;
     val is_required = make_required nodes;
 
+    val _ = Future.join_tasks (execution_tasks state);
     val updated =
       nodes |> Graph.schedule
         (fn deps => fn (name, node) =>
--- a/src/Pure/PIDE/protocol.ML	Mon Apr 09 15:10:52 2012 +0200
+++ b/src/Pure/PIDE/protocol.ML	Mon Apr 09 17:22:23 2012 +0200
@@ -18,12 +18,14 @@
 
 val _ =
   Isabelle_Process.protocol_command "Document.cancel_execution"
-    (fn [] => ignore (Document.cancel_execution (Document.state ())));
+    (fn [] => Document.cancel_execution (Document.state ()));
 
 val _ =
   Isabelle_Process.protocol_command "Document.update"
     (fn [old_id_string, new_id_string, edits_yxml] => Document.change_state (fn state =>
       let
+        val _ = Document.cancel_execution state;
+
         val old_id = Document.parse_id old_id_string;
         val new_id = Document.parse_id new_id_string;
         val edits =
@@ -48,9 +50,7 @@
                   fn (a, []) => Document.Perspective (map int_atom a)]))
             end;
 
-        val running = Document.cancel_execution state;
         val (assignment, state1) = Document.update old_id new_id edits state;
-        val _ = Future.join_tasks running;
         val _ =
           Output.protocol_message Isabelle_Markup.assign_execs
             ((new_id, assignment) |>