author | wenzelm |
Tue, 09 Sep 2008 20:22:40 +0200 | |
changeset 28184 | 5ed5cb73a2e9 |
parent 28179 | 8e8313aededc |
child 28185 | 0f20cbce4935 |
permissions | -rw-r--r-- |
28165 | 1 |
(* Title: Pure/Concurrent/task_queue.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Ordered queue of grouped tasks. |
|
6 |
*) |
|
7 |
||
8 |
signature TASK_QUEUE = |
|
9 |
sig |
|
10 |
eqtype task |
|
11 |
eqtype group |
|
12 |
val new_group: unit -> group |
|
28179 | 13 |
val str_of_task: task -> string |
14 |
val str_of_group: group -> string |
|
28165 | 15 |
type queue |
16 |
val empty: queue |
|
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
17 |
val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
18 |
val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
19 |
val cancel: group -> queue -> Thread.thread list * queue |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
20 |
val finish: task -> queue -> queue |
28165 | 21 |
end; |
22 |
||
28171 | 23 |
structure TaskQueue: TASK_QUEUE = |
28165 | 24 |
struct |
25 |
||
28168 | 26 |
(* identifiers *) |
28165 | 27 |
|
28 |
datatype task = Task of serial; |
|
29 |
||
30 |
datatype group = Group of serial; |
|
31 |
fun new_group () = Group (serial ()); |
|
32 |
||
28179 | 33 |
fun str_of_task (Task i) = string_of_int i; |
34 |
fun str_of_group (Group i) = string_of_int i; |
|
35 |
||
28165 | 36 |
|
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
37 |
(* jobs *) |
28165 | 38 |
|
39 |
datatype job = |
|
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
40 |
Job of bool * (bool -> bool) | |
28165 | 41 |
Running of Thread.thread; |
42 |
||
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
43 |
type jobs = (group * job) IntGraph.T; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
44 |
|
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
45 |
fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
46 |
fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
47 |
fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
48 |
|
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
49 |
|
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
50 |
(* queue of grouped jobs *) |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
51 |
|
28165 | 52 |
datatype queue = Queue of |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
53 |
{groups: task list Inttab.table, (*groups with presently active members*) |
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
54 |
jobs: jobs}; (*job dependency graph*) |
28165 | 55 |
|
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
56 |
fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; |
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
57 |
val empty = make_queue Inttab.empty IntGraph.empty; |
28165 | 58 |
|
59 |
||
60 |
(* queue operations *) |
|
61 |
||
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
62 |
fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) = |
28165 | 63 |
let |
64 |
val id = serial (); |
|
65 |
val task = Task id; |
|
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
66 |
val groups' = Inttab.cons_list (gid, task) groups; |
28165 | 67 |
|
68 |
fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G |
|
69 |
handle IntGraph.UNDEF _ => G; |
|
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
70 |
val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps; |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
71 |
in (task, make_queue groups' jobs') end; |
28165 | 72 |
|
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
73 |
fun dequeue thread (queue as Queue {groups, jobs}) = |
28165 | 74 |
let |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
75 |
fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok)) |
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
76 |
| ready _ = NONE; |
28165 | 77 |
in |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
78 |
(case IntGraph.get_first ready jobs of |
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
79 |
NONE => (NONE, queue) |
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
80 |
| SOME result => |
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
81 |
let val jobs' = map_job (#1 result) (K (Running thread)) jobs |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
82 |
in (SOME result, make_queue groups jobs') end) |
28165 | 83 |
end; |
84 |
||
85 |
||
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
86 |
(* termination *) |
28165 | 87 |
|
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
88 |
fun cancel (group as Group gid) (Queue {groups, jobs}) = |
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
89 |
let |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
90 |
val tasks = Inttab.lookup_list groups gid; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
91 |
val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks []; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
92 |
val jobs' = fold (fn task => |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
93 |
(case get_job jobs task of |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
94 |
Job (true, job) => map_job task (K (Job (false, job))) |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
95 |
| _ => I)) tasks jobs; |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
96 |
in (running, make_queue groups jobs') end; |
28165 | 97 |
|
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
98 |
fun finish (task as Task id) (Queue {groups, jobs}) = |
28176
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
99 |
let |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
100 |
val Group gid = get_group jobs task; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
101 |
val groups' = Inttab.remove_list (op =) (gid, task) groups; |
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
wenzelm
parents:
28171
diff
changeset
|
102 |
val jobs' = IntGraph.del_nodes [id] jobs; |
28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset
|
103 |
in make_queue groups' jobs' end; |
28165 | 104 |
|
105 |
end; |