author  wenzelm 
Wed, 10 Sep 2008 23:19:36 +0200  
changeset 28196  f019dd2db561 
parent 28190  0a2434cf38c9 
child 28202  23cb9a974630 
permissions  rwrr 
28165  1 
(*  Title:      Pure/Concurrent/task_queue.ML
    ID:         $Id$
    Author:     Makarius

Ordered queue of grouped tasks.
*)

7 

8 
signature TASK_QUEUE =
sig
  (*task identifiers and their printed form*)
  eqtype task
  val str_of_task: task -> string

  (*task groups; each group carries a bool flag that cancel sets to false*)
  eqtype group
  val new_group: unit -> group
  val str_of_group: group -> string

  (*queue of grouped jobs with a dependency graph*)
  type queue
  val empty: queue

  (*add a job to a group with the given dependencies / add further dependencies*)
  val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
  val depend: task list -> task -> queue -> queue

  (*take a ready job, marking it as running in the calling thread*)
  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
  val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue

  (*interrupt the thread of a running task; the external variant parses the task id from a string*)
  val interrupt: queue -> task -> unit
  val interrupt_external: queue -> string -> unit

  (*termination: remove a finished task / invalidate a whole group*)
  val finish: task -> queue -> queue
  val cancel: queue -> group -> bool
end;
26 

28171  27 
structure TaskQueue: TASK_QUEUE = 
28165  28 
struct 
29 

28168  30 
(* identifiers *) 
28165  31 

32 
(*Task identifier: a wrapper around a value of type serial.*)
datatype task = Task of serial; 

28190
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

33 
(*Printed form of a task: its serial number passed through string_of_int.*)
fun str_of_task task =
  let val Task i = task
  in string_of_int i end;
28165  34 

28190
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

35 
(*Task group: a serial number paired with a mutable bool flag
  (created as true by new_group, set to false by cancel).*)
datatype group = Group of serial * bool ref; 
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

36 
(*Create a group from a fresh call of serial () with its flag initially true.*)
fun new_group () =
  let
    val id = serial ();
    val ok = ref true;
  in Group (id, ok) end;
28165  37 

28190
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

