11 val pri_of_task: task -> int |
11 val pri_of_task: task -> int |
12 val str_of_task: task -> string |
12 val str_of_task: task -> string |
13 type group |
13 type group |
14 val group_id: group -> int |
14 val group_id: group -> int |
15 val eq_group: group * group -> bool |
15 val eq_group: group * group -> bool |
16 val new_group: unit -> group |
16 val new_group: group option -> group |
17 val group_exns: group -> exn list |
17 val group_status: group -> exn list |
18 val str_of_group: group -> string |
18 val str_of_group: group -> string |
19 type queue |
19 type queue |
20 val empty: queue |
20 val empty: queue |
21 val is_empty: queue -> bool |
21 val is_empty: queue -> bool |
22 val status: queue -> {ready: int, pending: int, running: int} |
22 val status: queue -> {ready: int, pending: int, running: int} |
25 val dequeue: queue -> (task * group * (bool -> bool) list) option * queue |
25 val dequeue: queue -> (task * group * (bool -> bool) list) option * queue |
26 val dequeue_towards: task list -> queue -> |
26 val dequeue_towards: task list -> queue -> |
27 (((task * group * (bool -> bool) list) * task list) option * queue) |
27 (((task * group * (bool -> bool) list) * task list) option * queue) |
28 val interrupt: queue -> task -> unit |
28 val interrupt: queue -> task -> unit |
29 val interrupt_external: queue -> string -> unit |
29 val interrupt_external: queue -> string -> unit |
|
30 val is_canceled: group -> bool |
30 val cancel_group: group -> exn -> unit |
31 val cancel_group: group -> exn -> unit |
31 val cancel: queue -> group -> bool |
32 val cancel: queue -> group -> bool |
32 val cancel_all: queue -> group list |
33 val cancel_all: queue -> group list |
33 val finish: task -> queue -> queue |
34 val finish: task -> queue -> queue |
34 end; |
35 end; |
46 |
47 |
47 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2); |
48 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2); |
48 structure Task_Graph = Graph(type key = task val ord = task_ord); |
49 structure Task_Graph = Graph(type key = task val ord = task_ord); |
49 |
50 |
50 |
51 |
51 (* groups *) |
52 (* nested groups *) |
52 |
53 |
53 datatype group = Group of serial * exn list ref; |
54 datatype group = Group of |
54 |
55 {parent: group option, |
55 fun group_id (Group (gid, _)) = gid; |
56 id: serial, |
56 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2; |
57 status: exn list ref}; |
57 |
58 |
58 fun new_group () = Group (serial (), ref []); |
59 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status}; |
59 |
60 |
60 fun group_exns (Group (_, ref exns)) = exns; |
61 fun new_group parent = make_group (parent, serial (), ref []); |
61 |
62 |
62 fun str_of_group (Group (i, ref exns)) = |
63 fun group_id (Group {id, ...}) = id; |
63 if null exns then string_of_int i else enclose "(" ")" (string_of_int i); |
64 fun eq_group (group1, group2) = group_id group1 = group_id group2; |
|
65 |
|
66 fun group_ancestry (Group {parent, id, ...}) = |
|
67 id :: (case parent of NONE => [] | SOME group => group_ancestry group); |
|
68 |
|
69 |
|
70 fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () => |
|
71 (case exn of |
|
72 Exn.Interrupt => if null (! status) then status := [exn] else () |
|
73 | _ => change status (cons exn))); |
|
74 |
|
75 fun group_status (Group {parent, status, ...}) = (*non-critical*) |
|
76 ! status @ (case parent of NONE => [] | SOME group => group_status group); |
|
77 |
|
78 fun is_canceled (Group {parent, status, ...}) = (*non-critical*) |
|
79 not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group); |
|
80 |
|
81 fun str_of_group group = |
|
82 (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); |
64 |
83 |
65 |
84 |
66 (* jobs *) |
85 (* jobs *) |
67 |
86 |
68 datatype job = |
87 datatype job = |
92 |
111 |
93 val empty = make_queue Inttab.empty Task_Graph.empty No_Result; |
112 val empty = make_queue Inttab.empty Task_Graph.empty No_Result; |
94 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; |
113 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; |
95 |
114 |
96 |
115 |
97 (* status *) |
116 (* queue status *) |
98 |
117 |
99 fun status (Queue {jobs, ...}) = |
118 fun status (Queue {jobs, ...}) = |
100 let |
119 let |
101 val (x, y, z) = |
120 val (x, y, z) = |
102 Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) => |
121 Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) => |
105 | Running _ => (x, y, z + 1))) |
124 | Running _ => (x, y, z + 1))) |
106 jobs (0, 0, 0); |
125 jobs (0, 0, 0); |
107 in {ready = x, pending = y, running = z} end; |
126 in {ready = x, pending = y, running = z} end; |
108 |
127 |
109 |
128 |
|
129 (* cancel -- peers and sub-groups *) |
|
130 |
|
131 fun cancel (Queue {groups, jobs, ...}) group = |
|
132 let |
|
133 val _ = cancel_group group Exn.Interrupt; |
|
134 val tasks = Inttab.lookup_list groups (group_id group); |
|
135 val running = |
|
136 fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; |
|
137 val _ = List.app SimpleThread.interrupt running; |
|
138 in null running end; |
|
139 |
|
140 fun cancel_all (Queue {jobs, ...}) = |
|
141 let |
|
142 fun cancel_job (group, job) (groups, running) = |
|
143 (cancel_group group Exn.Interrupt; |
|
144 (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) |
|
145 | _ => (groups, running))); |
|
146 val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); |
|
147 val _ = List.app SimpleThread.interrupt running; |
|
148 in groups end; |
|
149 |
|
150 |
110 (* enqueue *) |
151 (* enqueue *) |
111 |
152 |
112 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) = |
153 fun enqueue group deps pri job (Queue {groups, jobs, cache}) = |
113 let |
154 let |
114 val task = new_task pri; |
155 val task = new_task pri; |
115 val groups' = Inttab.cons_list (gid, task) groups; |
156 val groups' = groups |
|
157 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); |
116 val jobs' = jobs |
158 val jobs' = jobs |
117 |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; |
159 |> Task_Graph.new_node (task, (group, Job [job])) |
|
160 |> fold (add_job task) deps; |
118 val cache' = |
161 val cache' = |
119 (case cache of |
162 (case cache of |
120 Result last => |
163 Result last => |
121 if task_ord (last, task) = LESS |
164 if task_ord (last, task) = LESS |
122 then cache else Unknown |
165 then cache else Unknown |
156 fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) = |
199 fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) = |
157 let |
200 let |
158 fun ready task = |
201 fun ready task = |
159 (case Task_Graph.get_node jobs task of |
202 (case Task_Graph.get_node jobs task of |
160 (group, Job list) => |
203 (group, Job list) => |
161 if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) |
204 if null (Task_Graph.imm_preds jobs task) |
|
205 then SOME (task, group, rev list) |
162 else NONE |
206 else NONE |
163 | _ => NONE); |
207 | _ => NONE); |
164 |
208 |
165 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
209 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
166 fun result (res as (task, _, _)) = |
210 fun result (res as (task, _, _)) = |
179 |
223 |
180 |
224 |
181 (* sporadic interrupts *) |
225 (* sporadic interrupts *) |
182 |
226 |
183 fun interrupt (Queue {jobs, ...}) task = |
227 fun interrupt (Queue {jobs, ...}) task = |
184 (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); |
228 (case try (get_job jobs) task of |
|
229 SOME (Running thread) => SimpleThread.interrupt thread |
|
230 | _ => ()); |
185 |
231 |
186 fun interrupt_external (queue as Queue {jobs, ...}) str = |
232 fun interrupt_external (queue as Queue {jobs, ...}) str = |
187 (case Int.fromString str of |
233 (case Int.fromString str of |
188 SOME i => |
234 SOME i => |
189 (case Task_Graph.get_first NONE |
235 (case Task_Graph.get_first NONE |
192 | NONE => ()); |
238 | NONE => ()); |
193 |
239 |
194 |
240 |
195 (* termination *) |
241 (* termination *) |
196 |
242 |
197 fun cancel_group (Group (_, r)) exn = CRITICAL (fn () => |
|
198 (case exn of |
|
199 Exn.Interrupt => if null (! r) then r := [exn] else () |
|
200 | _ => change r (cons exn))); |
|
201 |
|
202 fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) = |
|
203 let |
|
204 val _ = cancel_group group Exn.Interrupt; |
|
205 val tasks = Inttab.lookup_list groups gid; |
|
206 val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; |
|
207 val _ = List.app SimpleThread.interrupt running; |
|
208 in null running end; |
|
209 |
|
210 fun cancel_all (Queue {jobs, ...}) = |
|
211 let |
|
212 fun cancel_job (group, job) (groups, running) = |
|
213 (cancel_group group Exn.Interrupt; |
|
214 (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) |
|
215 | _ => (groups, running))); |
|
216 val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); |
|
217 val _ = List.app SimpleThread.interrupt running; |
|
218 in groups end; |
|
219 |
|
220 fun finish task (Queue {groups, jobs, cache}) = |
243 fun finish task (Queue {groups, jobs, cache}) = |
221 let |
244 let |
222 val Group (gid, _) = get_group jobs task; |
245 val group = get_group jobs task; |
223 val groups' = Inttab.remove_list (op =) (gid, task) groups; |
246 val groups' = groups |
|
247 |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group); |
224 val jobs' = Task_Graph.del_node task jobs; |
248 val jobs' = Task_Graph.del_node task jobs; |
225 val cache' = |
249 val cache' = |
226 if null (Task_Graph.imm_succs jobs task) then cache |
250 if null (Task_Graph.imm_succs jobs task) then cache |
227 else Unknown; |
251 else Unknown; |
228 in make_queue groups' jobs' cache' end; |
252 in make_queue groups' jobs' cache' end; |