| author | wenzelm | 
| Tue, 28 Jul 2009 14:54:53 +0200 | |
| changeset 32251 | e586c82fdf69 | 
| parent 32250 | 3c28e4e578ad | 
| child 32784 | 1a5dde5079ac | 
| permissions | -rw-r--r-- | 
| 28165 | 1  | 
(* Title: Pure/Concurrent/task_queue.ML  | 
2  | 
Author: Makarius  | 
|
3  | 
||
4  | 
Ordered queue of grouped tasks.  | 
|
5  | 
*)  | 
|
6  | 
||
7  | 
signature TASK_QUEUE =  | 
|
8  | 
sig  | 
|
| 29340 | 9  | 
type task  | 
| 29121 | 10  | 
val new_task: int -> task  | 
11  | 
val pri_of_task: task -> int  | 
|
| 28196 | 12  | 
val str_of_task: task -> string  | 
| 29340 | 13  | 
type group  | 
| 32221 | 14  | 
val new_group: group option -> group  | 
| 32052 | 15  | 
val group_id: group -> int  | 
| 29340 | 16  | 
val eq_group: group * group -> bool  | 
| 32221 | 17  | 
val cancel_group: group -> exn -> unit  | 
18  | 
val is_canceled: group -> bool  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
19  | 
val group_status: group -> exn list  | 
| 28179 | 20  | 
val str_of_group: group -> string  | 
| 28165 | 21  | 
type queue  | 
22  | 
val empty: queue  | 
|
| 28204 | 23  | 
val is_empty: queue -> bool  | 
| 32052 | 24  | 
  val status: queue -> {ready: int, pending: int, running: int}
 | 
| 32221 | 25  | 
val cancel: queue -> group -> bool  | 
26  | 
val cancel_all: queue -> group list  | 
|
| 
32218
 
222f26693757
enqueue/finish: return minimal/maximal state of this task;
 
wenzelm 
parents: 
32192 
diff
changeset
 | 
27  | 
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue  | 
| 29365 | 28  | 
val extend: task -> (bool -> bool) -> queue -> queue option  | 
| 32249 | 29  | 
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue  | 
30  | 
val dequeue_towards: Thread.thread -> task list -> queue ->  | 
|
| 32224 | 31  | 
(((task * group * (bool -> bool) list) option * task list) * queue)  | 
| 32221 | 32  | 
val finish: task -> queue -> bool * queue  | 
| 28165 | 33  | 
end;  | 
34  | 
||
| 29340 | 35  | 
structure Task_Queue:> TASK_QUEUE =  | 
| 28165 | 36  | 
struct  | 
37  | 
||
| 29121 | 38  | 
(* tasks *)  | 
39  | 
||
40  | 
datatype task = Task of int * serial;  | 
|
41  | 
fun new_task pri = Task (pri, serial ());  | 
|
| 28165 | 42  | 
|
| 29121 | 43  | 
fun pri_of_task (Task (pri, _)) = pri;  | 
44  | 
fun str_of_task (Task (_, i)) = string_of_int i;  | 
|
| 28998 | 45  | 
|
| 29121 | 46  | 
fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);  | 
| 
31971
 
8c1b845ed105
renamed functor TableFun to Table, and GraphFun to Graph;
 
wenzelm 
parents: 
31632 
diff
changeset
 | 
47  | 
structure Task_Graph = Graph(type key = task val ord = task_ord);  | 
| 28165 | 48  | 
|
| 28998 | 49  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
50  | 
(* nested groups *)  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
51  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
52  | 
datatype group = Group of  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
53  | 
 {parent: group option,
 | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
54  | 
id: serial,  | 
| 32251 | 55  | 
status: exn list Synchronized.var};  | 
| 29121 | 56  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
57  | 
fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
 | 
| 32052 | 58  | 
|
| 32251 | 59  | 
fun new_group parent = make_group (parent, serial (), Synchronized.var "group" []);  | 
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
60  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
61  | 
fun group_id (Group {id, ...}) = id;
 | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
