# HG changeset patch # User wenzelm # Date 1296852756 -3600 # Node ID 2e427f340ad1a759c71f54f4c52255ba1de7f2b8 # Parent d2e6b1132df2d64dde5382f3e97c80ab3c1330f7 more scalable collections of tasks, notably for totality of known group members; tuned; diff -r d2e6b1132df2 -r 2e427f340ad1 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Fri Feb 04 20:40:25 2011 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Fri Feb 04 21:52:36 2011 +0100 @@ -64,8 +64,8 @@ fun group_id (Group {id, ...}) = id; fun eq_group (group1, group2) = group_id group1 = group_id group2; -fun group_ancestry (Group {parent, id, ...}) = - id :: (case parent of NONE => [] | SOME group => group_ancestry group); +fun group_ancestry f (Group {parent = NONE, id, ...}) a = f id a + | group_ancestry f (Group {parent = SOME group, id, ...}) a = group_ancestry f group (f id a); (* group status *) @@ -131,10 +131,9 @@ fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) = prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2)); -val eq_task = is_equal o task_ord; - end; +structure Tasks = Table(type key = task val ord = task_ord); structure Task_Graph = Graph(type key = task val ord = task_ord); @@ -156,7 +155,24 @@ (** queue of jobs and groups **) -(* jobs *) +(* known group members *) + +type groups = unit Tasks.table Inttab.table; + +fun get_tasks (groups: groups) gid = + the_default Tasks.empty (Inttab.lookup groups gid); + +fun add_task (gid, task) groups = + Inttab.update (gid, Tasks.update (task, ()) (get_tasks groups gid)) groups; + +fun del_task (gid, task) groups = + let val tasks = Tasks.delete_safe task (get_tasks groups gid) in + if Tasks.is_empty tasks then Inttab.delete_safe gid groups + else Inttab.update (gid, tasks) groups + end; + + +(* job dependency graph *) datatype job = Job of (bool -> bool) list | @@ -171,18 +187,12 @@ fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; -fun get_deps (jobs: jobs) task = - Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => []; - (* queue *) -datatype queue = Queue of - {groups: task list Inttab.table, (*presently known group members*) - jobs: jobs}; (*job dependency graph*) +datatype queue = Queue of {groups: groups, jobs: jobs}; fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; - val empty = make_queue Inttab.empty Task_Graph.empty; fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task; @@ -226,9 +236,9 @@ fun cancel (Queue {groups, jobs}) group = let val _ = cancel_group group Exn.Interrupt; - val tasks = Inttab.lookup_list groups (group_id group); val running = - fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; + Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) + (get_tasks groups (group_id group)) []; val _ = List.app Simple_Thread.interrupt running; in null running end; @@ -253,8 +263,7 @@ fun finish task (Queue {groups, jobs}) = let val group = group_of_task task; - val groups' = groups - |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group); + val groups' = group_ancestry (fn gid => del_task (gid, task)) group groups; val jobs' = Task_Graph.del_node task jobs; val maximal = null (Task_Graph.imm_succs jobs task); in (maximal, make_queue groups' jobs') end; @@ -265,16 +274,14 @@ fun enqueue_passive group abort (Queue {groups, jobs}) = let val task = new_task group "passive" NONE; - val groups' = groups - |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); + val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups; val jobs' = jobs |> Task_Graph.new_node (task, Passive abort); in (task, make_queue groups' jobs') end; fun enqueue name group deps pri job (Queue {groups, jobs}) = let val task = new_task group name (SOME pri); - val groups' = groups - |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); + val groups' = group_ancestry (fn gid => add_task (gid, task)) group groups; val jobs' = jobs |> Task_Graph.new_node (task, Job [job]) |> fold (add_job task) deps; @@ -318,11 +325,11 @@ fun ready_dep _ [] = NONE | ready_dep seen (task :: tasks) = - if member eq_task seen task then ready_dep seen tasks + if Tasks.defined seen task then ready_dep seen tasks else let val entry as (_, (ds, _)) = Task_Graph.get_entry jobs task in (case ready_job task entry of - NONE => ready_dep (task :: seen) (ds @ tasks) + NONE => ready_dep (Tasks.update (task, ()) seen) (ds @ tasks) | some => some) end; @@ -333,7 +340,7 @@ (case ready deps [] of (SOME res, deps') => result res deps' | (NONE, deps') => - (case ready_dep [] deps' of + (case ready_dep Tasks.empty deps' of SOME res => result res deps' | NONE => ((NONE, deps'), queue))) end;