--- a/src/Pure/Concurrent/task_queue.ML Fri Feb 04 20:40:25 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Fri Feb 04 21:52:36 2011 +0100
@@ -64,8 +64,8 @@
fun group_id (Group {id, ...}) = id;
fun eq_group (group1, group2) = group_id group1 = group_id group2;
-fun group_ancestry (Group {parent, id, ...}) =
- id :: (case parent of NONE => [] | SOME group => group_ancestry group);
+fun group_ancestry f (Group {parent = NONE, id, ...}) a = f id a
+ | group_ancestry f (Group {parent = SOME group, id, ...}) a = group_ancestry f group (f id a);
(* group status *)
@@ -131,10 +131,9 @@
fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2));
-val eq_task = is_equal o task_ord;
-
end;
+structure Tasks = Table(type key = task val ord = task_ord);
structure Task_Graph = Graph(type key = task val ord = task_ord);
@@ -156,7 +155,24 @@
(** queue of jobs and groups **)
-(* jobs *)
+(* known group members *)
+
+type groups = unit Tasks.table Inttab.table;
+
+fun get_tasks (groups: groups) gid =
+ the_default Tasks.empty (Inttab.lookup groups gid);
+
+fun add_task (gid, task) groups =
+ Inttab.update (gid, Tasks.update (task, ()) (get_tasks groups gid)) groups;
+
+fun del_task (gid, task) groups =
+ let val tasks = Tasks.delete_safe task (get_tasks groups gid) in
+ if Tasks.is_empty tasks then Inttab.delete_safe gid groups
+ else Inttab.update (gid, tasks) groups
+ end;
+
+
+(* job dependency graph *)
datatype job =
Job of (bool -> bool) list |
@@ -171,18 +187,12 @@
fun add_job task dep (jobs: jobs) =
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
-fun get_deps (jobs: jobs) task =
- Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
-
(* queue *)
-datatype queue = Queue of
- {groups: task list Inttab.table, (*presently known group members*)
- jobs: jobs}; (*job dependency graph*)
+datatype queue = Queue of {groups: groups, jobs: jobs};
fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-
val empty = make_queue Inttab.empty Task_Graph.empty;
fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
@@ -226,9 +236,9 @@
fun cancel (Queue {groups, jobs}) group =
let
val _ = cancel_group group Exn.Interrupt;
- val tasks = Inttab.lookup_list groups (group_id group);
val running =
- fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
+ Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
+ (get_tasks groups (group_id group)) [];
val _ = List.app Simple_Thread.interrupt running;
in null running end;
@@ -253,8 +263,7 @@
fun finish task (Queue {groups, jobs}) =
let
val group = group_of_task task;
- val groups' = groups
- |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
+ val groups' = group_ancestry (fn gid => del_task (gid, task)) group groups;
val jobs' = Task_Graph.del_node task jobs;
val maximal = null (Task_Graph.imm_succs jobs task);
in (maximal, make_queue groups' jobs') end;
@@ -265,16 +274,14 @@
fun enqueue_passive group abort (Queue {groups, jobs}) =
let
val task = new_task group "passive" NONE;
- val groups' = groups
- |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+ val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups;
val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
in (task, make_queue groups' jobs') end;
fun enqueue name group deps pri job (Queue {groups, jobs}) =
let
val task = new_task group name (SOME pri);
- val groups' = groups
- |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+ val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups;
val jobs' = jobs
|> Task_Graph.new_node (task, Job [job])
|> fold (add_job task) deps;
@@ -318,11 +325,11 @@
fun ready_dep _ [] = NONE
| ready_dep seen (task :: tasks) =
- if member eq_task seen task then ready_dep seen tasks
+ if Tasks.defined seen task then ready_dep seen tasks
else
let val entry as (_, (ds, _)) = Task_Graph.get_entry jobs task in
(case ready_job task entry of
- NONE => ready_dep (task :: seen) (ds @ tasks)
+ NONE => ready_dep (Tasks.update (task, ()) seen) (ds @ tasks)
| some => some)
end;
@@ -333,7 +340,7 @@
(case ready deps [] of
(SOME res, deps') => result res deps'
| (NONE, deps') =>
- (case ready_dep [] deps' of
+ (case ready_dep Tasks.empty deps' of
SOME res => result res deps'
| NONE => ((NONE, deps'), queue)))
end;