11 val pri_of_task: task -> int |
11 val pri_of_task: task -> int |
12 val str_of_task: task -> string |
12 val str_of_task: task -> string |
13 type group |
13 type group |
14 val eq_group: group * group -> bool |
14 val eq_group: group * group -> bool |
15 val new_group: unit -> group |
15 val new_group: unit -> group |
|
16 val is_valid: group -> bool |
16 val invalidate_group: group -> unit |
17 val invalidate_group: group -> unit |
17 val str_of_group: group -> string |
18 val str_of_group: group -> string |
18 type queue |
19 type queue |
19 val empty: queue |
20 val empty: queue |
20 val is_empty: queue -> bool |
21 val is_empty: queue -> bool |
21 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue |
22 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue |
|
23 val extend: task -> (bool -> bool) -> queue -> queue option |
22 val depend: task list -> task -> queue -> queue |
24 val depend: task list -> task -> queue -> queue |
23 val dequeue: queue -> (task * group * (unit -> bool)) option * queue |
25 val dequeue: queue -> (task * group * (bool -> bool) list) option * queue |
24 val dequeue_towards: task list -> queue -> |
26 val dequeue_towards: task list -> queue -> |
25 (((task * group * (unit -> bool)) * task list) option * queue) |
27 (((task * group * (bool -> bool) list) * task list) option * queue) |
26 val interrupt: queue -> task -> unit |
28 val interrupt: queue -> task -> unit |
27 val interrupt_external: queue -> string -> unit |
29 val interrupt_external: queue -> string -> unit |
28 val cancel: queue -> group -> bool |
30 val cancel: queue -> group -> bool |
29 val cancel_all: queue -> group list |
31 val cancel_all: queue -> group list |
30 val finish: task -> queue -> queue |
32 val finish: task -> queue -> queue |
50 datatype group = Group of serial * bool ref; |
52 datatype group = Group of serial * bool ref; |
51 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2; |
53 fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2; |
52 |
54 |
53 fun new_group () = Group (serial (), ref true); |
55 fun new_group () = Group (serial (), ref true); |
54 |
56 |
|
57 fun is_valid (Group (_, ref ok)) = ok; |
55 fun invalidate_group (Group (_, ok)) = ok := false; |
58 fun invalidate_group (Group (_, ok)) = ok := false; |
56 |
59 |
57 fun str_of_group (Group (i, ref ok)) = |
60 fun str_of_group (Group (i, ref ok)) = |
58 if ok then string_of_int i else enclose "(" ")" (string_of_int i); |
61 if ok then string_of_int i else enclose "(" ")" (string_of_int i); |
59 |
62 |
60 |
63 |
61 (* jobs *) |
64 (* jobs *) |
62 |
65 |
63 datatype job = |
66 datatype job = |
64 Job of bool -> bool | |
67 Job of (bool -> bool) list | |
65 Running of Thread.thread; |
68 Running of Thread.thread; |
66 |
69 |
67 type jobs = (group * job) Task_Graph.T; |
70 type jobs = (group * job) Task_Graph.T; |
68 |
71 |
69 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); |
72 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task); |
70 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); |
73 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task); |
71 fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs; |
74 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs; |
72 |
75 |
73 fun add_job task dep (jobs: jobs) = |
76 fun add_job task dep (jobs: jobs) = |
74 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
77 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; |
75 |
78 |
76 fun add_job_acyclic task dep (jobs: jobs) = |
79 fun add_job_acyclic task dep (jobs: jobs) = |
94 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) = |
97 fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) = |
95 let |
98 let |
96 val task = new_task pri; |
99 val task = new_task pri; |
97 val groups' = Inttab.cons_list (gid, task) groups; |
100 val groups' = Inttab.cons_list (gid, task) groups; |
98 val jobs' = jobs |
101 val jobs' = jobs |
99 |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps; |
102 |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; |
100 in (task, make_queue groups' jobs') end; |
103 in (task, make_queue groups' jobs') end; |
|
104 |
|
105 fun extend task job (Queue {groups, jobs}) = |
|
106 (case try (get_job jobs) task of |
|
107 SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs)) |
|
108 | _ => NONE); |
101 |
109 |
102 fun depend deps task (Queue {groups, jobs}) = |
110 fun depend deps task (Queue {groups, jobs}) = |
103 make_queue groups (fold (add_job_acyclic task) deps jobs); |
111 make_queue groups (fold (add_job_acyclic task) deps jobs); |
104 |
112 |
105 |
113 |
107 |
115 |
108 local |
116 local |
109 |
117 |
110 fun dequeue_result NONE queue = (NONE, queue) |
118 fun dequeue_result NONE queue = (NONE, queue) |
111 | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = |
119 | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) = |
112 (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs)); |
120 (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs)); |
113 |
121 |
114 in |
122 in |
115 |
123 |
116 fun dequeue (queue as Queue {jobs, ...}) = |
124 fun dequeue (queue as Queue {jobs, ...}) = |
117 let |
125 let |
118 fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) = |
126 fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list) |
119 SOME (task, group, (fn () => job ok)) |
|
120 | ready _ = NONE; |
127 | ready _ = NONE; |
121 in dequeue_result (Task_Graph.get_first ready jobs) queue end; |
128 in dequeue_result (Task_Graph.get_first ready jobs) queue end; |
122 |
129 |
123 fun dequeue_towards tasks (queue as Queue {jobs, ...}) = |
130 fun dequeue_towards tasks (queue as Queue {jobs, ...}) = |
124 let |
131 let |
125 val tasks' = filter (can (Task_Graph.get_node jobs)) tasks; |
132 val tasks' = filter (can (Task_Graph.get_node jobs)) tasks; |
126 |
133 |
127 fun ready task = |
134 fun ready task = |
128 (case Task_Graph.get_node jobs task of |
135 (case Task_Graph.get_node jobs task of |
129 (group as Group (_, ref ok), Job job) => |
136 (group, Job list) => |
130 if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok)) |
137 if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) |
131 else NONE |
138 else NONE |
132 | _ => NONE); |
139 | _ => NONE); |
133 in |
140 in |
134 (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of |
141 (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of |
135 (NONE, queue') => (NONE, queue') |
142 (NONE, queue') => (NONE, queue') |