28165  1 
(* Title: Pure/Concurrent/task_queue.ML 
2 
3 
Author: Makarius 

4 

5 
Ordered queue of grouped tasks. 

6 
*) 

7 

8 
signature TASK_QUEUE = 

9 
sig 

10 
eqtype task 

val str_of_task: task > string 
28165  12 
eqtype group 
13 
val new_group: unit > group 

val str_of_group: group > string 
28165  15 
type queue 
16 
val empty: queue 

17 
val enqueue: group > task list > (bool > bool) > queue > task * queue 
18 
val depend: task list > task > queue > queue 
19 
val dequeue: queue > (task * group * (unit > bool)) option * queue 
20 
val dequeue_towards: task list > queue > (task * group * (unit > bool)) option * queue 
21 
val interrupt: queue > task > unit 
22 
val interrupt_external: queue > string > unit 
23 
val finish: task > queue > queue 
24 
val cancel: queue > group > bool 
28165  25 
end; 
26 

28171  27 
structure TaskQueue: TASK_QUEUE = 
28165  28 
struct 
29 

28168  30 
(* identifiers *) 
28165  31 

32 
datatype task = Task of serial; 

33 
fun str_of_task (Task i) = string_of_int i; 
28165  34 

35 
datatype group = Group of serial * bool ref; 
36 
fun new_group () = Group (serial (), ref true); 
28165  37 

38 
fun str_of_group (Group (i, ref ok)) = 
39 
if ok then string_of_int i else enclose "(" ")" (string_of_int i); 
28179  40 

28165  41 

42 
(* jobs *) 
28165  43 

44 
datatype job = 

45 
Job of bool > bool  
28165  46 
Running of Thread.thread; 
47 

48 
type jobs = (group * job) IntGraph.T; 
49 

50 
fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id; 
51 
fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); 
52 
fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); 
53 
fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; 
54 
fun add_job (Task id) (Task dep) (jobs: jobs) = 
55 
IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; 
56 

57 

58 
(* queue of grouped jobs *) 
59 

28165  60 
datatype queue = Queue of 
61 
{groups: task list Inttab.table, (*groups with presently active members*) 
62 
jobs: jobs}; (*job dependency graph*) 
28165  63 

64 
fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; 
65 
val empty = make_queue Inttab.empty IntGraph.empty; 
28165  66 

67 

68 
(* enqueue *) 
28165  69 

70 
fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) = 
28165  71 
let 
72 
val id = serial (); 

73 
val task = Task id; 

74 
val groups' = Inttab.cons_list (gid, task) groups; 
75 
val jobs' = jobs 
76 
> IntGraph.new_node (id, (group, Job job)) > fold (add_job task) deps; 
77 
in (task, make_queue groups' jobs') end; 
28165  78 

79 
fun depend deps task (Queue {groups, jobs}) = 
80 
make_queue groups (fold (add_job task) deps jobs); 
81 

82 

83 
(* dequeue *) 
84 

85 
fun dequeue_if P (queue as Queue {groups, jobs}) = 
28165  86 
let 
87 
fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) = 
88 
if P id then SOME (Task id, group, (fn () => job ok)) else NONE 
89 
 ready _ = NONE; 
28165  90 
in 
91 
(case IntGraph.get_first ready jobs of 
92 
NONE => (NONE, queue) 
93 
 SOME result => 
94 
let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs 
95 
in (SOME result, make_queue groups jobs') end) 
28165  96 
end; 
97 

98 
val dequeue = dequeue_if (K true); 
99 

100 
fun dequeue_towards tasks (queue as Queue {jobs, ...}) = 
101 
let val ids = tasks 
102 
> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE) 
103 
in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end; 
104 

28165  105 

106 
(* sporadic interrupts *) 
107 

108 
fun interrupt_thread thread = Thread.interrupt thread handle Thread _ => (); 
109 

110 
fun interrupt (Queue {jobs, ...}) task = 
111 
(case try (get_job jobs) task of SOME (Running thread) => interrupt_thread thread  _ => ()); 
112 

113 
fun interrupt_external queue str = 
114 
(case Int.fromString str of SOME id => interrupt queue (Task id)  NONE => ()); 
115 

116 

117 
(* termination *) 
28165  118 

28196  119 
fun cancel (Queue {groups, jobs}) (Group (gid, ok)) = 
120 
let 
121 
val _ = ok := false; (*invalidate any future group members*) 
122 
val tasks = Inttab.lookup_list groups gid; 
123 
val running = fold (get_job jobs #> (fn Running thread => cons thread  _ => I)) tasks []; 
124 
val _ = List.app interrupt_thread running; 
125 
in null running end; 
28165  126 

127 
fun finish (task as Task id) (Queue {groups, jobs}) = 
128 
let 
129 
val Group (gid, _) = get_group jobs task; 
130 
val groups' = Inttab.remove_list (op =) (gid, task) groups; 
131 
val jobs' = IntGraph.del_nodes [id] jobs; 
132 
in make_queue groups' jobs' end; 
28165  133 

134 
end; 