--- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100
@@ -22,8 +22,8 @@
val empty: queue
val all_passive: queue -> bool
val status: queue -> {ready: int, pending: int, running: int, passive: int}
- val cancel: queue -> group -> bool
- val cancel_all: queue -> group list
+ val cancel: group -> queue -> bool * queue
+ val cancel_all: queue -> group list * queue
val enqueue_passive: group -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
@@ -81,6 +81,8 @@
not (null (Synchronized.value status)) orelse
(case parent of NONE => false | SOME group => is_canceled group);
+fun is_ready deps group = null deps orelse is_canceled group;
+
fun group_status (Group {parent, status, ...}) =
Synchronized.value status @
(case parent of NONE => [] | SOME group => group_status group);
@@ -137,9 +139,9 @@
fun status (Queue {jobs, ...}) =
let
val (x, y, z, w) =
- Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
+ Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
(case job of
- Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+ Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
| Running _ => (x, y, z + 1, w)
| Passive => (x, y, z, w + 1)))
jobs (0, 0, 0, 0);
@@ -148,16 +150,16 @@
(* cancel -- peers and sub-groups *)
-fun cancel (Queue {groups, jobs, ...}) group =
+fun cancel group (Queue {groups, jobs, ...}) =
let
val _ = cancel_group group Exn.Interrupt;
val tasks = Inttab.lookup_list groups (group_id group);
val running =
fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
val _ = List.app SimpleThread.interrupt running;
- in null running end;
+ in (null running, make_queue groups jobs Unknown) end;
-fun cancel_all (Queue {jobs, ...}) =
+fun cancel_all (Queue {groups, jobs, ...}) =
let
fun cancel_job (group, job) (groups, running) =
(cancel_group group Exn.Interrupt;
@@ -166,7 +168,7 @@
| _ => (groups, running)));
val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
val _ = List.app SimpleThread.interrupt running;
- in running_groups end;
+ in (running_groups, make_queue groups jobs Unknown) end;
(* enqueue *)
@@ -207,7 +209,8 @@
fun dequeue thread (queue as Queue {groups, jobs, cache}) =
let
- fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
+ fun ready (task, ((group, Job list), (deps, _))) =
+ if is_ready deps group then SOME (task, group, rev list) else NONE
| ready _ = NONE;
fun deq boundary =
(case Task_Graph.get_first boundary ready jobs of
@@ -235,7 +238,7 @@
fun ready task =
(case Task_Graph.get_node jobs task of
(group, Job list) =>
- if null (get_deps jobs task)
+ if is_ready (get_deps jobs task) group
then SOME (task, group, rev list)
else NONE
| _ => NONE);