support for nested groups -- cancellation is propagated to peers and subgroups;
authorwenzelm
Tue, 21 Jul 2009 20:37:31 +0200
changeset 32101 e25107ff4f56
parent 32100 8ac6b1102f16
child 32102 81d03a29980c
support for nested groups -- cancellation is propagated to peers and subgroups; added is_canceled; tuned;
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/task_queue.ML	Tue Jul 21 20:24:02 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Jul 21 20:37:31 2009 +0200
@@ -13,8 +13,8 @@
   type group
   val group_id: group -> int
   val eq_group: group * group -> bool
-  val new_group: unit -> group
-  val group_exns: group -> exn list
+  val new_group: group option -> group
+  val group_status: group -> exn list
   val str_of_group: group -> string
   type queue
   val empty: queue
@@ -27,6 +27,7 @@
     (((task * group * (bool -> bool) list) * task list) option * queue)
   val interrupt: queue -> task -> unit
   val interrupt_external: queue -> string -> unit
+  val is_canceled: group -> bool
   val cancel_group: group -> exn -> unit
   val cancel: queue -> group -> bool
   val cancel_all: queue -> group list
@@ -48,19 +49,37 @@
 structure Task_Graph = Graph(type key = task val ord = task_ord);
 
 
-(* groups *)
+(* nested groups *)
+
+datatype group = Group of
+ {parent: group option,
+  id: serial,
+  status: exn list ref};
 
-datatype group = Group of serial * exn list ref;
+fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
 
-fun group_id (Group (gid, _)) = gid;
-fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
+fun new_group parent = make_group (parent, serial (), ref []);
+
+fun group_id (Group {id, ...}) = id;
+fun eq_group (group1, group2) = group_id group1 = group_id group2;
 
-fun new_group () = Group (serial (), ref []);
+fun group_ancestry (Group {parent, id, ...}) =
+  id :: (case parent of NONE => [] | SOME group => group_ancestry group);
+
+
+fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
+  (case exn of
+    Exn.Interrupt => if null (! status) then status := [exn] else ()
+  | _ => change status (cons exn)));
 
-fun group_exns (Group (_, ref exns)) = exns;
+fun group_status (Group {parent, status, ...}) = (*non-critical*)
+  ! status @ (case parent of NONE => [] | SOME group => group_status group);
 
-fun str_of_group (Group (i, ref exns)) =
-  if null exns then string_of_int i else enclose "(" ")" (string_of_int i);
+fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
+  not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
+
+fun str_of_group group =
+  (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
 
 
 (* jobs *)
@@ -94,7 +113,7 @@
 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
 
 
-(* status *)
+(* queue status *)
 
 fun status (Queue {jobs, ...}) =
   let
@@ -107,14 +126,38 @@
   in {ready = x, pending = y, running = z} end;
 
 
+(* cancel -- peers and sub-groups *)
+
+fun cancel (Queue {groups, jobs, ...}) group =
+  let
+    val _ = cancel_group group Exn.Interrupt;
+    val tasks = Inttab.lookup_list groups (group_id group);
+    val running =
+      fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
+    val _ = List.app SimpleThread.interrupt running;
+  in null running end;
+
+fun cancel_all (Queue {jobs, ...}) =
+  let
+    fun cancel_job (group, job) (groups, running) =
+      (cancel_group group Exn.Interrupt;
+        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
+        | _ => (groups, running)));
+    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
+    val _ = List.app SimpleThread.interrupt running;
+  in groups end;
+
+
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) =
+fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   let
     val task = new_task pri;
-    val groups' = Inttab.cons_list (gid, task) groups;
+    val groups' = groups
+      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
     val jobs' = jobs
-      |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
+      |> Task_Graph.new_node (task, (group, Job [job]))
+      |> fold (add_job task) deps;
     val cache' =
       (case cache of
         Result last =>
@@ -158,7 +201,8 @@
     fun ready task =
       (case Task_Graph.get_node jobs task of
         (group, Job list) =>
-          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
+          if null (Task_Graph.imm_preds jobs task)
+          then SOME (task, group, rev list)
           else NONE
       | _ => NONE);
 
@@ -181,7 +225,9 @@
 (* sporadic interrupts *)
 
 fun interrupt (Queue {jobs, ...}) task =
-  (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
+  (case try (get_job jobs) task of
+    SOME (Running thread) => SimpleThread.interrupt thread
+  | _ => ());
 
 fun interrupt_external (queue as Queue {jobs, ...}) str =
   (case Int.fromString str of
@@ -194,33 +240,11 @@
 
 (* termination *)
 
-fun cancel_group (Group (_, r)) exn = CRITICAL (fn () =>
-  (case exn of
-    Exn.Interrupt => if null (! r) then r := [exn] else ()
-  | _ => change r (cons exn)));
-
-fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
-  let
-    val _ = cancel_group group Exn.Interrupt;
-    val tasks = Inttab.lookup_list groups gid;
-    val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
-    val _ = List.app SimpleThread.interrupt running;
-  in null running end;
-
-fun cancel_all (Queue {jobs, ...}) =
-  let
-    fun cancel_job (group, job) (groups, running) =
-      (cancel_group group Exn.Interrupt;
-        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
-        | _ => (groups, running)));
-    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
-    val _ = List.app SimpleThread.interrupt running;
-  in groups end;
-
 fun finish task (Queue {groups, jobs, cache}) =
   let
-    val Group (gid, _) = get_group jobs task;
-    val groups' = Inttab.remove_list (op =) (gid, task) groups;
+    val group = get_group jobs task;
+    val groups' = groups
+      |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
     val jobs' = Task_Graph.del_node task jobs;
     val cache' =
       if null (Task_Graph.imm_succs jobs task) then cache