21 val is_empty: queue -> bool |
21 val is_empty: queue -> bool |
22 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue |
22 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue |
23 val extend: task -> (bool -> bool) -> queue -> queue option |
23 val extend: task -> (bool -> bool) -> queue -> queue option |
24 val depend: task list -> task -> queue -> queue |
24 val depend: task list -> task -> queue -> queue |
25 val dequeue: queue -> (task * group * (bool -> bool) list) option * queue |
25 val dequeue: queue -> (task * group * (bool -> bool) list) option * queue |
26 val dequeue_towards: task list -> queue -> |
|
27 (((task * group * (bool -> bool) list) * task list) option * queue) |
|
28 val interrupt: queue -> task -> unit |
26 val interrupt: queue -> task -> unit |
29 val interrupt_external: queue -> string -> unit |
27 val interrupt_external: queue -> string -> unit |
30 val cancel: queue -> group -> bool |
28 val cancel: queue -> group -> bool |
31 val cancel_all: queue -> group list |
29 val cancel_all: queue -> group list |
32 val finish: task -> queue -> queue |
30 val finish: task -> queue -> queue |
80 Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
78 Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
81 |
79 |
82 |
80 |
83 (* queue of grouped jobs *) |
81 (* queue of grouped jobs *) |
84 |
82 |
|
83 datatype result = Unknown | Result of task | No_Result; |
|
84 |
85 datatype queue = Queue of |
85 datatype queue = Queue of |
86 {groups: task list Inttab.table, (*groups with presently active members*) |
86 {groups: task list Inttab.table, (*groups with presently active members*) |
87 jobs: jobs}; (*job dependency graph*) |
87 jobs: jobs, (*job dependency graph*) |
|
88 cache: result}; (*last dequeue result*) |
88 |
89 |
89 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; |
90 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache}; |
90 |
91 |
91 val empty = make_queue Inttab.empty Task_Graph.empty; |
92 val empty = make_queue Inttab.empty Task_Graph.empty No_Result; |
92 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; |
93 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; |
93 |
94 |
94 |
95 |
95 (* enqueue *) |
96 (* enqueue *) |
96 |
97 |
97 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) = |
98 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, ...}) = |
98 let |
99 let |
99 val task = new_task pri; |
100 val task = new_task pri; |
100 val groups' = Inttab.cons_list (gid, task) groups; |
101 val groups' = Inttab.cons_list (gid, task) groups; |
101 val jobs' = jobs |
102 val jobs' = jobs |
102 |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; |
103 |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; |
103 in (task, make_queue groups' jobs') end; |
104 in (task, make_queue groups' jobs' Unknown) end; |
104 |
105 |
105 fun extend task job (Queue {groups, jobs}) = |
106 fun extend task job (Queue {groups, jobs, cache}) = |
106 (case try (get_job jobs) task of |
107 (case try (get_job jobs) task of |
107 SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs)) |
108 SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache) |
108 | _ => NONE); |
109 | _ => NONE); |
109 |
110 |
110 fun depend deps task (Queue {groups, jobs}) = |
111 fun depend deps task (Queue {groups, jobs, ...}) = |
111 make_queue groups (fold (add_job_acyclic task) deps jobs); |
112 make_queue groups (fold (add_job_acyclic task) deps jobs) Unknown; |
112 |
113 |
113 |
114 |
114 (* dequeue *) |
115 (* dequeue *) |
115 |
116 |
116 local |
117 fun dequeue (queue as Queue {groups, jobs, cache}) = |
117 |
|
118 fun dequeue_result NONE queue = (NONE, queue) |
|
119 | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = |
|
120 (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs)); |
|
121 |
|
122 in |
|
123 |
|
124 fun dequeue (queue as Queue {jobs, ...}) = |
|
125 let |
118 let |
126 fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) |
119 fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) |
127 | ready _ = NONE; |
120 | ready _ = NONE; |
128 in dequeue_result (Task_Graph.get_first ready jobs) queue end; |
121 fun deq boundary = |
129 |
122 (case Task_Graph.get_first boundary ready jobs of |
130 fun dequeue_towards tasks (queue as Queue {jobs, ...}) = |
123 NONE => (NONE, make_queue groups jobs No_Result) |
131 let |
124 | SOME (result as (task, _, _)) => |
132 val tasks' = filter (can (Task_Graph.get_node jobs)) tasks; |
125 let |
133 |
126 val jobs' = set_job task (Running (Thread.self ())) jobs; |
134 fun ready task = |
127 val cache' = Result task; |
135 (case Task_Graph.get_node jobs task of |
128 in (SOME result, make_queue groups jobs' cache') end); |
136 (group, Job list) => |
|
137 if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) |
|
138 else NONE |
|
139 | _ => NONE); |
|
140 in |
129 in |
141 (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of |
130 (case cache of |
142 (NONE, queue') => (NONE, queue') |
131 Unknown => deq NONE |
143 | (SOME work, queue') => (SOME (work, tasks'), queue')) |
132 | Result last => deq (SOME last) |
|
133 | No_Result => (NONE, queue)) |
144 end; |
134 end; |
145 |
|
146 end; |
|
147 |
135 |
148 |
136 |
149 (* sporadic interrupts *) |
137 (* sporadic interrupts *) |
150 |
138 |
151 fun interrupt (Queue {jobs, ...}) task = |
139 fun interrupt (Queue {jobs, ...}) task = |
152 (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); |
140 (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); |
153 |
141 |
154 fun interrupt_external (queue as Queue {jobs, ...}) str = |
142 fun interrupt_external (queue as Queue {jobs, ...}) str = |
155 (case Int.fromString str of |
143 (case Int.fromString str of |
156 SOME i => |
144 SOME i => |
157 (case Task_Graph.get_first |
145 (case Task_Graph.get_first NONE |
158 (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs |
146 (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs |
159 of SOME task => interrupt queue task | NONE => ()) |
147 of SOME task => interrupt queue task | NONE => ()) |
160 | NONE => ()); |
148 | NONE => ()); |
161 |
149 |
162 |
150 |