src/Pure/Concurrent/task_queue.ML
changeset 28185 0f20cbce4935
parent 28184 5ed5cb73a2e9
child 28190 0a2434cf38c9
--- a/src/Pure/Concurrent/task_queue.ML	Tue Sep 09 20:22:40 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Sep 09 23:30:00 2008 +0200
@@ -15,8 +15,10 @@
   type queue
   val empty: queue
   val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
-  val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue
-  val cancel: group -> queue -> Thread.thread list * queue
+  val depend: task list -> task -> queue -> queue
+  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+  val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
+  val cancel: group -> queue -> bool * queue
   val finish: task -> queue -> queue
 end;
 
@@ -42,9 +44,12 @@
 
 type jobs = (group * job) IntGraph.T;
 
+fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun add_job (Task id) (Task dep) (jobs: jobs) =
+  IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
 
 
 (* queue of grouped jobs *)
@@ -57,31 +62,43 @@
 val empty = make_queue Inttab.empty IntGraph.empty;
 
 
-(* queue operations *)
+(* enqueue *)
 
 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
   let
     val id = serial ();
     val task = Task id;
     val groups' = Inttab.cons_list (gid, task) groups;
-
-    fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
-      handle IntGraph.UNDEF _ => G;
-    val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps;
+    val jobs' = jobs
+      |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
   in (task, make_queue groups' jobs') end;
 
-fun dequeue thread (queue as Queue {groups, jobs}) =
+fun depend deps task (Queue {groups, jobs}) =
+  make_queue groups (fold (add_job task) deps jobs);
+
+
+(* dequeue *)
+
+fun dequeue_if P (queue as Queue {groups, jobs}) =
   let
-    fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok))
+    fun ready (id, ((group, Job (ok, job)), ([], _))) =
+          if P id then SOME (Task id, group, (fn () => job ok)) else NONE
       | ready _ = NONE;
   in
     (case IntGraph.get_first ready jobs of
       NONE => (NONE, queue)
     | SOME result =>
-        let val jobs' = map_job (#1 result) (K (Running thread)) jobs
+        let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
         in (SOME result, make_queue groups jobs') end)
   end;
 
+val dequeue = dequeue_if (K true);
+
+fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
+  let val ids = tasks
+    |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
+  in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
+
 
 (* termination *)
 
@@ -93,7 +110,8 @@
         (case get_job jobs task of
           Job (true, job) => map_job task (K (Job (false, job)))
         | _ => I)) tasks jobs;
-  in (running, make_queue groups jobs') end;
+    val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
+  in (null running, make_queue groups jobs') end;
 
 fun finish (task as Task id) (Queue {groups, jobs}) =
   let