src/Pure/Concurrent/task_queue.ML
changeset 41709 2e427f340ad1
parent 41708 d2e6b1132df2
child 43792 d5803c3d537a
--- a/src/Pure/Concurrent/task_queue.ML	Fri Feb 04 20:40:25 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Fri Feb 04 21:52:36 2011 +0100
@@ -64,8 +64,8 @@
 fun group_id (Group {id, ...}) = id;
 fun eq_group (group1, group2) = group_id group1 = group_id group2;
 
-fun group_ancestry (Group {parent, id, ...}) =
-  id :: (case parent of NONE => [] | SOME group => group_ancestry group);
+fun group_ancestry f (Group {parent = NONE, id, ...}) a = f id a
+  | group_ancestry f (Group {parent = SOME group, id, ...}) a = group_ancestry f group (f id a);
 
 
 (* group status *)
@@ -131,10 +131,9 @@
 fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
   prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2));
 
-val eq_task = is_equal o task_ord;
-
 end;
 
+structure Tasks = Table(type key = task val ord = task_ord);
 structure Task_Graph = Graph(type key = task val ord = task_ord);
 
 
@@ -156,7 +155,24 @@
 
 (** queue of jobs and groups **)
 
-(* jobs *)
+(* known group members *)
+
+type groups = unit Tasks.table Inttab.table;
+
+fun get_tasks (groups: groups) gid =
+  the_default Tasks.empty (Inttab.lookup groups gid);
+
+fun add_task (gid, task) groups =
+  Inttab.update (gid, Tasks.update (task, ()) (get_tasks groups gid)) groups;
+
+fun del_task (gid, task) groups =
+  let val tasks = Tasks.delete_safe task (get_tasks groups gid) in
+    if Tasks.is_empty tasks then Inttab.delete_safe gid groups
+    else Inttab.update (gid, tasks) groups
+  end;
+
+
+(* job dependency graph *)
 
 datatype job =
   Job of (bool -> bool) list |
@@ -171,18 +187,12 @@
 fun add_job task dep (jobs: jobs) =
   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
-fun get_deps (jobs: jobs) task =
-  Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
-
 
 (* queue *)
 
-datatype queue = Queue of
- {groups: task list Inttab.table,   (*presently known group members*)
-  jobs: jobs};                      (*job dependency graph*)
+datatype queue = Queue of {groups: groups, jobs: jobs};
 
 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-
 val empty = make_queue Inttab.empty Task_Graph.empty;
 
 fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;
@@ -226,9 +236,9 @@
 fun cancel (Queue {groups, jobs}) group =
   let
     val _ = cancel_group group Exn.Interrupt;
-    val tasks = Inttab.lookup_list groups (group_id group);
     val running =
-      fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
+      Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
+        (get_tasks groups (group_id group)) [];
     val _ = List.app Simple_Thread.interrupt running;
   in null running end;
 
@@ -253,8 +263,7 @@
 fun finish task (Queue {groups, jobs}) =
   let
     val group = group_of_task task;
-    val groups' = groups
-      |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
+    val groups' = group_ancestry (fn gid => del_task (gid, task)) group groups;
     val jobs' = Task_Graph.del_node task jobs;
     val maximal = null (Task_Graph.imm_succs jobs task);
   in (maximal, make_queue groups' jobs') end;
@@ -265,16 +274,14 @@
 fun enqueue_passive group abort (Queue {groups, jobs}) =
   let
     val task = new_task group "passive" NONE;
-    val groups' = groups
-      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+    val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups;
     val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
   in (task, make_queue groups' jobs') end;
 
 fun enqueue name group deps pri job (Queue {groups, jobs}) =
   let
     val task = new_task group name (SOME pri);
-    val groups' = groups
-      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+    val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups;
     val jobs' = jobs
       |> Task_Graph.new_node (task, Job [job])
       |> fold (add_job task) deps;
@@ -318,11 +325,11 @@
 
     fun ready_dep _ [] = NONE
       | ready_dep seen (task :: tasks) =
-          if member eq_task seen task then ready_dep seen tasks
+          if Tasks.defined seen task then ready_dep seen tasks
           else
             let val entry as (_, (ds, _)) = Task_Graph.get_entry jobs task in
               (case ready_job task entry of
-                NONE => ready_dep (task :: seen) (ds @ tasks)
+                NONE => ready_dep (Tasks.update (task, ()) seen) (ds @ tasks)
               | some => some)
             end;
 
@@ -333,7 +340,7 @@
     (case ready deps [] of
       (SOME res, deps') => result res deps'
     | (NONE, deps') =>
-        (case ready_dep [] deps' of
+        (case ready_dep Tasks.empty deps' of
           SOME res => result res deps'
         | NONE => ((NONE, deps'), queue)))
   end;