# HG changeset patch # User wenzelm # Date 1229441119 -3600 # Node ID ad73b63ae2c5788545dd35b2f1ffe0c72336cf4d # Parent 8a904ff43f2894a8e0ff58d90affdd034ba4365b renamed structure TaskQueue to Task_Queue; tasks are ordered according to priority, which has been generalized from bool to int; removed unused focus; tuned dequeue: single pass due to proper priority order; tuned dequeue_towards; diff -r 8a904ff43f28 -r ad73b63ae2c5 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Dec 16 16:25:19 2008 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Tue Dec 16 16:25:19 2008 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/Concurrent/task_queue.ML - ID: $Id$ Author: Makarius Ordered queue of grouped tasks. @@ -8,7 +7,8 @@ signature TASK_QUEUE = sig eqtype task - val new_task: unit -> task + val new_task: int -> task + val pri_of_task: task -> int val str_of_task: task -> string eqtype group val new_group: unit -> group @@ -17,9 +17,8 @@ type queue val empty: queue val is_empty: queue -> bool - val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue + val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue val depend: task list -> task -> queue -> queue - val focus: task list -> queue -> queue val dequeue: queue -> (task * group * (unit -> bool)) option * queue val dequeue_towards: task list -> queue -> (((task * group * (unit -> bool)) * task list) option * queue) @@ -29,20 +28,27 @@ val cancel: queue -> group -> bool end; -structure TaskQueue: TASK_QUEUE = +structure Task_Queue: TASK_QUEUE = struct -(* identifiers *) +(* tasks *) + +datatype task = Task of int * serial; +fun new_task pri = Task (pri, serial ()); -datatype task = Task of serial; -fun new_task () = Task (serial ()); +fun pri_of_task (Task (pri, _)) = pri; +fun str_of_task (Task (_, i)) = string_of_int i; -fun str_of_task (Task i) = string_of_int i; +fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2); +structure Task_Graph = GraphFun(type key = task val ord = task_ord); +(* groups *) + datatype group = Group of serial * bool ref; fun new_group () = Group (serial (), ref true); + fun invalidate_group (Group (_, ok)) = ok := false; fun str_of_group (Group (i, ref ok)) = @@ -52,53 +58,46 @@ (* jobs *) datatype job = - Job of bool * (bool -> bool) | (*priority, job: status -> status*) + Job of bool -> bool | Running of Thread.thread; -type jobs = (group * job) IntGraph.T; +type jobs = (group * job) Task_Graph.T; -fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); -fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); -fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; +fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); +fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); +fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs; -fun add_job (Task id) (Task dep) (jobs: jobs) = - IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs; +fun add_job task dep (jobs: jobs) = + Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; -fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) = - IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; - -fun check_job (jobs: jobs) (task as Task id) = - if can (IntGraph.get_node jobs) id then SOME task else NONE; +fun add_job_acyclic task dep (jobs: jobs) = + Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; (* queue of grouped jobs *) datatype queue = Queue of {groups: task list Inttab.table, (*groups with presently active members*) - jobs: jobs, (*job dependency graph*) - focus: task list}; (*particular collection of high-priority tasks*) + jobs: jobs}; (*job dependency graph*) -fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus}; +fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; -val empty = make_queue Inttab.empty IntGraph.empty []; -fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs; +val empty = make_queue Inttab.empty Task_Graph.empty; +fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; (* enqueue *) -fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) = +fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) = let - val task as Task id = new_task (); + val task = new_task pri; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs - |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps; - in (task, make_queue groups' jobs' focus) end; + |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps; + in (task, make_queue groups' jobs') end; -fun depend deps task (Queue {groups, jobs, focus}) = - make_queue groups (fold (add_job_acyclic task) deps jobs) focus; - -fun focus tasks (Queue {groups, jobs, ...}) = - make_queue groups jobs (map_filter (check_job jobs) tasks); +fun depend deps task (Queue {groups, jobs}) = + make_queue groups (fold (add_job_acyclic task) deps jobs); (* dequeue *) @@ -106,38 +105,30 @@ local fun dequeue_result NONE queue = (NONE, queue) - | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) = - (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus); - -fun dequeue_global req_pri (queue as Queue {jobs, ...}) = - let - fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) = - if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE - | ready _ = NONE; - in dequeue_result (IntGraph.get_first ready jobs) queue end; - -fun dequeue_local focus (queue as Queue {jobs, ...}) = - let - fun ready id = - (case IntGraph.get_node jobs id of - (group as Group (_, ref ok), Job (_, job)) => - if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok)) - else NONE - | _ => NONE); - val ids = map (fn Task id => id) focus; - in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end; + | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = + (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs)); in -fun dequeue (queue as Queue {focus, ...}) = - (case dequeue_local focus queue of - (NONE, _) => - (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res) - | res => res); +fun dequeue (queue as Queue {jobs, ...}) = + let + fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) = + SOME (task, group, (fn () => job ok)) + | ready _ = NONE; + in dequeue_result (Task_Graph.get_first ready jobs) queue end; fun dequeue_towards tasks (queue as Queue {jobs, ...}) = - let val tasks' = map_filter (check_job jobs) tasks in - (case dequeue_local tasks' queue of + let + val tasks' = filter (can (Task_Graph.get_node jobs)) tasks; + + fun ready task = + (case Task_Graph.get_node jobs task of + (group as Group (_, ref ok), Job job) => + if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok)) + else NONE + | _ => NONE); + in + (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of (NONE, queue') => (NONE, queue') | (SOME work, queue') => (SOME (work, tasks'), queue')) end; @@ -150,8 +141,13 @@ fun interrupt (Queue {jobs, ...}) task = (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); -fun interrupt_external queue str = - (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ()); +fun interrupt_external (queue as Queue {jobs, ...}) str = + (case Int.fromString str of + SOME i => + (case Task_Graph.get_first + (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs + of SOME task => interrupt queue task | NONE => ()) + | NONE => ()); (* misc operations *) @@ -164,12 +160,11 @@ val _ = List.app SimpleThread.interrupt running; in null running end; -fun finish (task as Task id) (Queue {groups, jobs, focus}) = +fun finish task (Queue {groups, jobs}) = let val Group (gid, _) = get_group jobs task; val groups' = Inttab.remove_list (op =) (gid, task) groups; - val jobs' = IntGraph.del_node id jobs; - val focus' = remove (op =) task focus; - in make_queue groups' jobs' focus' end; + val jobs' = Task_Graph.del_node task jobs; + in make_queue groups' jobs' end; end;