refined Future.cancel: explicit future allows to join actual cancellation;
authorwenzelm
Fri, 19 Aug 2011 15:56:26 +0200
changeset 44299 061599cb6eb0
parent 44298 b8f8488704e2
child 44300 349cc426d929
refined Future.cancel: explicit future allows to join actual cancellation; Document.cancel_execution: join nested future groups as well;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/isar_document.ML
--- a/src/Pure/Concurrent/future.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/future.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -31,6 +31,9 @@
     that lack regular result information, will pick up parallel
     exceptions from the cumulative group context (as Par_Exn).
 
+  * Future task groups may be canceled: present and future group
+    members will be interrupted eventually.
+
   * Promised "passive" futures are fulfilled by external means.  There
     is no associated evaluation task, but other futures can depend on
     them via regular join operations.
@@ -46,9 +49,6 @@
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
   val get_finished: 'a future -> 'a
-  val interruptible_task: ('a -> 'b) -> 'a -> 'b
-  val cancel_group: Task_Queue.group -> unit
-  val cancel: 'a future -> unit
   type fork_params =
    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
     pri: int, interrupts: bool}
@@ -61,6 +61,9 @@
   val value_result: 'a Exn.result -> 'a future
   val value: 'a -> 'a future
   val map: ('a -> 'b) -> 'a future -> 'b future
+  val cancel_group: Task_Queue.group -> unit future
+  val cancel: 'a future -> unit future
+  val interruptible_task: ('a -> 'b) -> 'a -> 'b
   val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
   val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
   val promise: (unit -> unit) -> 'a future
@@ -173,16 +176,6 @@
 
 (* cancellation primitives *)
 
-fun interruptible_task f x =
-  (if Multithreading.available then
-    Multithreading.with_attributes
-      (if is_some (worker_task ())
-       then Multithreading.private_interrupts
-       else Multithreading.public_interrupts)
-      (fn _ => f x)
-   else interruptible f x)
-  before Multithreading.interrupted ();
-
 fun cancel_now group = (*requires SYNCHRONIZED*)
   Task_Queue.cancel (! queue) group;
 
@@ -213,7 +206,7 @@
         val test = Exn.capture Multithreading.interrupted ();
         val _ =
           if ok andalso not (Exn.is_interrupt_exn test) then ()
-          else if cancel_now group then ()
+          else if null (cancel_now group) then ()
           else cancel_later group;
         val _ = broadcast work_finished;
         val _ = if maximal then () else signal work_available;
@@ -347,7 +340,7 @@
       else
        (Multithreading.tracing 1 (fn () =>
           string_of_int (length (! canceled)) ^ " canceled groups");
-        Unsynchronized.change canceled (filter_out cancel_now);
+        Unsynchronized.change canceled (filter_out (null o cancel_now));
         broadcast_work ());
 
 
@@ -386,20 +379,18 @@
   if scheduler_active () then ()
   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
 
+fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
+  let
+    val running = cancel_now group;
+    val _ =
+      if null running then ()
+      else (cancel_later group; signal work_available; scheduler_check ());
+  in running end);
+
 
 
 (** futures **)
 
-(* cancellation *)
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
-  signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
 (* future jobs *)
 
 fun assign_result group result raw_res =
@@ -559,6 +550,29 @@
   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
 
 
+(* cancel *)
+
+fun cancel_group group =
+  (case scheduler_cancel group of
+    [] => value ()
+  | running =>
+      singleton
+        (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
+          deps = running, pri = 0, interrupts = false}) I);
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+fun interruptible_task f x =
+  (if Multithreading.available then
+    Multithreading.with_attributes
+      (if is_some (worker_task ())
+       then Multithreading.private_interrupts
+       else Multithreading.public_interrupts)
+      (fn _ => f x)
+   else interruptible f x)
+  before Multithreading.interrupted ();
+
+
 (* promised futures -- fulfilled by external means *)
 
 fun promise_group group abort : 'a future =
--- a/src/Pure/Concurrent/par_list.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/par_list.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -39,7 +39,8 @@
         Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
           (map (fn x => fn () => f x) xs);
       val results = Future.join_results futures