62  | 
fun eq_group (group1, group2) = group_id group1 = group_id group2;  | 
| 28551 | 63  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
64  | 
fun group_ancestry (Group {parent, id, ...}) =
 | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
65  | 
id :: (case parent of NONE => [] | SOME group => group_ancestry group);  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
66  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
67  | 
|
| 32221 | 68  | 
(* group status *)  | 
69  | 
||
| 32251 | 70  | 
fun cancel_group (Group {status, ...}) exn =
 | 
71  | 
Synchronized.change status  | 
|
72  | 
(fn exns =>  | 
|
73  | 
(case exn of  | 
|
74  | 
Exn.Interrupt => if null exns then [exn] else exns  | 
|
75  | 
| _ => exn :: exns));  | 
|
| 29121 | 76  | 
|
| 32251 | 77  | 
fun is_canceled (Group {parent, status, ...}) =
 | 
78  | 
not (null (Synchronized.value status)) orelse  | 
|
79  | 
(case parent of NONE => false | SOME group => is_canceled group);  | 
|
| 32221 | 80  | 
|
| 32251 | 81  | 
fun group_status (Group {parent, status, ...}) =
 | 
82  | 
Synchronized.value status @  | 
|
83  | 
(case parent of NONE => [] | SOME group => group_status group);  | 
|
| 28165 | 84  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
85  | 
fun str_of_group group =  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
86  | 
  (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
 | 
| 28179 | 87  | 
|
| 28165 | 88  | 
|
| 
28176
 
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
 
wenzelm 
parents: 
28171 
diff
changeset
 | 
89  | 
(* jobs *)  | 
| 28165 | 90  | 
|
91  | 
datatype job =  | 
|
| 29365 | 92  | 
Job of (bool -> bool) list |  | 
| 28165 | 93  | 
Running of Thread.thread;  | 
94  | 
||
| 29121 | 95  | 
type jobs = (group * job) Task_Graph.T;  | 
| 
28176
 
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
 
wenzelm 
parents: 
28171 
diff
changeset
 | 
96  | 
|
| 29121 | 97  | 
fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);  | 
98  | 
fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);  | 
|
| 29365 | 99  | 
fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;  | 
| 
28202
 
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
 
wenzelm 
parents: 
28196 
diff
changeset
 | 
100  | 
|
| 32250 | 101  | 
fun add_job task dep (jobs: jobs) =  | 
102  | 
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;  | 
|
103  | 
||
104  | 
fun get_deps (jobs: jobs) task =  | 
|
105  | 
Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];  | 
|
106  | 
||
| 
28176
 
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
 
wenzelm 
parents: 
28171 
diff
changeset
 | 
107  | 
|
| 
 
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
 
wenzelm 
parents: 
28171 
diff
changeset
 | 
108  | 
(* queue of grouped jobs *)  | 
| 
 
01b21886e7f0
job: explicit 'ok' status -- false for canceled jobs;
 
wenzelm 
parents: 
28171 
diff
changeset
 | 
109  | 
|
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
110  | 
datatype result = Unknown | Result of task | No_Result;  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
111  | 
|
| 28165 | 112  | 
datatype queue = Queue of  | 
| 
28184
 
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
 
wenzelm 
parents: 
28179 
diff
changeset
 | 
113  | 
 {groups: task list Inttab.table,   (*groups with presently active members*)
 | 
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
114  | 
jobs: jobs, (*job dependency graph*)  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
115  | 
cache: result}; (*last dequeue result*)  | 
| 28165 | 116  | 
|
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
117  | 
fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
 | 
| 28204 | 118  | 
|
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
119  | 
val empty = make_queue Inttab.empty Task_Graph.empty No_Result;  | 
| 29121 | 120  | 
fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
 | 
| 28165 | 121  | 
|
122  | 
||
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
123  | 
(* queue status *)  | 
| 32052 | 124  | 
|
125  | 
fun status (Queue {jobs, ...}) =
 | 
|
126  | 
let  | 
|
127  | 
val (x, y, z) =  | 
|
128  | 
Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) =>  | 
|
129  | 
(case job of  | 
|
130  | 
Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)  | 
|
131  | 
| Running _ => (x, y, z + 1)))  | 
|
132  | 
jobs (0, 0, 0);  | 
|
133  | 
  in {ready = x, pending = y, running = z} end;
 | 
