(*  Title:      Pure/Concurrent/task_queue.ML
    Author:     Makarius

Ordered queue of grouped tasks.
*)

(*Interface of the ordered queue of grouped tasks.  The queue is a
  purely functional value: operations that change it return a new queue.*)
signature TASK_QUEUE =
sig
  (*tasks: a priority paired with a fresh serial number*)
  eqtype task
  val new_task: int -> task                 (*new task with the given priority*)
  val pri_of_task: task -> int              (*priority given at creation*)
  val str_of_task: task -> string           (*serial number as decimal string*)

  (*groups: a serial number paired with a mutable validity flag*)
  eqtype group
  val new_group: unit -> group              (*fresh group, flag initially true*)
  val invalidate_group: group -> unit       (*set the flag to false*)
  val str_of_group: group -> string         (*serial number, in parentheses once invalidated*)

  type queue
  val empty: queue
  val is_empty: queue -> bool               (*true iff the job graph has no nodes*)

  (*enqueue group deps pri job queue: add a new task of priority pri that
    belongs to group and depends on deps; result is the task and the
    updated queue*)
  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue

  (*depend deps task queue: add further dependencies of task on deps,
    using the acyclic variant of edge insertion*)
  val depend: task list -> task -> queue -> queue

  (*take a job without pending dependencies, if any, and mark it as
    running in the calling thread; the returned thunk runs the job*)
  val dequeue: queue -> (task * group * (unit -> bool)) option * queue

  (*like dequeue, but only considers the given tasks and their
    predecessors; on success also returns those of the given tasks that
    are still present in the queue*)
  val dequeue_towards: task list -> queue ->
    (((task * group * (unit -> bool)) * task list) option * queue)

  (*interrupt the thread running the task; no effect otherwise*)
  val interrupt: queue -> task -> unit

  (*same, with the task identified by the string form of its serial number*)
  val interrupt_external: queue -> string -> unit

  (*remove a task from the queue and from its group*)
  val finish: task -> queue -> queue

  (*invalidate the group and interrupt its running members; result is
    true iff no member was running*)
  val cancel: queue -> group -> bool
end;

(*Implementation of the ordered queue of grouped tasks: a dependency graph
  of jobs keyed by task, plus a table from group id to member tasks.*)
structure Task_Queue: TASK_QUEUE =
struct

(* tasks *)

(*a task is its priority together with a fresh serial number*)
datatype task = Task of int * serial;
fun new_task pri = Task (pri, serial ());

fun pri_of_task (Task (pri, _)) = pri;
fun str_of_task (Task (_, i)) = string_of_int i;

(*lexicographic order: priority first, compared via rev_order, then the
  serial number in plain integer order*)
fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
structure Task_Graph = GraphFun(type key = task val ord = task_ord);


(* groups *)

(*a group is a serial number together with a mutable validity flag*)
datatype group = Group of serial * bool ref;

fun new_group () = Group (serial (), ref true);

fun invalidate_group (Group (_, ok)) = ok := false;

(*invalidated groups are printed in parentheses*)
fun str_of_group (Group (i, ref ok)) =
  if ok then string_of_int i else enclose "(" ")" (string_of_int i);


(* jobs *)

datatype job =
  Job of bool -> bool |         (*pending job; the argument is the group's validity flag*)
  Running of Thread.thread;     (*job taken by the given thread*)

type jobs = (group * job) Task_Graph.T;

fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;

(*add edge dep --> task; a Task_Graph.UNDEF exception leaves the graph
  unchanged (presumably the dependency has already been finished)*)
fun add_job task dep (jobs: jobs) =
  Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;

(*same as add_job, but via the acyclic variant of edge insertion*)
fun add_job_acyclic task dep (jobs: jobs) =
  Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;


(* queue of grouped jobs *)

datatype queue = Queue of
 {groups: task list Inttab.table,   (*groups with presently active members*)
  jobs: jobs};                      (*job dependency graph*)

fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};

val empty = make_queue Inttab.empty Task_Graph.empty;
fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;


(* enqueue *)

(*create a task of priority pri, record it under the group's id and insert
  it into the graph with edges from each of deps*)
fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
  let
    val task = new_task pri;
    val groups' = Inttab.cons_list (gid, task) groups;
    val jobs' = jobs
      |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
  in (task, make_queue groups' jobs') end;

(*add further dependencies to an existing task; group table is unchanged*)
fun depend deps task (Queue {groups, jobs}) =
  make_queue groups (fold (add_job_acyclic task) deps jobs);


(* dequeue *)

local

(*on success, replace the selected job by Running of the calling thread*)
fun dequeue_result NONE queue = (NONE, queue)
  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));

in

fun dequeue (queue as Queue {jobs, ...}) =
  let
    (*ready: a pending Job whose first adjacency list is empty (presumably
      the predecessors, matching the imm_preds test in dequeue_towards);
      the group flag is read here, at selection time, not when the
      returned thunk is run*)
    fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
          SOME (task, group, (fn () => job ok))
      | ready _ = NONE;
  in dequeue_result (Task_Graph.get_first ready jobs) queue end;

fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
  let
    (*only tasks that are still nodes of the graph*)
    val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;

    (*ready: a pending Job without immediate predecessors; the group flag
      is read here, at selection time*)
    fun ready task =
      (case Task_Graph.get_node jobs task of
        (group as Group (_, ref ok), Job job) =>
          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
          else NONE
      | _ => NONE);
  in
    (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
      (NONE, queue') => (NONE, queue')
    | (SOME work, queue') => (SOME (work, tasks'), queue'))
  end;

end;


(* sporadic interrupts *)

(*interrupt the thread of a Running task; unknown or pending tasks are ignored*)
fun interrupt (Queue {jobs, ...}) task =
  (case try (get_job jobs) task of
    SOME (Running thread) => SimpleThread.interrupt thread
  | _ => ());

(*look up the task whose serial number is denoted by str and interrupt it;
  strings that are not integers and unknown numbers are ignored*)
fun interrupt_external (queue as Queue {jobs, ...}) str =
  (case Int.fromString str of
    SOME i =>
      (case Task_Graph.get_first
          (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
        of SOME task => interrupt queue task | NONE => ())
  | NONE => ());


(* misc operations *)

(*invalidate the group and interrupt the threads of its Running members;
  result is true iff there was no running member*)
fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
  let
    val _ = invalidate_group group;
    val tasks = Inttab.lookup_list groups gid;
    val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
    val _ = List.app SimpleThread.interrupt running;
  in null running end;

(*remove the task from its group entry and from the job graph*)
fun finish task (Queue {groups, jobs}) =
  let
    val Group (gid, _) = get_group jobs task;
    val groups' = Inttab.remove_list (op =) (gid, task) groups;
    val jobs' = Task_Graph.del_node task jobs;
  in make_queue groups' jobs' end;

end;