# HG changeset patch # User wenzelm # Date 1220995800 -7200 # Node ID 0f20cbce493592436860cb75028275896e0cc0a3 # Parent 5ed5cb73a2e953d6fe66e7960445b16fba9109f2 simplified dequeue: provide Thread.self internally; simplified cancel: interrupt running threads internally; added depend; added dequeue_towards; misc tuning; diff -r 5ed5cb73a2e9 -r 0f20cbce4935 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Sep 09 20:22:40 2008 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Tue Sep 09 23:30:00 2008 +0200 @@ -15,8 +15,10 @@ type queue val empty: queue val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue - val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue - val cancel: group -> queue -> Thread.thread list * queue + val depend: task list -> task -> queue -> queue + val dequeue: queue -> (task * group * (unit -> bool)) option * queue + val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue + val cancel: group -> queue -> bool * queue val finish: task -> queue -> queue end; @@ -42,9 +44,12 @@ type jobs = (group * job) IntGraph.T; +fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id; fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; +fun add_job (Task id) (Task dep) (jobs: jobs) = + IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; (* queue of grouped jobs *) @@ -57,31 +62,43 @@ val empty = make_queue Inttab.empty IntGraph.empty; -(* queue operations *) +(* enqueue *) fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) = let val id = serial (); val task = Task id; val groups' = Inttab.cons_list (gid, task) groups; - - fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G - handle IntGraph.UNDEF _ => G; - val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps; + val jobs' = jobs + |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps; in (task, make_queue groups' jobs') end; -fun dequeue thread (queue as Queue {groups, jobs}) = +fun depend deps task (Queue {groups, jobs}) = + make_queue groups (fold (add_job task) deps jobs); + + +(* dequeue *) + +fun dequeue_if P (queue as Queue {groups, jobs}) = let - fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok)) + fun ready (id, ((group, Job (ok, job)), ([], _))) = + if P id then SOME (Task id, group, (fn () => job ok)) else NONE | ready _ = NONE; in (case IntGraph.get_first ready jobs of NONE => (NONE, queue) | SOME result => - let val jobs' = map_job (#1 result) (K (Running thread)) jobs + let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs in (SOME result, make_queue groups jobs') end) end; +val dequeue = dequeue_if (K true); + +fun dequeue_towards tasks (queue as Queue {jobs, ...}) = + let val ids = tasks + |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE) + in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end; + (* termination *) @@ -93,7 +110,8 @@ (case get_job jobs task of Job (true, job) => map_job task (K (Job (false, job))) | _ => I)) tasks jobs; - in (running, make_queue groups jobs') end; + val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running; + in (null running, make_queue groups jobs') end; fun finish (task as Task id) (Queue {groups, jobs}) = let