simplified dequeue: provide Thread.self internally;
simplified cancel: interrupt running threads internally;
added depend;
added dequeue_towards;
misc tuning;
--- a/src/Pure/Concurrent/task_queue.ML Tue Sep 09 20:22:40 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Tue Sep 09 23:30:00 2008 +0200
@@ -15,8 +15,10 @@
type queue
val empty: queue
val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
- val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue
- val cancel: group -> queue -> Thread.thread list * queue
+ val depend: task list -> task -> queue -> queue
+ val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+ val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
+ val cancel: group -> queue -> bool * queue
val finish: task -> queue -> queue
end;
@@ -42,9 +44,12 @@
type jobs = (group * job) IntGraph.T;
+fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun add_job (Task id) (Task dep) (jobs: jobs) =
+ IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
(* queue of grouped jobs *)
@@ -57,31 +62,43 @@
val empty = make_queue Inttab.empty IntGraph.empty;
-(* queue operations *)
+(* enqueue *)
fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
let
val id = serial ();
val task = Task id;
val groups' = Inttab.cons_list (gid, task) groups;
-
- fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
- handle IntGraph.UNDEF _ => G;
- val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps;
+ val jobs' = jobs
+ |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
in (task, make_queue groups' jobs') end;
-fun dequeue thread (queue as Queue {groups, jobs}) =
+fun depend deps task (Queue {groups, jobs}) =
+ make_queue groups (fold (add_job task) deps jobs);
+
+
+(* dequeue *)
+
+fun dequeue_if P (queue as Queue {groups, jobs}) =
let
- fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok))
+ fun ready (id, ((group, Job (ok, job)), ([], _))) =
+ if P id then SOME (Task id, group, (fn () => job ok)) else NONE
| ready _ = NONE;
in
(case IntGraph.get_first ready jobs of
NONE => (NONE, queue)
| SOME result =>
- let val jobs' = map_job (#1 result) (K (Running thread)) jobs
+ let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
in (SOME result, make_queue groups jobs') end)
end;
+val dequeue = dequeue_if (K true);
+
+fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
+ let val ids = tasks
+ |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
+ in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
+
(* termination *)
@@ -93,7 +110,8 @@
(case get_job jobs task of
Job (true, job) => map_job task (K (Job (false, job)))
| _ => I)) tasks jobs;
- in (running, make_queue groups jobs') end;
+ val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
+ in (null running, make_queue groups jobs') end;
fun finish (task as Task id) (Queue {groups, jobs}) =
let