38 
(*Printed form of a group: its serial number, enclosed in parentheses
  when the group's flag is currently false.*)
fun str_of_group (Group (i, ref ok)) =
  if ok then string_of_int i else enclose "(" ")" (string_of_int i);
28179  40 

28165  41 

28176
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

42 
(* jobs *) 
28165  43 

44 
(*State of a queued job: Job holds the function still to be run (dequeue_if
  applies it to the group's flag value), Running holds the thread that
  dequeued it.*)
datatype job =
  Job of bool -> bool |
  Running of Thread.thread;
47 

28176
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

48 
(*Job graph: nodes are keyed by the task's serial number and carry the
  task's group together with its job state.*)
type jobs = (group * job) IntGraph.T; 
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

49 

28185
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

50 
(*Check whether looking up the task's node in the job graph succeeds.*)
fun defined_job (jobs: jobs) task =
  let val Task id = task
  in can (IntGraph.get_node jobs) id end;
28176
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

51 
(*Group component of the task's node; any exception from IntGraph.get_node
  propagates to the caller.*)
fun get_group (jobs: jobs) (Task id) =
  let val (group, _) = IntGraph.get_node jobs id
  in group end;
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

52 
(*Job component of the task's node; any exception from IntGraph.get_node
  propagates to the caller.*)
fun get_job (jobs: jobs) (Task id) =
  let val (_, job) = IntGraph.get_node jobs id
  in job end;
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

53 
(*Apply f to the job component of the task's node, keeping its group.*)
fun map_job (Task id) f (jobs: jobs) =
  IntGraph.map_node id (fn (group, job) => (group, f job)) jobs;
28185
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

54 
(*Record that task id depends on task dep by adding the edge (dep, id).
  If IntGraph raises UNDEF the graph is returned unchanged -- presumably
  because dep is no longer present in the graph; confirm against IntGraph.*)
fun add_job (Task id) (Task dep) (jobs: jobs) =
  IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
28176
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

56 

01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

57 

01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

58 
(* queue of grouped jobs *) 
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

59 

28165  60 
(*Queue state: a table from group serial numbers to member tasks, plus
  the graph of jobs and their dependencies.*)
datatype queue = Queue of
 {groups: task list Inttab.table,   (*groups with presently active members*)
  jobs: jobs};                      (*job dependency graph*)
28165  63 

28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset

64 
(*Build a queue value from its group table and job graph.*)
fun make_queue groups jobs =
  Queue {jobs = jobs, groups = groups};
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset

65 
(*Queue with an empty group table and an empty job graph.*)
val empty = Queue {groups = Inttab.empty, jobs = IntGraph.empty};
28165  66 

67 

28185
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

68 
(* enqueue *) 
28165  69 

28190
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

70 
(*Add a new job to the queue.  A task is created from serial (), recorded
  as a member of the given group, inserted into the job graph as a pending
  Job, and made dependent on each task in deps (via add_job).
  Returns the new task together with the updated queue.*)
fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
  let
    val id = serial ();
    val task = Task id;
    val groups' = Inttab.cons_list (gid, task) groups;
    val jobs' = jobs
      |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
  in (task, make_queue groups' jobs') end;
28165  78 

28185
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

79 
(*Make an existing task depend on each task in deps (via add_job);
  the group table is left unchanged.*)
fun depend deps task (Queue {groups, jobs}) =
  make_queue groups (fold (add_job task) deps jobs);
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

81 

0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

82 

0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

83 
(* dequeue *) 
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

84 

0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

85 
(*Take the first job accepted by predicate P (on serial numbers) that is
  still a pending Job and whose first adjacency list is empty -- presumably
  the list of unfinished predecessors; confirm against IntGraph.get_first.
  The result holds the task, its group, and a thunk that runs the job on
  the group's flag value as read at dequeue time.  The chosen node is
  marked Running with the calling thread.  If nothing qualifies, the
  result is NONE and the queue is returned unchanged.*)
fun dequeue_if P (queue as Queue {groups, jobs}) =
  let
    fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
          if P id then SOME (Task id, group, (fn () => job ok)) else NONE
      | ready _ = NONE;
  in
    (case IntGraph.get_first ready jobs of
      NONE => (NONE, queue)
    | SOME result =>
        let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
        in (SOME result, make_queue groups jobs') end)
  end;
97 

28185
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

98 
(*Dequeue any ready job: dequeue_if with a predicate that accepts every id.*)
fun dequeue queue = dequeue_if (fn _ => true) queue;
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

99 

0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

100 
(*Dequeue a ready job restricted to those ids that IntGraph.all_preds
  yields for the given tasks.  Tasks that are no longer defined in the job
  graph are dropped first, so they are never passed to IntGraph.all_preds.*)
fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
  let val ids = tasks
    |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
  in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
0f20cbce4935
simplified dequeue: provide Thread.self internally;
wenzelm
parents:
28184
diff
changeset

104 

28165  105 

28190
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

106 
(* sporadic interrupts *) 
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

107 

0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

108 
(*Interrupt a thread, ignoring the Thread exception that Thread.interrupt
  may raise -- presumably when the thread is no longer active; confirm
  against the Poly/ML Thread documentation.*)
fun interrupt_thread thread = Thread.interrupt thread handle Thread _ => (); 
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

109 

0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

110 
(*Interrupt the thread of the given task if it is currently Running;
  a pending task, or one whose lookup fails, is silently ignored.*)
fun interrupt (Queue {jobs, ...}) task =
  (case try (get_job jobs) task of
    SOME (Running thread) => interrupt_thread thread
  | _ => ());
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

112 

0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

113 
(*Interrupt the task whose serial number is given as a string;
  input that Int.fromString cannot parse is ignored.*)
fun interrupt_external queue str =
  (case Int.fromString str of
    SOME id => interrupt queue (Task id)
  | NONE => ());
0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

115 

0a2434cf38c9
cancel: invalidate group implicitly, via bool ref;
wenzelm
parents:
28185
diff
changeset

116 

28176
01b21886e7f0
job: explicit 'ok' status  false for canceled jobs;
wenzelm
parents:
28171
diff
changeset

117 
(* termination *) 
28165  118 

28196  119 
(*Cancel a group: set its flag to false, then interrupt the thread of every
  member task that is currently Running.  Returns true iff no member was
  running at the time of the call.  The queue itself is not modified.*)
fun cancel (Queue {groups, jobs}) (Group (gid, ok)) =
  let
    val _ = ok := false;  (*invalidate any future group members*)
    val tasks = Inttab.lookup_list groups gid;
    val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
    val _ = List.app interrupt_thread running;
  in null running end;
28165  126 

28184
5ed5cb73a2e9
eliminated cache, access queue efficiently via IntGraph.get_first;
wenzelm
parents:
28179
diff
changeset

127 
(*Remove a finished task from the queue: drop it from its group's member
  list and delete its node from the job graph.  The group lookup goes
  through get_group, so an exception from IntGraph.get_node for an unknown
  task propagates to the caller.*)
fun finish (task as Task id) (Queue {groups, jobs}) =
  let
    val Group (gid, _) = get_group jobs task;
    val groups' = Inttab.remove_list (op =) (gid, task) groups;
    val jobs' = IntGraph.del_nodes [id] jobs;
  in make_queue groups' jobs' end;
28165  133 

134 
end; 