74 type elem = Time.time * Thread.thread; |
60 type elem = Time.time * Thread.thread; |
75 fun ord ((a, _), (b, _)) = Time.compare (a, b); |
61 fun ord ((a, _), (b, _)) = Time.compare (a, b); |
76 ); |
62 ); |
77 |
63 |
78 fun lookup_thread xs = AList.lookup Thread.equal xs; |
64 fun lookup_thread xs = AList.lookup Thread.equal xs; |
|
65 fun delete_thread xs = AList.delete Thread.equal xs; |
79 fun update_thread xs = AList.update Thread.equal xs; |
66 fun update_thread xs = AList.update Thread.equal xs; |
80 |
67 |
81 |
68 |
82 (* state of thread manager *) |
69 (* state of thread manager *) |
83 |
70 |
84 type state = |
71 type state = |
85 {manager: Thread.thread option, |
72 {manager: Thread.thread option, |
86 timeout_heap: Thread_Heap.T, |
73 timeout_heap: Thread_Heap.T, |
87 oldest_heap: Thread_Heap.T, |
|
88 active: (Thread.thread * (Time.time * Time.time * string)) list, |
74 active: (Thread.thread * (Time.time * Time.time * string)) list, |
89 cancelling: (Thread.thread * (Time.time * Time.time * string)) list, |
75 cancelling: (Thread.thread * (Time.time * string)) list, |
90 messages: string list, |
76 messages: string list, |
91 store: string list}; |
77 store: string list}; |
92 |
78 |
93 fun make_state manager timeout_heap oldest_heap active cancelling messages store : state = |
79 fun make_state manager timeout_heap active cancelling messages store : state = |
94 {manager = manager, timeout_heap = timeout_heap, oldest_heap = oldest_heap, |
80 {manager = manager, timeout_heap = timeout_heap, active = active, |
95 active = active, cancelling = cancelling, messages = messages, store = store}; |
81 cancelling = cancelling, messages = messages, store = store}; |
96 |
82 |
97 val global_state = Synchronized.var "atp_manager" |
83 val global_state = Synchronized.var "atp_manager" |
98 (make_state NONE Thread_Heap.empty Thread_Heap.empty [] [] [] []); |
84 (make_state NONE Thread_Heap.empty [] [] [] []); |
99 |
85 |
100 |
86 |
101 (* unregister thread *) |
87 (* unregister ATP thread *) |
102 |
88 |
103 fun unregister (success, message) thread = Synchronized.change global_state |
89 fun unregister (success, message) thread = Synchronized.change global_state |
104 (fn state as {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
90 (fn state as {manager, timeout_heap, active, cancelling, messages, store} => |
105 (case lookup_thread active thread of |
91 (case lookup_thread active thread of |
106 SOME (birthtime, _, description) => |
92 SOME (birth_time, _, description) => |
107 let |
93 let |
108 val (group, active') = |
94 val active' = delete_thread thread active; |
109 if success then List.partition (fn (_, (tb, _, _)) => tb = birthtime) active |
95 val cancelling' = (thread, (Time.now (), description)) :: cancelling; |
110 else List.partition (fn (th, _) => Thread.equal (th, thread)) active; |
96 val message' = description ^ "\n" ^ message; |
111 |
97 val messages' = message' :: messages; |
112 val now = Time.now (); |
|
113 val cancelling' = |
|
114 fold (fn (th, (tb, _, desc)) => update_thread (th, (tb, now, desc))) group cancelling; |
|
115 |
|
116 val message' = description ^ "\n" ^ message ^ |
|
117 (if length group <= 1 then "" |
|
118 else "\nInterrupted " ^ string_of_int (length group - 1) ^ " other group members"); |
|
119 val store' = message' :: |
98 val store' = message' :: |
120 (if length store <= message_store_limit then store |
99 (if length store <= message_store_limit then store |
121 else #1 (chop message_store_limit store)); |
100 else #1 (chop message_store_limit store)); |
122 in |
101 in make_state manager timeout_heap active' cancelling' messages' store' end |
123 make_state manager timeout_heap oldest_heap |
|
124 active' cancelling' (message' :: messages) store' |
|
125 end |
|
126 | NONE => state)); |
102 | NONE => state)); |
127 |
103 |
128 |
104 |
129 (* kill excessive atp threads *) |
105 (* main manager thread -- only one may exist *) |
130 |
106 |
131 local |
107 val min_wait_time = Time.fromMilliseconds 300; |
132 |
108 val max_wait_time = Time.fromSeconds 10; |
133 exception UNCHANGED of unit; |
|
134 |
|
135 fun kill_oldest () = |
|
136 Synchronized.change_result global_state |
|
137 (fn {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
|
138 if Thread_Heap.is_empty oldest_heap orelse not (excessive_atps active) |
|
139 then raise UNCHANGED () |
|
140 else |
|
141 let |
|
142 val ((_, oldest_thread), oldest_heap') = Thread_Heap.min_elem oldest_heap; |
|
143 val state' = |
|
144 make_state manager timeout_heap oldest_heap' active cancelling messages store; |
|
145 in (oldest_thread, state') end) |
|
146 |> unregister (false, "Interrupted (maximum number of ATPs exceeded)") |
|
147 handle UNCHANGED () => (); |
|
148 |
|
149 in |
|
150 |
|
151 fun kill_excessive () = |
|
152 let val {active, ...} = Synchronized.value global_state |
|
153 in if excessive_atps active then (kill_oldest (); kill_excessive ()) else () end; |
|
154 |
|
155 end; |
|
156 |
109 |
157 fun print_new_messages () = |
110 fun print_new_messages () = |
158 let val msgs = Synchronized.change_result global_state |
111 let val msgs = Synchronized.change_result global_state |
159 (fn {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
112 (fn {manager, timeout_heap, active, cancelling, messages, store} => |
160 (messages, make_state manager timeout_heap oldest_heap active cancelling [] store)) |
113 (messages, make_state manager timeout_heap active cancelling [] store)) |
161 in |
114 in |
162 if null msgs then () |
115 if null msgs then () |
163 else priority ("Sledgehammer: " ^ space_implode "\n\n" msgs) |
116 else priority ("Sledgehammer: " ^ space_implode "\n\n" msgs) |
164 end; |
117 end; |
165 |
118 |
166 |
|
167 (* start manager thread -- only one may exist *) |
|
168 |
|
169 val min_wait_time = Time.fromMilliseconds 300; |
|
170 val max_wait_time = Time.fromSeconds 10; |
|
171 |
|
172 fun check_thread_manager () = Synchronized.change global_state |
119 fun check_thread_manager () = Synchronized.change global_state |
173 (fn {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
120 (fn state as {manager, timeout_heap, active, cancelling, messages, store} => |
174 if (case manager of SOME thread => Thread.isActive thread | NONE => false) |
121 if (case manager of SOME thread => Thread.isActive thread | NONE => false) then state |
175 then make_state manager timeout_heap oldest_heap active cancelling messages store |
|
176 else let val manager = SOME (SimpleThread.fork false (fn () => |
122 else let val manager = SOME (SimpleThread.fork false (fn () => |
177 let |
123 let |
178 fun time_limit timeout_heap = |
124 fun time_limit timeout_heap = |
179 (case try Thread_Heap.min timeout_heap of |
125 (case try Thread_Heap.min timeout_heap of |
180 NONE => Time.+ (Time.now (), max_wait_time) |
126 NONE => Time.+ (Time.now (), max_wait_time) |
181 | SOME (time, _) => time); |
127 | SOME (time, _) => time); |
182 |
128 |
183 (*action: find threads whose timeout is reached, and interrupt cancelling threads*) |
129 (*action: find threads whose timeout is reached, and interrupt cancelling threads*) |
184 fun action {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} = |
130 fun action {manager, timeout_heap, active, cancelling, messages, store} = |
185 let val (timeout_threads, timeout_heap') = |
131 let val (timeout_threads, timeout_heap') = |
186 Thread_Heap.upto (Time.now (), Thread.self ()) timeout_heap; |
132 Thread_Heap.upto (Time.now (), Thread.self ()) timeout_heap; |
187 in |
133 in |
188 if null timeout_threads andalso null cancelling andalso not (excessive_atps active) |
134 if null timeout_threads andalso null cancelling |
189 then NONE |
135 then NONE |
190 else |
136 else |
191 let |
137 let |
192 val _ = List.app (SimpleThread.interrupt o #1) cancelling; |
138 val _ = List.app (SimpleThread.interrupt o #1) cancelling; |
193 val cancelling' = filter (Thread.isActive o #1) cancelling; |
139 val cancelling' = filter (Thread.isActive o #1) cancelling; |
194 val state' = |
140 val state' = make_state manager timeout_heap' active cancelling' messages store; |
195 make_state manager timeout_heap' oldest_heap active cancelling' messages store; |
|
196 in SOME (map #2 timeout_threads, state') end |
141 in SOME (map #2 timeout_threads, state') end |
197 end; |
142 end; |
198 in |
143 in |
199 while Synchronized.change_result global_state |
144 while Synchronized.change_result global_state |
200 (fn state as {timeout_heap, oldest_heap, active, cancelling, messages, store, ...} => |
145 (fn state as {timeout_heap, active, cancelling, messages, store, ...} => |
201 if null active andalso null cancelling andalso null messages |
146 if null active andalso null cancelling andalso null messages |
202 then (false, make_state NONE timeout_heap oldest_heap active cancelling messages store) |
147 then (false, make_state NONE timeout_heap active cancelling messages store) |
203 else (true, state)) |
148 else (true, state)) |
204 do |
149 do |
205 (Synchronized.timed_access global_state (SOME o time_limit o #timeout_heap) action |
150 (Synchronized.timed_access global_state (SOME o time_limit o #timeout_heap) action |
206 |> these |
151 |> these |
207 |> List.app (unregister (false, "Interrupted (reached timeout)")); |
152 |> List.app (unregister (false, "Interrupted (reached timeout)")); |
208 kill_excessive (); |
|
209 print_new_messages (); |
153 print_new_messages (); |
210 (*give threads some time to respond to interrupt*) |
154 (*give threads some time to respond to interrupt*) |
211 OS.Process.sleep min_wait_time) |
155 OS.Process.sleep min_wait_time) |
212 end)) |
156 end)) |
213 in make_state manager timeout_heap oldest_heap active cancelling messages store end); |
157 in make_state manager timeout_heap active cancelling messages store end); |
214 |
158 |
215 |
159 |
216 (* thread is registered here by sledgehammer *) |
160 (* register ATP thread *) |
217 |
161 |
218 fun register birthtime deadtime (thread, desc) = |
162 fun register birth_time death_time (thread, desc) = |
219 (Synchronized.change global_state |
163 (Synchronized.change global_state |
220 (fn {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
164 (fn {manager, timeout_heap, active, cancelling, messages, store} => |
221 let |
165 let |
222 val timeout_heap' = Thread_Heap.insert (deadtime, thread) timeout_heap; |
166 val timeout_heap' = Thread_Heap.insert (death_time, thread) timeout_heap; |
223 val oldest_heap' = Thread_Heap.insert (birthtime, thread) oldest_heap; |
167 val active' = update_thread (thread, (birth_time, death_time, desc)) active; |
224 val active' = update_thread (thread, (birthtime, deadtime, desc)) active; |
168 val state' = make_state manager timeout_heap' active' cancelling messages store; |
225 val state' = |
|
226 make_state manager timeout_heap' oldest_heap' active' cancelling messages store; |
|
227 in state' end); |
169 in state' end); |
228 check_thread_manager ()); |
170 check_thread_manager ()); |
229 |
171 |
230 |
172 |
231 |
173 |
232 (** user commands **) |
174 (** user commands **) |
233 |
175 |
234 (* kill: move all threads to cancelling *) |
176 (* kill *) |
235 |
177 |
236 fun kill () = Synchronized.change global_state |
178 fun kill () = Synchronized.change global_state |
237 (fn {manager, timeout_heap, oldest_heap, active, cancelling, messages, store} => |
179 (fn {manager, timeout_heap, active, cancelling, messages, store} => |
238 let |
180 let |
239 val killing = map (fn (th, (tb, _, desc)) => (th, (tb, Time.now (), desc))) active; |
181 val killing = map (fn (th, (_, _, desc)) => (th, (Time.now (), desc))) active; |
240 val state' = |
182 val state' = make_state manager timeout_heap [] (killing @ cancelling) messages store; |
241 make_state manager timeout_heap oldest_heap [] (killing @ cancelling) messages store; |
|
242 in state' end); |
183 in state' end); |
243 |
184 |
244 |
185 |
245 (* ATP info *) |
186 (* info *) |
246 |
187 |
247 fun seconds time = string_of_int (Time.toSeconds time) ^ "s"; |
188 fun seconds time = string_of_int (Time.toSeconds time) ^ "s"; |
248 |
189 |
249 fun info () = |
190 fun info () = |
250 let |
191 let |
251 val {active, cancelling, ...} = Synchronized.value global_state; |
192 val {active, cancelling, ...} = Synchronized.value global_state; |
252 |
193 |
253 val now = Time.now (); |
194 val now = Time.now (); |
254 fun running_info (_, (birth_time, dead_time, desc)) = |
195 fun running_info (_, (birth_time, death_time, desc)) = |
255 "Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^ |
196 "Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^ |
256 seconds (Time.- (dead_time, now)) ^ " to live:\n" ^ desc; |
197 seconds (Time.- (death_time, now)) ^ " to live:\n" ^ desc; |
257 fun cancelling_info (_, (_, dead_time, desc)) = |
198 fun cancelling_info (_, (deadth_time, desc)) = |
258 "Trying to interrupt thread since " ^ seconds (Time.- (now, dead_time)) ^ ":\n" ^ desc; |
199 "Trying to interrupt thread since " ^ seconds (Time.- (now, deadth_time)) ^ ":\n" ^ desc; |
259 |
200 |
260 val running = |
201 val running = |
261 if null active then "No ATPs running." |
202 if null active then "No ATPs running." |
262 else space_implode "\n\n" ("Running ATPs:" :: map running_info active); |
203 else space_implode "\n\n" ("Running ATPs:" :: map running_info active); |
263 val interrupting = |
204 val interrupting = |