-        handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
+        handle exn =>
+          (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
     in results end;
 
 fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
--- a/src/Pure/Concurrent/task_queue.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -31,7 +31,7 @@
   val known_task: queue -> task -> bool
   val all_passive: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
-  val cancel: queue -> group -> bool
+  val cancel: queue -> group -> task list
   val cancel_all: queue -> group list
   val finish: task -> queue -> bool * queue
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
@@ -248,10 +248,12 @@
   let
     val _ = cancel_group group Exn.Interrupt;
     val running =
-      Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
+      Tasks.fold (fn (task, _) =>
+          (case get_job jobs task of Running thread => cons (task, thread) | _ => I))
         (get_tasks groups (group_id group)) [];
-    val _ = List.app Simple_Thread.interrupt_unsynchronized running;
-  in null running end;
+    val threads = fold (insert Thread.equal o #2) running [];
+    val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
+  in map #1 running end;
 
 fun cancel_all (Queue {jobs, ...}) =
   let
--- a/src/Pure/PIDE/document.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/PIDE/document.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -24,7 +24,7 @@
   type state
   val init_state: state
   val join_commands: state -> unit
-  val cancel_execution: state -> unit -> unit
+  val cancel_execution: state -> unit future
   val define_command: command_id -> string -> state -> state
   val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state
   val execute: version_id -> state -> state
@@ -164,7 +164,7 @@
  {versions: version Inttab.table,  (*version_id -> document content*)
   commands: Toplevel.transition future Inttab.table,  (*command_id -> transition (future parsing)*)
   execs: Toplevel.state lazy Inttab.table,  (*exec_id -> execution process*)
-  execution: unit future list}  (*global execution process*)
+  execution: Task_Queue.group}  (*global execution process*)
 with
 
 fun make_state (versions, commands, execs, execution) =
@@ -177,7 +177,7 @@
   make_state (Inttab.make [(no_id, empty_version)],
     Inttab.make [(no_id, Future.value Toplevel.empty)],
     Inttab.make [(no_id, empty_exec)],
-    []);
+    Task_Queue.new_group NONE);
 
 
 (* document versions *)
@@ -233,9 +233,7 @@
 
 (* document execution *)
 
-fun cancel_execution (State {execution, ...}) =
-  (List.app Future.cancel execution;
-    fn () => ignore (Future.join_results execution));
+fun cancel_execution (State {execution, ...}) = Future.cancel_group execution;
 
 end;
 
@@ -393,17 +391,18 @@
       fun force_exec NONE = ()
         | force_exec (SOME exec_id) = ignore (Lazy.force (the_exec state exec_id));
 
-      val execution' =
+      val execution = Task_Queue.new_group NONE;
+      val _ =
         nodes_of version |> Graph.schedule
           (fn deps => fn (name, node) =>
             singleton
               (Future.forks
-                {name = "theory:" ^ name, group = NONE,
+                {name = "theory:" ^ name, group = SOME (Task_Queue.new_group (SOME execution)),
                   deps = map (Future.task_of o #2) deps,
                   pri = 1, interrupts = true})
               (fold_entries NONE (fn (_, exec) => fn () => force_exec exec) node));
 
-    in (versions, commands, execs, execution') end);
+    in (versions, commands, execs, execution) end);
 
 
 
--- a/src/Pure/PIDE/isar_document.ML	Fri Aug 19 14:01:20 2011 +0200
+++ b/src/Pure/PIDE/isar_document.ML	Fri Aug 19 15:56:26 2011 +0200
@@ -30,9 +30,9 @@
                   fn ([a], []) => Document.Header (Exn.Exn (ERROR a))]))
             end;
 
-      val await_cancellation = Document.cancel_execution state;
+      val cancellation = Document.cancel_execution state;
       val (updates, state') = Document.edit old_id new_id edits state;
-      val _ = await_cancellation ();
+      val _ = Future.join cancellation;
       val _ = Document.join_commands state';
       val _ =
         Output.status (Markup.markup (Markup.assign new_id)