20 val str_of_group: group -> string |
20 val str_of_group: group -> string |
21 type queue |
21 type queue |
22 val empty: queue |
22 val empty: queue |
23 val all_passive: queue -> bool |
23 val all_passive: queue -> bool |
24 val status: queue -> {ready: int, pending: int, running: int, passive: int} |
24 val status: queue -> {ready: int, pending: int, running: int, passive: int} |
25 val cancel: queue -> group -> bool |
25 val cancel: group -> queue -> bool * queue |
26 val cancel_all: queue -> group list |
26 val cancel_all: queue -> group list * queue |
27 val enqueue_passive: group -> queue -> task * queue |
27 val enqueue_passive: group -> queue -> task * queue |
28 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue |
28 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue |
29 val extend: task -> (bool -> bool) -> queue -> queue option |
29 val extend: task -> (bool -> bool) -> queue -> queue option |
30 val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue |
30 val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue |
31 val depend: task -> task list -> queue -> queue |
31 val depend: task -> task list -> queue -> queue |
79 |
79 |
80 fun is_canceled (Group {parent, status, ...}) = |
80 fun is_canceled (Group {parent, status, ...}) = |
81 not (null (Synchronized.value status)) orelse |
81 not (null (Synchronized.value status)) orelse |
82 (case parent of NONE => false | SOME group => is_canceled group); |
82 (case parent of NONE => false | SOME group => is_canceled group); |
83 |
83 |
|
84 fun is_ready deps group = null deps orelse is_canceled group; |
|
85 |
84 fun group_status (Group {parent, status, ...}) = |
86 fun group_status (Group {parent, status, ...}) = |
85 Synchronized.value status @ |
87 Synchronized.value status @ |
86 (case parent of NONE => [] | SOME group => group_status group); |
88 (case parent of NONE => [] | SOME group => group_status group); |
87 |
89 |
88 fun str_of_group group = |
90 fun str_of_group group = |
135 (* queue status *) |
137 (* queue status *) |
136 |
138 |
137 fun status (Queue {jobs, ...}) = |
139 fun status (Queue {jobs, ...}) = |
138 let |
140 let |
139 val (x, y, z, w) = |
141 val (x, y, z, w) = |
140 Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) => |
142 Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) => |
141 (case job of |
143 (case job of |
142 Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) |
144 Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w) |
143 | Running _ => (x, y, z + 1, w) |
145 | Running _ => (x, y, z + 1, w) |
144 | Passive => (x, y, z, w + 1))) |
146 | Passive => (x, y, z, w + 1))) |
145 jobs (0, 0, 0, 0); |
147 jobs (0, 0, 0, 0); |
146 in {ready = x, pending = y, running = z, passive = w} end; |
148 in {ready = x, pending = y, running = z, passive = w} end; |
147 |
149 |
148 |
150 |
149 (* cancel -- peers and sub-groups *) |
151 (* cancel -- peers and sub-groups *) |
150 |
152 |
151 fun cancel (Queue {groups, jobs, ...}) group = |
153 fun cancel group (Queue {groups, jobs, ...}) = |
152 let |
154 let |
153 val _ = cancel_group group Exn.Interrupt; |
155 val _ = cancel_group group Exn.Interrupt; |
154 val tasks = Inttab.lookup_list groups (group_id group); |
156 val tasks = Inttab.lookup_list groups (group_id group); |
155 val running = |
157 val running = |
156 fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; |
158 fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; |
157 val _ = List.app SimpleThread.interrupt running; |
159 val _ = List.app SimpleThread.interrupt running; |
158 in null running end; |
160 in (null running, make_queue groups jobs Unknown) end; |
159 |
161 |
160 fun cancel_all (Queue {jobs, ...}) = |
162 fun cancel_all (Queue {groups, jobs, ...}) = |
161 let |
163 let |
162 fun cancel_job (group, job) (groups, running) = |
164 fun cancel_job (group, job) (groups, running) = |
163 (cancel_group group Exn.Interrupt; |
165 (cancel_group group Exn.Interrupt; |
164 (case job of |
166 (case job of |
165 Running t => (insert eq_group group groups, insert Thread.equal t running) |
167 Running t => (insert eq_group group groups, insert Thread.equal t running) |
166 | _ => (groups, running))); |
168 | _ => (groups, running))); |
167 val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); |
169 val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); |
168 val _ = List.app SimpleThread.interrupt running; |
170 val _ = List.app SimpleThread.interrupt running; |
169 in running_groups end; |
171 in (running_groups, make_queue groups jobs Unknown) end; |
170 |
172 |
171 |
173 |
172 (* enqueue *) |
174 (* enqueue *) |
173 |
175 |
174 fun enqueue_passive group (Queue {groups, jobs, cache}) = |
176 fun enqueue_passive group (Queue {groups, jobs, cache}) = |
205 |
207 |
206 (* dequeue *) |
208 (* dequeue *) |
207 |
209 |
208 fun dequeue thread (queue as Queue {groups, jobs, cache}) = |
210 fun dequeue thread (queue as Queue {groups, jobs, cache}) = |
209 let |
211 let |
210 fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) |
212 fun ready (task, ((group, Job list), (deps, _))) = |
|
213 if is_ready deps group then SOME (task, group, rev list) else NONE |
211 | ready _ = NONE; |
214 | ready _ = NONE; |
212 fun deq boundary = |
215 fun deq boundary = |
213 (case Task_Graph.get_first boundary ready jobs of |
216 (case Task_Graph.get_first boundary ready jobs of |
214 NONE => (NONE, make_queue groups jobs No_Result) |
217 NONE => (NONE, make_queue groups jobs No_Result) |
215 | SOME (result as (task, _, _)) => |
218 | SOME (result as (task, _, _)) => |
233 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) = |
236 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) = |
234 let |
237 let |
235 fun ready task = |
238 fun ready task = |
236 (case Task_Graph.get_node jobs task of |
239 (case Task_Graph.get_node jobs task of |
237 (group, Job list) => |
240 (group, Job list) => |
238 if null (get_deps jobs task) |
241 if is_ready (get_deps jobs task) group |
239 then SOME (task, group, rev list) |
242 then SOME (task, group, rev list) |
240 else NONE |
243 else NONE |
241 | _ => NONE); |
244 | _ => NONE); |
242 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
245 val tasks = filter (can (Task_Graph.get_node jobs)) deps; |
243 fun result (res as (task, _, _)) = |
246 fun result (res as (task, _, _)) = |