4 Ordered queue of grouped tasks. |
4 Ordered queue of grouped tasks. |
5 *) |
5 *) |
6 |
6 |
7 signature TASK_QUEUE = |
7 signature TASK_QUEUE = |
8 sig |
8 sig |
9 type task |
|
10 val dummy_task: task |
|
11 val name_of_task: task -> string |
|
12 val pri_of_task: task -> int |
|
13 val str_of_task: task -> string |
|
14 val timing_of_task: task -> Time.time * Time.time * string list |
|
15 type group |
9 type group |
16 val new_group: group option -> group |
10 val new_group: group option -> group |
17 val group_id: group -> int |
11 val group_id: group -> int |
18 val eq_group: group * group -> bool |
12 val eq_group: group * group -> bool |
19 val cancel_group: group -> exn -> unit |
13 val cancel_group: group -> exn -> unit |
20 val is_canceled: group -> bool |
14 val is_canceled: group -> bool |
21 val group_status: group -> exn list |
15 val group_status: group -> exn list |
22 val str_of_group: group -> string |
16 val str_of_group: group -> string |
|
17 type task |
|
18 val dummy_task: unit -> task |
|
19 val group_of_task: task -> group |
|
20 val name_of_task: task -> string |
|
21 val pri_of_task: task -> int |
|
22 val str_of_task: task -> string |
|
23 val timing_of_task: task -> Time.time * Time.time * string list |
23 type queue |
24 type queue |
24 val empty: queue |
25 val empty: queue |
25 val all_passive: queue -> bool |
26 val all_passive: queue -> bool |
26 val status: queue -> {ready: int, pending: int, running: int, passive: int} |
27 val status: queue -> {ready: int, pending: int, running: int, passive: int} |
27 val cancel: queue -> group -> bool |
28 val cancel: queue -> group -> bool |
30 val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue |
31 val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue |
31 val enqueue: string -> group -> task list -> int -> (bool -> bool) -> |
32 val enqueue: string -> group -> task list -> int -> (bool -> bool) -> |
32 queue -> (task * bool) * queue |
33 queue -> (task * bool) * queue |
33 val extend: task -> (bool -> bool) -> queue -> queue option |
34 val extend: task -> (bool -> bool) -> queue -> queue option |
34 val dequeue_passive: Thread.thread -> task -> queue -> bool * queue |
35 val dequeue_passive: Thread.thread -> task -> queue -> bool * queue |
35 val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue |
36 val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue |
36 type deps |
37 type deps |
37 val init_deps: task list -> deps |
38 val init_deps: task list -> deps |
38 val finished_deps: deps -> bool |
39 val finished_deps: deps -> bool |
39 val dequeue_deps: Thread.thread -> deps -> queue -> |
40 val dequeue_deps: Thread.thread -> deps -> queue -> |
40 (((task * group * (bool -> bool) list) option * deps) * queue) |
41 (((task * (bool -> bool) list) option * deps) * queue) |
41 val running: task -> (unit -> 'a) -> 'a |
42 val running: task -> (unit -> 'a) -> 'a |
42 val joining: task -> (unit -> 'a) -> 'a |
43 val joining: task -> (unit -> 'a) -> 'a |
43 val waiting: task -> deps -> (unit -> 'a) -> 'a |
44 val waiting: task -> deps -> (unit -> 'a) -> 'a |
44 end; |
45 end; |
45 |
46 |
47 struct |
48 struct |
48 |
49 |
49 val new_id = Synchronized.counter (); |
50 val new_id = Synchronized.counter (); |
50 |
51 |
51 |
52 |
52 (** grouped tasks **) |
53 (** nested groups of tasks **) |
|
54 |
|
55 (* groups *) |
|
56 |
|
57 abstype group = Group of |
|
58 {parent: group option, |
|
59 id: int, |
|
60 status: exn list Synchronized.var} |
|
61 with |
|
62 |
|
63 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status}; |
|
64 |
|
65 fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []); |
|
66 |
|
67 fun group_id (Group {id, ...}) = id; |
|
68 fun eq_group (group1, group2) = group_id group1 = group_id group2; |
|
69 |
|
70 fun group_ancestry (Group {parent, id, ...}) = |
|
71 id :: (case parent of NONE => [] | SOME group => group_ancestry group); |
|
72 |
|
73 |
|
74 (* group status *) |
|
75 |
|
76 fun cancel_group (Group {status, ...}) exn = |
|
77 Synchronized.change status |
|
78 (fn exns => |
|
79 if not (Exn.is_interrupt exn) orelse null exns then exn :: exns |
|
80 else exns); |
|
81 |
|
82 fun is_canceled (Group {parent, status, ...}) = |
|
83 not (null (Synchronized.value status)) orelse |
|
84 (case parent of NONE => false | SOME group => is_canceled group); |
|
85 |
|
86 fun group_status (Group {parent, status, ...}) = |
|
87 Synchronized.value status @ |
|
88 (case parent of NONE => [] | SOME group => group_status group); |
|
89 |
|
90 fun str_of_group group = |
|
91 (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); |
|
92 |
|
93 end; |
|
94 |
53 |
95 |
54 (* tasks *) |
96 (* tasks *) |
55 |
97 |
56 type timing = Time.time * Time.time * string list; (*run, wait, wait dependencies*) |
98 type timing = Time.time * Time.time * string list; (*run, wait, wait dependencies*) |
57 |
99 |
58 fun new_timing () = |
100 fun new_timing () = |
59 Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing); |
101 Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing); |
60 |
102 |
61 abstype task = Task of |
103 abstype task = Task of |
62 {name: string, |
104 {group: group, |
|
105 name: string, |
63 id: int, |
106 id: int, |
64 pri: int option, |
107 pri: int option, |
65 timing: timing Synchronized.var} |
108 timing: timing Synchronized.var} |
66 with |
109 with |
67 |
110 |
68 val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()}; |
111 fun dummy_task () = |
69 fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()}; |
112 Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = new_timing ()}; |
70 |
113 |
|
114 fun new_task group name pri = |
|
115 Task {group = group, name = name, id = new_id (), pri = pri, timing = new_timing ()}; |
|
116 |
|
117 fun group_of_task (Task {group, ...}) = group; |
71 fun name_of_task (Task {name, ...}) = name; |
118 fun name_of_task (Task {name, ...}) = name; |
72 fun pri_of_task (Task {pri, ...}) = the_default 0 pri; |
119 fun pri_of_task (Task {pri, ...}) = the_default 0 pri; |
73 fun str_of_task (Task {name, id, ...}) = |
120 fun str_of_task (Task {name, id, ...}) = |
74 if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")"; |
121 if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")"; |
75 |
122 |
91 end; |
138 end; |
92 |
139 |
93 structure Task_Graph = Graph(type key = task val ord = task_ord); |
140 structure Task_Graph = Graph(type key = task val ord = task_ord); |
94 |
141 |
95 |
142 |
96 (* nested groups *) |
|
97 |
|
98 abstype group = Group of |
|
99 {parent: group option, |
|
100 id: int, |
|
101 status: exn list Synchronized.var} |
|
102 with |
|
103 |
|
104 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status}; |
|
105 |
|
106 fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []); |
|
107 |
|
108 fun group_id (Group {id, ...}) = id; |
|
109 fun eq_group (group1, group2) = group_id group1 = group_id group2; |
|
110 |
|
111 fun group_ancestry (Group {parent, id, ...}) = |
|
112 id :: (case parent of NONE => [] | SOME group => group_ancestry group); |
|
113 |
|
114 |
|
115 (* group status *) |
|
116 |
|
117 fun cancel_group (Group {status, ...}) exn = |
|
118 Synchronized.change status |
|
119 (fn exns => |
|
120 if not (Exn.is_interrupt exn) orelse null exns then exn :: exns |
|
121 else exns); |
|
122 |
|
123 fun is_canceled (Group {parent, status, ...}) = |
|
124 not (null (Synchronized.value status)) orelse |
|
125 (case parent of NONE => false | SOME group => is_canceled group); |
|
126 |
|
127 fun group_status (Group {parent, status, ...}) = |
|
128 Synchronized.value status @ |
|
129 (case parent of NONE => [] | SOME group => group_status group); |
|
130 |
|
131 fun str_of_group group = |
|
132 (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); |
|
133 |
|
134 end; |
|
135 |
|
136 |
143 |
137 (** queue of jobs and groups **) |
144 (** queue of jobs and groups **) |
138 |
145 |
139 (* jobs *) |
146 (* jobs *) |
140 |
147 |
141 datatype job = |
148 datatype job = |
142 Job of (bool -> bool) list | |
149 Job of (bool -> bool) list | |
143 Running of Thread.thread | |
150 Running of Thread.thread | |
144 Passive of unit -> bool; |
151 Passive of unit -> bool; |
145 |
152 |
146 type jobs = (group * job) Task_Graph.T; |
153 type jobs = job Task_Graph.T; |
147 |
154 |
148 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); |
155 fun get_job (jobs: jobs) task = Task_Graph.get_node jobs task; |
149 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); |
156 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (K job) jobs; |
150 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs; |
|
151 |
157 |
152 fun add_job task dep (jobs: jobs) = |
158 fun add_job task dep (jobs: jobs) = |
153 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
159 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
154 |
160 |
155 fun get_deps (jobs: jobs) task = |
161 fun get_deps (jobs: jobs) task = |
167 val empty = make_queue Inttab.empty Task_Graph.empty; |
173 val empty = make_queue Inttab.empty Task_Graph.empty; |
168 |
174 |
169 |
175 |
170 (* job status *) |
176 (* job status *) |
171 |
177 |
172 fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list) |
178 fun ready_job task (Job list, ([], _)) = SOME (task, rev list) |
173 | ready_job task ((group, Passive abort), ([], _)) = |
179 | ready_job task (Passive abort, ([], _)) = |
174 if is_canceled group then SOME (task, group, [fn _ => abort ()]) |
180 if is_canceled (group_of_task task) then SOME (task, [fn _ => abort ()]) |
175 else NONE |
181 else NONE |
176 | ready_job _ _ = NONE; |
182 | ready_job _ _ = NONE; |
177 |
183 |
178 fun active_job (_, Job _) = SOME () |
184 fun active_job (_, (Job _, _)) = SOME () |
179 | active_job (_, Running _) = SOME () |
185 | active_job (_, (Running _, _)) = SOME () |
180 | active_job (group, Passive _) = if is_canceled group then SOME () else NONE; |
186 | active_job (task, (Passive _, _)) = |
181 |
187 if is_canceled (group_of_task task) then SOME () else NONE; |
182 fun all_passive (Queue {jobs, ...}) = |
188 |
183 is_none (Task_Graph.get_first (active_job o #1 o #2) jobs); |
189 fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs); |
184 |
190 |
185 |
191 |
186 (* queue status *) |
192 (* queue status *) |
187 |
193 |
188 fun status (Queue {jobs, ...}) = |
194 fun status (Queue {jobs, ...}) = |
189 let |
195 let |
190 val (x, y, z, w) = |
196 val (x, y, z, w) = |
191 Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) => |
197 Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) => |
192 (case job of |
198 (case job of |
193 Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) |
199 Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) |
194 | Running _ => (x, y, z + 1, w) |
200 | Running _ => (x, y, z + 1, w) |
195 | Passive _ => (x, y, z, w + 1))) |
201 | Passive _ => (x, y, z, w + 1))) |
196 jobs (0, 0, 0, 0); |
202 jobs (0, 0, 0, 0); |
211 val _ = List.app Simple_Thread.interrupt running; |
217 val _ = List.app Simple_Thread.interrupt running; |
212 in null running end; |
218 in null running end; |
213 |
219 |
214 fun cancel_all (Queue {jobs, ...}) = |
220 fun cancel_all (Queue {jobs, ...}) = |
215 let |
221 let |
216 fun cancel_job (group, job) (groups, running) = |
222 fun cancel_job (task, (job, _)) (groups, running) = |
217 (cancel_group group Exn.Interrupt; |
223 let |
|
224 val group = group_of_task task; |
|
225 val _ = cancel_group group Exn.Interrupt; |
|
226 in |
218 (case job of |
227 (case job of |
219 Running t => (insert eq_group group groups, insert Thread.equal t running) |
228 Running t => (insert eq_group group groups, insert Thread.equal t running) |
220 | _ => (groups, running))); |
229 | _ => (groups, running)) |
221 val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); |
230 end; |
|
231 val (running_groups, running) = Task_Graph.fold cancel_job jobs ([], []); |
222 val _ = List.app Simple_Thread.interrupt running; |
232 val _ = List.app Simple_Thread.interrupt running; |
223 in running_groups end; |
233 in running_groups end; |
224 |
234 |
225 |
235 |
226 (* finish *) |
236 (* finish *) |
227 |
237 |
228 fun finish task (Queue {groups, jobs}) = |
238 fun finish task (Queue {groups, jobs}) = |
229 let |
239 let |
230 val group = get_group jobs task; |
240 val group = group_of_task task; |
231 val groups' = groups |
241 val groups' = groups |
232 |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group); |
242 |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group); |
233 val jobs' = Task_Graph.del_node task jobs; |
243 val jobs' = Task_Graph.del_node task jobs; |
234 val maximal = null (Task_Graph.imm_succs jobs task); |
244 val maximal = null (Task_Graph.imm_succs jobs task); |
235 in (maximal, make_queue groups' jobs') end; |
245 in (maximal, make_queue groups' jobs') end; |
237 |
247 |
238 (* enqueue *) |
248 (* enqueue *) |
239 |
249 |
240 fun enqueue_passive group abort (Queue {groups, jobs}) = |
250 fun enqueue_passive group abort (Queue {groups, jobs}) = |
241 let |
251 let |
242 val task = new_task "passive" NONE; |
252 val task = new_task group "passive" NONE; |
243 val groups' = groups |
253 val groups' = groups |
244 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); |
254 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); |
245 val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort)); |
255 val jobs' = jobs |> Task_Graph.new_node (task, Passive abort); |
246 in (task, make_queue groups' jobs') end; |
256 in (task, make_queue groups' jobs') end; |
247 |
257 |
248 fun enqueue name group deps pri job (Queue {groups, jobs}) = |
258 fun enqueue name group deps pri job (Queue {groups, jobs}) = |
249 let |
259 let |
250 val task = new_task name (SOME pri); |
260 val task = new_task group name (SOME pri); |
251 val groups' = groups |
261 val groups' = groups |
252 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); |
262 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); |
253 val jobs' = jobs |
263 val jobs' = jobs |
254 |> Task_Graph.new_node (task, (group, Job [job])) |
264 |> Task_Graph.new_node (task, Job [job]) |
255 |> fold (add_job task) deps |
265 |> fold (add_job task) deps |
256 |> fold (fold (add_job task) o get_deps jobs) deps; |
266 |> fold (fold (add_job task) o get_deps jobs) deps; |
257 val minimal = null (get_deps jobs' task); |
267 val minimal = null (get_deps jobs' task); |
258 in ((task, minimal), make_queue groups' jobs') end; |
268 in ((task, minimal), make_queue groups' jobs') end; |
259 |
269 |
272 in (true, make_queue groups jobs') end |
282 in (true, make_queue groups jobs') end |
273 | _ => (false, queue)); |
283 | _ => (false, queue)); |
274 |
284 |
275 fun dequeue thread (queue as Queue {groups, jobs}) = |
285 fun dequeue thread (queue as Queue {groups, jobs}) = |
276 (case Task_Graph.get_first (uncurry ready_job) jobs of |
286 (case Task_Graph.get_first (uncurry ready_job) jobs of |
277 SOME (result as (task, _, _)) => |
287 SOME (result as (task, _)) => |
278 let val jobs' = set_job task (Running thread) jobs |
288 let val jobs' = set_job task (Running thread) jobs |
279 in (SOME result, make_queue groups jobs') end |
289 in (SOME result, make_queue groups jobs') end |
280 | NONE => (NONE, queue)); |
290 | NONE => (NONE, queue)); |
281 |
291 |
282 |
292 |
292 |
302 |
293 fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) = |
303 fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) = |
294 let |
304 let |
295 fun ready task = ready_job task (Task_Graph.get_entry jobs task); |
305 fun ready task = ready_job task (Task_Graph.get_entry jobs task); |
296 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
306 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
297 fun result (res as (task, _, _)) = |
307 fun result (res as (task, _)) = |
298 let val jobs' = set_job task (Running thread) jobs |
308 let val jobs' = set_job task (Running thread) jobs |
299 in ((SOME res, Deps tasks), make_queue groups jobs') end; |
309 in ((SOME res, Deps tasks), make_queue groups jobs') end; |
300 in |
310 in |
301 (case get_first ready tasks of |
311 (case get_first ready tasks of |
302 SOME res => result res |
312 SOME res => result res |