|
134  | 
||
135  | 
||
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
136  | 
(* cancel -- peers and sub-groups *)  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
137  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
138  | 
fun cancel (Queue {groups, jobs, ...}) group =
 | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
139  | 
let  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
140  | 
val _ = cancel_group group Exn.Interrupt;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
141  | 
val tasks = Inttab.lookup_list groups (group_id group);  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
142  | 
val running =  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
143  | 
fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
144  | 
val _ = List.app SimpleThread.interrupt running;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
145  | 
in null running end;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
146  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
147  | 
fun cancel_all (Queue {jobs, ...}) =
 | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
148  | 
let  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
149  | 
fun cancel_job (group, job) (groups, running) =  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
150  | 
(cancel_group group Exn.Interrupt;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
151  | 
(case job of Running t => (insert eq_group group groups, insert Thread.equal t running)  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
152  | 
| _ => (groups, running)));  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
153  | 
val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
154  | 
val _ = List.app SimpleThread.interrupt running;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
155  | 
in groups end;  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
156  | 
|
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
157  | 
|
| 
28185
 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
 
wenzelm 
parents: 
28184 
diff
changeset
 | 
158  | 
(* enqueue *)  | 
| 28165 | 159  | 
|
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
160  | 
fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
 | 
| 28165 | 161  | 
let  | 
| 29121 | 162  | 
val task = new_task pri;  | 
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
163  | 
val groups' = groups  | 
| 
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
164  | 
|> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);  | 
| 
28185
 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
 
wenzelm 
parents: 
28184 
diff
changeset
 | 
165  | 
val jobs' = jobs  | 
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
166  | 
|> Task_Graph.new_node (task, (group, Job [job]))  | 
| 
32190
 
4fc7a882b41e
enqueue: maintain transitive closure, which simplifies dequeue_towards;
 
wenzelm 
parents: 
32101 
diff
changeset
 | 
167  | 
|> fold (add_job task) deps  | 
| 
 
4fc7a882b41e
enqueue: maintain transitive closure, which simplifies dequeue_towards;
 
wenzelm 
parents: 
32101 
diff
changeset
 | 
168  | 
|> fold (fold (add_job task) o get_deps jobs) deps;  | 
| 
32218
 
222f26693757
enqueue/finish: return minimal/maximal state of this task;
 
wenzelm 
parents: 
32192 
diff
changeset
 | 
169  | 
val minimal = null (get_deps jobs' task);  | 
| 31632 | 170  | 
val cache' =  | 
171  | 
(case cache of  | 
|
172  | 
Result last =>  | 
|
173  | 
if task_ord (last, task) = LESS  | 
|
174  | 
then cache else Unknown  | 
|
175  | 
| _ => Unknown);  | 
|
| 
32218
 
222f26693757
enqueue/finish: return minimal/maximal state of this task;
 
wenzelm 
parents: 
32192 
diff
changeset
 | 
176  | 
in ((task, minimal), make_queue groups' jobs' cache') end;  | 
| 28165 | 177  | 
|
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
178  | 
fun extend task job (Queue {groups, jobs, cache}) =
 | 
| 29365 | 179  | 
(case try (get_job jobs) task of  | 
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
180  | 
SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)  | 
| 29365 | 181  | 
| _ => NONE);  | 
182  | 
||
| 
28185
 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
 
wenzelm 
parents: 
28184 
diff
changeset
 | 
183  | 
|
| 
 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
 
wenzelm 
parents: 
28184 
diff
changeset
 | 
184  | 
(* dequeue *)  | 
| 
 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
 
wenzelm 
parents: 
28184 
diff
changeset
 | 
185  | 
|
| 32249 | 186  | 
fun dequeue thread (queue as Queue {groups, jobs, cache}) =
 | 
| 29121 | 187  | 
let  | 
| 29365 | 188  | 
fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)  | 
| 29121 | 189  | 
| ready _ = NONE;  | 
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
190  | 
fun deq boundary =  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
191  | 
(case Task_Graph.get_first boundary ready jobs of  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
192  | 
NONE => (NONE, make_queue groups jobs No_Result)  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
193  | 
| SOME (result as (task, _, _)) =>  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
194  | 
let  | 
| 32249 | 195  | 
val jobs' = set_job task (Running thread) jobs;  | 
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
196  | 
val cache' = Result task;  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
197  | 
in (SOME result, make_queue groups jobs' cache') end);  | 
| 29121 | 198  | 
in  | 
| 
31617
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
199  | 
(case cache of  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
200  | 
Unknown => deq NONE  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
201  | 
| Result last => deq (SOME last)  | 
| 
 
bb7b5a5942c7
simplified join_results: no longer work "towards" deps, which simplifies task queue management and maintains strict bottom up discipline (without "transfer of priority" to required futures);
 
wenzelm 
parents: 
29365 
diff
changeset
 | 
202  | 
| No_Result => (NONE, queue))  | 
| 
28384
 
