13 val str_of_task: task -> string |
13 val str_of_task: task -> string |
14 val str_of_group: group -> string |
14 val str_of_group: group -> string |
15 type queue |
15 type queue |
16 val empty: queue |
16 val empty: queue |
17 val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue |
17 val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue |
18 val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue |
18 val depend: task list -> task -> queue -> queue |
19 val cancel: group -> queue -> Thread.thread list * queue |
19 val dequeue: queue -> (task * group * (unit -> bool)) option * queue |
|
20 val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue |
|
21 val cancel: group -> queue -> bool * queue |
20 val finish: task -> queue -> queue |
22 val finish: task -> queue -> queue |
21 end; |
23 end; |
22 |
24 |
23 structure TaskQueue: TASK_QUEUE = |
25 structure TaskQueue: TASK_QUEUE = |
24 struct |
26 struct |
40 Job of bool * (bool -> bool) | |
42 Job of bool * (bool -> bool) | |
41 Running of Thread.thread; |
43 Running of Thread.thread; |
42 |
44 |
43 type jobs = (group * job) IntGraph.T; |
45 type jobs = (group * job) IntGraph.T; |
44 |
46 |
|
47 fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id; |
45 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); |
48 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); |
46 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); |
49 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); |
47 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; |
50 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; |
|
51 fun add_job (Task id) (Task dep) (jobs: jobs) = |
|
52 IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; |
48 |
53 |
49 |
54 |
50 (* queue of grouped jobs *) |
55 (* queue of grouped jobs *) |
51 |
56 |
52 datatype queue = Queue of |
57 datatype queue = Queue of |
55 |
60 |
56 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; |
61 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; |
57 val empty = make_queue Inttab.empty IntGraph.empty; |
62 val empty = make_queue Inttab.empty IntGraph.empty; |
58 |
63 |
59 |
64 |
60 (* queue operations *) |
65 (* enqueue *) |
61 |
66 |
62 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) = |
67 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) = |
63 let |
68 let |
64 val id = serial (); |
69 val id = serial (); |
65 val task = Task id; |
70 val task = Task id; |
66 val groups' = Inttab.cons_list (gid, task) groups; |
71 val groups' = Inttab.cons_list (gid, task) groups; |
67 |
72 val jobs' = jobs |
68 fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G |
73 |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps; |
69 handle IntGraph.UNDEF _ => G; |
|
70 val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps; |
|
71 in (task, make_queue groups' jobs') end; |
74 in (task, make_queue groups' jobs') end; |
72 |
75 |
73 fun dequeue thread (queue as Queue {groups, jobs}) = |
76 fun depend deps task (Queue {groups, jobs}) = |
|
77 make_queue groups (fold (add_job task) deps jobs); |
|
78 |
|
79 |
|
80 (* dequeue *) |
|
81 |
|
82 fun dequeue_if P (queue as Queue {groups, jobs}) = |
74 let |
83 let |
75 fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok)) |
84 fun ready (id, ((group, Job (ok, job)), ([], _))) = |
|
85 if P id then SOME (Task id, group, (fn () => job ok)) else NONE |
76 | ready _ = NONE; |
86 | ready _ = NONE; |
77 in |
87 in |
78 (case IntGraph.get_first ready jobs of |
88 (case IntGraph.get_first ready jobs of |
79 NONE => (NONE, queue) |
89 NONE => (NONE, queue) |
80 | SOME result => |
90 | SOME result => |
81 let val jobs' = map_job (#1 result) (K (Running thread)) jobs |
91 let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs |
82 in (SOME result, make_queue groups jobs') end) |
92 in (SOME result, make_queue groups jobs') end) |
83 end; |
93 end; |
|
94 |
|
95 val dequeue = dequeue_if (K true); |
|
96 |
|
97 fun dequeue_towards tasks (queue as Queue {jobs, ...}) = |
|
98 let val ids = tasks |
|
99 |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE) |
|
100 in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end; |
84 |
101 |
85 |
102 |
86 (* termination *) |
103 (* termination *) |
87 |
104 |
88 fun cancel (group as Group gid) (Queue {groups, jobs}) = |
105 fun cancel (group as Group gid) (Queue {groups, jobs}) = |
91 val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks []; |
108 val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks []; |
92 val jobs' = fold (fn task => |
109 val jobs' = fold (fn task => |
93 (case get_job jobs task of |
110 (case get_job jobs task of |
94 Job (true, job) => map_job task (K (Job (false, job))) |
111 Job (true, job) => map_job task (K (Job (false, job))) |
95 | _ => I)) tasks jobs; |
112 | _ => I)) tasks jobs; |
96 in (running, make_queue groups jobs') end; |
113 val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running; |
|
114 in (null running, make_queue groups jobs') end; |
97 |
115 |
98 fun finish (task as Task id) (Queue {groups, jobs}) = |
116 fun finish (task as Task id) (Queue {groups, jobs}) = |
99 let |
117 let |
100 val Group gid = get_group jobs task; |
118 val Group gid = get_group jobs task; |
101 val groups' = Inttab.remove_list (op =) (gid, task) groups; |
119 val groups' = Inttab.remove_list (op =) (gid, task) groups; |