--- a/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:24:02 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:37:31 2009 +0200
@@ -13,8 +13,8 @@
type group
val group_id: group -> int
val eq_group: group * group -> bool
- val new_group: unit -> group
- val group_exns: group -> exn list
+ val new_group: group option -> group
+ val group_status: group -> exn list
val str_of_group: group -> string
type queue
val empty: queue
@@ -27,6 +27,7 @@
(((task * group * (bool -> bool) list) * task list) option * queue)
val interrupt: queue -> task -> unit
val interrupt_external: queue -> string -> unit
+ val is_canceled: group -> bool
val cancel_group: group -> exn -> unit
val cancel: queue -> group -> bool
val cancel_all: queue -> group list
@@ -48,19 +49,37 @@
structure Task_Graph = Graph(type key = task val ord = task_ord);
-(* groups *)
+(* nested groups *)
+
+datatype group = Group of
+ {parent: group option,
+ id: serial,
+ status: exn list ref};
-datatype group = Group of serial * exn list ref;
+fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
-fun group_id (Group (gid, _)) = gid;
-fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
+fun new_group parent = make_group (parent, serial (), ref []);
+
+fun group_id (Group {id, ...}) = id;
+fun eq_group (group1, group2) = group_id group1 = group_id group2;
-fun new_group () = Group (serial (), ref []);
+fun group_ancestry (Group {parent, id, ...}) =
+ id :: (case parent of NONE => [] | SOME group => group_ancestry group);
+
+
+fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
+ (case exn of
+ Exn.Interrupt => if null (! status) then status := [exn] else ()
+ | _ => change status (cons exn)));
-fun group_exns (Group (_, ref exns)) = exns;
+fun group_status (Group {parent, status, ...}) = (*non-critical*)
+ ! status @ (case parent of NONE => [] | SOME group => group_status group);
-fun str_of_group (Group (i, ref exns)) =
- if null exns then string_of_int i else enclose "(" ")" (string_of_int i);
+fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
+ not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
+
+fun str_of_group group =
+ (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
(* jobs *)
@@ -94,7 +113,7 @@
fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
-(* status *)
+(* queue status *)
fun status (Queue {jobs, ...}) =
let
@@ -107,14 +126,38 @@
in {ready = x, pending = y, running = z} end;
+(* cancel -- peers and sub-groups *)
+
+fun cancel (Queue {groups, jobs, ...}) group =
+ let
+ val _ = cancel_group group Exn.Interrupt;
+ val tasks = Inttab.lookup_list groups (group_id group);
+ val running =
+ fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
+ val _ = List.app SimpleThread.interrupt running;
+ in null running end;
+
+fun cancel_all (Queue {jobs, ...}) =
+ let
+ fun cancel_job (group, job) (groups, running) =
+ (cancel_group group Exn.Interrupt;
+ (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
+ | _ => (groups, running)));
+ val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
+ val _ = List.app SimpleThread.interrupt running;
+ in groups end;
+
+
(* enqueue *)
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) =
+fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
let
val task = new_task pri;
- val groups' = Inttab.cons_list (gid, task) groups;
+ val groups' = groups
+ |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
val jobs' = jobs
- |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
+ |> Task_Graph.new_node (task, (group, Job [job]))
+ |> fold (add_job task) deps;
val cache' =
(case cache of
Result last =>
@@ -158,7 +201,8 @@
fun ready task =
(case Task_Graph.get_node jobs task of
(group, Job list) =>
- if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
+ if null (Task_Graph.imm_preds jobs task)
+ then SOME (task, group, rev list)
else NONE
| _ => NONE);
@@ -181,7 +225,9 @@
(* sporadic interrupts *)
fun interrupt (Queue {jobs, ...}) task =
- (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
+ (case try (get_job jobs) task of
+ SOME (Running thread) => SimpleThread.interrupt thread
+ | _ => ());
fun interrupt_external (queue as Queue {jobs, ...}) str =
(case Int.fromString str of
@@ -194,33 +240,11 @@
(* termination *)
-fun cancel_group (Group (_, r)) exn = CRITICAL (fn () =>
- (case exn of
- Exn.Interrupt => if null (! r) then r := [exn] else ()
- | _ => change r (cons exn)));
-
-fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
- let
- val _ = cancel_group group Exn.Interrupt;
- val tasks = Inttab.lookup_list groups gid;
- val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
- val _ = List.app SimpleThread.interrupt running;
- in null running end;
-
-fun cancel_all (Queue {jobs, ...}) =
- let
- fun cancel_job (group, job) (groups, running) =
- (cancel_group group Exn.Interrupt;
- (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
- | _ => (groups, running)));
- val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
- val _ = List.app SimpleThread.interrupt running;
- in groups end;
-
fun finish task (Queue {groups, jobs, cache}) =
let
- val Group (gid, _) = get_group jobs task;
- val groups' = Inttab.remove_list (op =) (gid, task) groups;
+ val group = get_group jobs task;
+ val groups' = groups
+ |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
val jobs' = Task_Graph.del_node task jobs;
val cache' =
if null (Task_Graph.imm_succs jobs task) then cache