70abca69247b
dequeue_towards: return bound for unfinished tasks;
 
wenzelm 
parents: 
28332 
diff
changeset
 | 
203  | 
end;  | 
| 
28202
 
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
 
wenzelm 
parents: 
28196 
diff
changeset
 | 
204  | 
|
| 28165 | 205  | 
|
| 
32055
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
206  | 
(* dequeue_towards -- adhoc dependencies *)  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
207  | 
|
| 32249 | 208  | 
fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
 | 
| 
32055
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
209  | 
let  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
210  | 
fun ready task =  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
211  | 
(case Task_Graph.get_node jobs task of  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
212  | 
(group, Job list) =>  | 
| 32250 | 213  | 
if null (get_deps jobs task)  | 
| 
32101
 
e25107ff4f56
support for nested groups -- cancellation is propagated to peers and subgroups;
 
wenzelm 
parents: 
32099 
diff
changeset
 | 
214  | 
then SOME (task, group, rev list)  | 
| 
32055
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
215  | 
else NONE  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
216  | 
| _ => NONE);  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
217  | 
val tasks = filter (can (Task_Graph.get_node jobs)) deps;  | 
| 32192 | 218  | 
fun result (res as (task, _, _)) =  | 
219  | 
let  | 
|
| 32249 | 220  | 
val jobs' = set_job task (Running thread) jobs;  | 
| 32192 | 221  | 
val cache' = Unknown;  | 
| 32224 | 222  | 
in ((SOME res, tasks), make_queue groups jobs' cache') end;  | 
| 
32055
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
223  | 
in  | 
| 
32093
 
30996b775a7f
tuned dequeu_towards: try immediate tasks before expensive all_preds;
 
wenzelm 
parents: 
32055 
diff
changeset
 | 
224  | 
(case get_first ready tasks of  | 
| 32192 | 225  | 
SOME res => result res  | 
226  | 
| NONE =>  | 
|
| 32250 | 227  | 
(case get_first (get_first ready o get_deps jobs) tasks of  | 
| 32192 | 228  | 
SOME res => result res  | 
| 32224 | 229  | 
| NONE => ((NONE, tasks), queue)))  | 
| 
32055
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
230  | 
end;  | 
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
231  | 
|
| 
 
6a46898aa805
recovered a version of dequeue_towards (cf. bb7b5a5942c7);
 
wenzelm 
parents: 
32052 
diff
changeset
 | 
232  | 
|
| 32221 | 233  | 
(* finish *)  | 
234  | 
||
235  | 
fun finish task (Queue {groups, jobs, cache}) =
 | 
|
236  | 
let  | 
|
237  | 
val group = get_group jobs task;  | 
|
238  | 
val groups' = groups  | 
|
239  | 
|> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);  | 
|
240  | 
val jobs' = Task_Graph.del_node task jobs;  | 
|
241  | 
val maximal = null (Task_Graph.imm_succs jobs task);  | 
|
242  | 
val cache' = if maximal then cache else Unknown;  | 
|
243  | 
in (maximal, make_queue groups' jobs' cache') end;  | 
|
244  | 
||
| 28165 | 245  | 
end;  |