--- a/src/Pure/Concurrent/task_queue.ML Tue Dec 16 16:25:19 2008 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Tue Dec 16 16:25:19 2008 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/Concurrent/task_queue.ML
- ID: $Id$
Author: Makarius
Ordered queue of grouped tasks.
@@ -8,7 +7,8 @@
signature TASK_QUEUE =
sig
eqtype task
- val new_task: unit -> task
+ val new_task: int -> task
+ val pri_of_task: task -> int
val str_of_task: task -> string
eqtype group
val new_group: unit -> group
@@ -17,9 +17,8 @@
type queue
val empty: queue
val is_empty: queue -> bool
- val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue
+ val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
val depend: task list -> task -> queue -> queue
- val focus: task list -> queue -> queue
val dequeue: queue -> (task * group * (unit -> bool)) option * queue
val dequeue_towards: task list -> queue ->
(((task * group * (unit -> bool)) * task list) option * queue)
@@ -29,20 +28,27 @@
val cancel: queue -> group -> bool
end;
-structure TaskQueue: TASK_QUEUE =
+structure Task_Queue: TASK_QUEUE =
struct
-(* identifiers *)
+(* tasks *)
+
+datatype task = Task of int * serial;
+fun new_task pri = Task (pri, serial ());
-datatype task = Task of serial;
-fun new_task () = Task (serial ());
+fun pri_of_task (Task (pri, _)) = pri;
+fun str_of_task (Task (_, i)) = string_of_int i;
-fun str_of_task (Task i) = string_of_int i;
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+structure Task_Graph = GraphFun(type key = task val ord = task_ord);
+(* groups *)
+
datatype group = Group of serial * bool ref;
fun new_group () = Group (serial (), ref true);
+
fun invalidate_group (Group (_, ok)) = ok := false;
fun str_of_group (Group (i, ref ok)) =
@@ -52,53 +58,46 @@
(* jobs *)
datatype job =
- Job of bool * (bool -> bool) | (*priority, job: status -> status*)
+ Job of bool -> bool |
Running of Thread.thread;
-type jobs = (group * job) IntGraph.T;
+type jobs = (group * job) Task_Graph.T;
-fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
-fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
-fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
+fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
+fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
-fun add_job (Task id) (Task dep) (jobs: jobs) =
- IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
+fun add_job task dep (jobs: jobs) =
+ Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
-fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) =
- IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
-
-fun check_job (jobs: jobs) (task as Task id) =
- if can (IntGraph.get_node jobs) id then SOME task else NONE;
+fun add_job_acyclic task dep (jobs: jobs) =
+ Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
(* queue of grouped jobs *)
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
- jobs: jobs, (*job dependency graph*)
- focus: task list}; (*particular collection of high-priority tasks*)
+ jobs: jobs}; (*job dependency graph*)
-fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-val empty = make_queue Inttab.empty IntGraph.empty [];
-fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs;
+val empty = make_queue Inttab.empty Task_Graph.empty;
+fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
(* enqueue *)
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
let
- val task as Task id = new_task ();
+ val task = new_task pri;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
- |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps;
- in (task, make_queue groups' jobs' focus) end;
+ |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+ in (task, make_queue groups' jobs') end;
-fun depend deps task (Queue {groups, jobs, focus}) =
- make_queue groups (fold (add_job_acyclic task) deps jobs) focus;
-
-fun focus tasks (Queue {groups, jobs, ...}) =
- make_queue groups jobs (map_filter (check_job jobs) tasks);
+fun depend deps task (Queue {groups, jobs}) =
+ make_queue groups (fold (add_job_acyclic task) deps jobs);
(* dequeue *)
@@ -106,38 +105,30 @@
local
fun dequeue_result NONE queue = (NONE, queue)
- | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
- (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
-
-fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
- let
- fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) =
- if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE
- | ready _ = NONE;
- in dequeue_result (IntGraph.get_first ready jobs) queue end;
-
-fun dequeue_local focus (queue as Queue {jobs, ...}) =
- let
- fun ready id =
- (case IntGraph.get_node jobs id of
- (group as Group (_, ref ok), Job (_, job)) =>
- if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
- else NONE
- | _ => NONE);
- val ids = map (fn Task id => id) focus;
- in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+ | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
+ (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
in
-fun dequeue (queue as Queue {focus, ...}) =
- (case dequeue_local focus queue of
- (NONE, _) =>
- (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res)
- | res => res);
+fun dequeue (queue as Queue {jobs, ...}) =
+ let
+ fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
+ SOME (task, group, (fn () => job ok))
+ | ready _ = NONE;
+ in dequeue_result (Task_Graph.get_first ready jobs) queue end;
fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
- let val tasks' = map_filter (check_job jobs) tasks in
- (case dequeue_local tasks' queue of
+ let
+ val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
+
+ fun ready task =
+ (case Task_Graph.get_node jobs task of
+ (group as Group (_, ref ok), Job job) =>
+ if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+ else NONE
+ | _ => NONE);
+ in
+ (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
(NONE, queue') => (NONE, queue')
| (SOME work, queue') => (SOME (work, tasks'), queue'))
end;
@@ -150,8 +141,13 @@
fun interrupt (Queue {jobs, ...}) task =
(case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
-fun interrupt_external queue str =
- (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+fun interrupt_external (queue as Queue {jobs, ...}) str =
+ (case Int.fromString str of
+ SOME i =>
+ (case Task_Graph.get_first
+ (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
+ of SOME task => interrupt queue task | NONE => ())
+ | NONE => ());
(* misc operations *)
@@ -164,12 +160,11 @@
val _ = List.app SimpleThread.interrupt running;
in null running end;
-fun finish (task as Task id) (Queue {groups, jobs, focus}) =
+fun finish task (Queue {groups, jobs}) =
let
val Group (gid, _) = get_group jobs task;
val groups' = Inttab.remove_list (op =) (gid, task) groups;
- val jobs' = IntGraph.del_node id jobs;
- val focus' = remove (op =) task focus;
- in make_queue groups' jobs' focus' end;
+ val jobs' = Task_Graph.del_node task jobs;
+ in make_queue groups' jobs' end;
end;