src/Pure/Concurrent/task_queue.ML
changeset 34279 02936e77a07c
parent 34277 7325a5e3587f
child 34280 16bf3e9786a3
--- a/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 13:14:28 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 15:07:56 2010 +0100
@@ -22,8 +22,8 @@
   val empty: queue
   val all_passive: queue -> bool
   val status: queue -> {ready: int, pending: int, running: int, passive: int}
-  val cancel: queue -> group -> bool
-  val cancel_all: queue -> group list
+  val cancel: group -> queue -> bool * queue
+  val cancel_all: queue -> group list * queue
   val enqueue_passive: group -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
@@ -81,6 +81,8 @@
   not (null (Synchronized.value status)) orelse
     (case parent of NONE => false | SOME group => is_canceled group);
 
+fun is_ready deps group = null deps orelse is_canceled group;
+
 fun group_status (Group {parent, status, ...}) =
   Synchronized.value status @
     (case parent of NONE => [] | SOME group => group_status group);
@@ -137,9 +139,9 @@
 fun status (Queue {jobs, ...}) =
   let
     val (x, y, z, w) =
-      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
+      Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
           (case job of
-            Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+            Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
           | Running _ => (x, y, z + 1, w)
           | Passive => (x, y, z, w + 1)))
         jobs (0, 0, 0, 0);
@@ -148,16 +150,16 @@
 
 (* cancel -- peers and sub-groups *)
 
-fun cancel (Queue {groups, jobs, ...}) group =
+fun cancel group (Queue {groups, jobs, ...}) =
   let
     val _ = cancel_group group Exn.Interrupt;
     val tasks = Inttab.lookup_list groups (group_id group);
     val running =
       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
     val _ = List.app SimpleThread.interrupt running;
-  in null running end;
+  in (null running, make_queue groups jobs Unknown) end;
 
-fun cancel_all (Queue {jobs, ...}) =
+fun cancel_all (Queue {groups, jobs, ...}) =
   let
     fun cancel_job (group, job) (groups, running) =
       (cancel_group group Exn.Interrupt;
@@ -166,7 +168,7 @@
         | _ => (groups, running)));
     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
     val _ = List.app SimpleThread.interrupt running;
-  in running_groups end;
+  in (running_groups, make_queue groups jobs Unknown) end;
 
 
 (* enqueue *)
@@ -207,7 +209,8 @@
 
 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   let
-    fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
+    fun ready (task, ((group, Job list), (deps, _))) =
+          if is_ready deps group then SOME (task, group, rev list) else NONE
       | ready _ = NONE;
     fun deq boundary =
       (case Task_Graph.get_first boundary ready jobs of
@@ -235,7 +238,7 @@
     fun ready task =
       (case Task_Graph.get_node jobs task of
         (group, Job list) =>
-          if null (get_deps jobs task)
+          if is_ready (get_deps jobs task) group
           then SOME (task, group, rev list)
           else NONE
       | _ => NONE);