author | wenzelm |
Tue, 09 Sep 2008 16:29:32 +0200 | |
changeset 28176 | 01b21886e7f0 |
parent 28171 | 9b2f9cc9ff4b |
child 28179 | 8e8313aededc |
permissions | -rw-r--r-- |
28165 | 1 |
(*  Title:      Pure/Concurrent/task_queue.ML
    ID:         $Id$
    Author:     Makarius

Ordered queue of grouped tasks.
*)
|
7 |
||
8 |
signature TASK_QUEUE =
sig
  (*identifiers*)
  eqtype task
  eqtype group
  val new_group: unit -> group

  (*queue values; every operation returns an updated queue*)
  type queue
  val empty: queue

  (*enqueue group deps job: add job to group, ordered after the tasks deps;
    the job is later applied to its 'ok' status, which is false once canceled*)
  val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue

  (*dequeue thread: next ready task, if any, which is then recorded as
    running on thread*)
  val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue

  (*cancel group: threads currently running members of group*)
  val cancel: group -> queue -> Thread.thread list * queue

  (*finish task: remove task from the queue*)
  val finish: task -> queue -> queue
end;
20 |
||
28171 | 21 |
structure TaskQueue: TASK_QUEUE =
struct

(* identifiers: tasks and groups are tagged serial numbers *)

datatype task = Task of serial;
datatype group = Group of serial;

fun new_group () = Group (serial ());


(* jobs *)

(*Job (ok, body): waiting job, where ok is the status later passed to body
    (enqueue starts with true, cancel resets it to false);
  Running thread: job that dequeue has handed over to the given thread*)
datatype job =
  Job of bool * (bool -> bool) |
  Running of Thread.thread;

(*nodes are indexed by task serial and carry the owning group with the job;
  enqueue adds an edge from each dependency to the dependent task*)
type jobs = (group * job) IntGraph.T;

fun get_group (jobs: jobs) (Task id) =
  let val (group, _) = IntGraph.get_node jobs id in group end;

fun get_job (jobs: jobs) (Task id) =
  let val (_, job) = IntGraph.get_node jobs id in job end;

fun map_job (Task id) f (jobs: jobs) =
  IntGraph.map_node id (fn (group, job) => (group, f job)) jobs;


(* queue of grouped jobs *)

datatype queue = Queue of
 {groups: task list Inttab.table,  (*members of each group, indexed by group serial*)
  jobs: jobs,  (*job dependency graph*)
  cache: (task * group * (unit -> bool)) Queue.T option};  (*ready tasks, NONE if outdated*)

fun make_queue groups jobs cache =
  Queue {groups = groups, jobs = jobs, cache = cache};

val empty = make_queue Inttab.empty IntGraph.empty NONE;


(* queue operations *)

(*Add a fresh task for job to group, ordered after deps.  The ready cache
  is discarded.*)
fun enqueue (group as Group gid) deps job (Queue {groups, jobs, ...}) =
  let
    val id = serial ();
    val task = Task id;

    (*a dependency for which the graph raises UNDEF is silently ignored*)
    fun depend_on (Task dep) graph =
      IntGraph.add_edge_acyclic (dep, id) graph
        handle IntGraph.UNDEF _ => graph;

    val node = (id, (group, Job (true, job)));
    val graph = fold depend_on deps (IntGraph.new_node node jobs);
    val members = Inttab.cons_list (gid, task) groups;
  in (task, make_queue members graph NONE) end;

(*Take the next ready task, if any, and mark it as running on thread.  The
  ready queue comes from the cache, or is rebuilt from the graph.*)
fun dequeue thread (Queue {groups, jobs, cache}) =
  let
    (*only waiting jobs whose first adjacency list is empty are ready;
      presumably that list holds the predecessors -- confirm against IntGraph*)
    fun collect (id, ((group, Job (ok, job)), ([], _))) pending =
          Queue.enqueue (Task id, group, (fn () => job ok)) pending
      | collect _ pending = pending;

    val ready =
      (case cache of
        NONE => IntGraph.fold collect jobs Queue.empty
      | SOME pending => pending);
  in
    (case try Queue.dequeue ready of
      SOME (result as (task, _, _), rest) =>
        (SOME result, make_queue groups (map_job task (K (Running thread)) jobs) (SOME rest))
    | NONE => (NONE, make_queue groups jobs (SOME ready)))
  end;


(* termination *)

(*Cancel all members of the group: waiting jobs get status false, and the
  threads of running members are returned to the caller.*)
fun cancel (Group gid) (Queue {groups, jobs, ...}) =
  let
    val members = Inttab.lookup_list groups gid;

    fun add_thread task threads =
      (case get_job jobs task of
        Running thread => thread :: threads
      | _ => threads);

    (*the job is looked up in the original graph, not the accumulated one*)
    fun revoke task graph =
      (case get_job jobs task of
        Job (true, job) => map_job task (K (Job (false, job))) graph
      | _ => graph);
  in (fold add_thread members [], make_queue groups (fold revoke members jobs) NONE) end;

(*Remove task from its group and from the dependency graph.  The ready
  cache is discarded.*)
fun finish (task as Task id) (Queue {groups, jobs, ...}) =
  (case get_group jobs task of
    Group gid =>
      make_queue
        (Inttab.remove_list (op =) (gid, task) groups)
        (IntGraph.del_nodes [id] jobs)
        NONE);

end;