1 (* Title: HOL/Tools/ATP/async_manager.ML |
|
2 Author: Fabian Immler, TU Muenchen |
|
3 Author: Makarius |
|
4 Author: Jasmin Blanchette, TU Muenchen |
|
5 |
|
6 Central manager for asynchronous diagnosis tool threads. |
|
7 *) |
|
8 |
|
signature ASYNC_MANAGER =
sig
  (*launch tool verbose birth_time death_time desc f:
    start a new thread that registers itself under tool (with the given
    birth/death times and description), runs f, and finally unregisters
    itself using the string returned by f as its message*)
  val launch :
    string -> bool -> Time.time -> Time.time -> string -> (unit -> string) -> unit

  (*kill_threads tool workers:
    request cancellation of the active threads registered for tool;
    workers is the (plural) noun used in the user-visible message*)
  val kill_threads : string -> string -> unit

  (*running_threads tool workers:
    print the active and currently-being-interrupted threads of tool*)
  val running_threads : string -> string -> unit

  (*thread_messages tool worker opt_limit:
    print the most recent stored messages of tool; opt_limit overrides the
    default number of messages shown*)
  val thread_messages : string -> string -> int option -> unit
end;
|
18 |
|
19 structure Async_Manager : ASYNC_MANAGER = |
|
20 struct |
|
21 |
|
22 (** preferences **) |
|
23 |
|
(*bound used when trimming the persistent message store*)
val message_store_limit = 20;

(*default number of stored messages shown by thread_messages*)
val message_display_limit = 5;
|
26 |
|
27 |
|
28 (** thread management **) |
|
29 |
|
30 (* data structures over threads *) |
|
31 |
|
(*heap of (time, thread) pairs, ordered by the time component only*)
structure Thread_Heap = Heap
(
  type elem = Time.time * Thread.thread;
  fun ord (x: elem, y: elem) = Time.compare (#1 x, #1 y);
);
|
37 |
|
(*association-list operations keyed by thread identity (Thread.equal)*)

(*value stored for thread in entries, if any*)
fun lookup_thread entries thread = AList.lookup Thread.equal entries thread;

(*entries without the binding for thread*)
fun delete_thread thread entries = AList.delete Thread.equal thread entries;

(*entries with the given (thread, value) binding added or replaced*)
fun update_thread binding entries = AList.update Thread.equal binding entries;
|
41 |
|
42 |
|
43 (* state of thread manager *) |
|
44 |
|
(*manager:      the manager thread, if one has been started
  timeout_heap: (death time, thread) pairs, earliest death time first
  active:       thread -> (tool, birth time, death time, description)
  canceling:    thread -> (tool, time cancellation started, description)
  messages:     (tool, message) pairs not yet printed
  store:        (tool, message) pairs kept for later inspection*)
type state =
  {manager: Thread.thread option,
   timeout_heap: Thread_Heap.T,
   active: (Thread.thread * (string * Time.time * Time.time * string)) list,
   canceling: (Thread.thread * (string * Time.time * string)) list,
   messages: (string * string) list,
   store: (string * string) list}

(*positional constructor for state records*)
fun make_state manager timeout_heap active canceling messages store : state =
  {manager = manager,
   timeout_heap = timeout_heap,
   active = active,
   canceling = canceling,
   messages = messages,
   store = store}
|
56 |
|
(*the single synchronized state shared by the manager and all worker threads;
  initially no manager, no threads and no messages*)
val global_state =
  let val initial_state = make_state NONE Thread_Heap.empty [] [] [] []
  in Synchronized.var "async_manager" initial_state end;
|
59 |
|
60 |
|
61 (* unregister thread *) |
|
62 |
|
(*unregister verbose message thread:
  if thread is currently active, remove it from the active list, add it to
  the canceling list (stamped with the current time), and record its final
  message both in the pending messages and in the message store.  The message
  is prefixed by the thread's description; if verbose, the elapsed time since
  birth_time is appended.  Does nothing if thread is not active.*)
fun unregister verbose message thread =
  Synchronized.change global_state
  (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
    (case lookup_thread active thread of
      SOME (tool, birth_time, _, desc) =>
        let
          val active' = delete_thread thread active;
          val now = Time.now ()
          val canceling' = (thread, (tool, now, desc)) :: canceling
          val message' =
            desc ^ "\n" ^ message ^
            (if verbose then
               "\nTotal time: " ^
               Int.toString (Time.toMilliseconds (Time.- (now, birth_time))) ^ " ms."
             else
               "")
          val messages' = (tool, message') :: messages;
          (*trim the old entries to one less than the limit, so that the
            store including the new entry never exceeds message_store_limit*)
          val kept = #1 (chop (Int.max (0, message_store_limit - 1)) store);
          val store' = (tool, message') :: kept;
        in make_state manager timeout_heap active' canceling' messages' store' end
    | NONE => state));
|
85 |
|
86 |
|
87 (* main manager thread -- only one may exist *) |
|
88 |
|
(*pause of the manager loop after each round*)
val min_wait_time = Time.fromMilliseconds 300;

(*longest time the manager waits when no timeout is pending*)
val max_wait_time = Time.fromSeconds 10;
|
91 |
|
(*replace_all bef aft s:
  replace every non-overlapping occurrence of bef in s by aft, scanning from
  left to right.  An empty bef matches nothing, so s is returned unchanged
  (instead of looping forever).  The scan walks s by index and only copies
  the pieces between matches.*)
fun replace_all bef aft s =
  if bef = "" then s
  else
    let
      val n = size s
      val k = size bef
      (*does bef occur in s at position i?*)
      fun matches_at i = Substring.isPrefix bef (Substring.extract (s, i, NONE))
      (*i: scan position; start: beginning of the current unmatched piece;
        acc: output pieces so far, in reverse order*)
      fun scan i start acc =
        if i >= n then
          String.concat (rev (String.substring (s, start, n - start) :: acc))
        else if matches_at i then
          scan (i + k) (i + k) (aft :: String.substring (s, start, i - start) :: acc)
        else scan (i + 1) start acc
    in scan 0 0 [] end
|
101 |
|
(* Workaround for Proof General's off-by-a-few sendback display bug, whereby
   "pr" in "proof" is not highlighted: the text of every (tool, message) pair
   is split at blank lines ("\n\n"), and the resulting pieces of all messages
   are returned as one flat list. *)
fun break_into_chunks xs =
  maps (fn (_, text) => space_explode "\000" (replace_all "\n\n" "\000" text)) xs
|
106 |
|
(*print_new_messages ():
  atomically take all pending messages out of the global state and print
  them.  Each message is split into chunks and its first chunk is prefixed
  with the name of the tool that produced that message, so messages of
  different tools are never attributed to the wrong tool.  Messages are
  printed in the order in which they are held in the state.*)
fun print_new_messages () =
  let
    val pending =
      Synchronized.change_result global_state
        (fn {manager, timeout_heap, active, canceling, messages, store} =>
          (messages, make_state manager timeout_heap active canceling [] store))
    fun print_message (tool, text) =
      (case break_into_chunks [(tool, text)] of
        [] => ()  (*nothing to show for an empty message*)
      | chunk :: chunks => List.app priority ((tool ^ ": " ^ chunk) :: chunks))
  in List.app print_message pending end
|
117 |
|
(*check_thread_manager verbose:
  make sure a manager thread is running.  If the state records a manager
  thread that is still active, nothing changes; otherwise a new manager
  thread is started and recorded in the state.

  The manager loops as long as there are active threads, canceling threads or
  pending messages (clearing the manager field when it stops).  In each round
  it waits until the earliest timeout (or max_wait_time if none is pending),
  unregisters timed-out threads with the message "Timed out.\n", interrupts
  canceling threads, prints new messages, and sleeps for min_wait_time.*)
fun check_thread_manager verbose = Synchronized.change global_state
  (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
    let
      (*point in time at which the manager should wake up at the latest*)
      fun next_wakeup heap =
        (case try Thread_Heap.min heap of
          SOME (deadline, _) => deadline
        | NONE => Time.+ (Time.now (), max_wait_time));

      (*find threads whose timeout is reached, and interrupt canceling threads*)
      fun sweep {manager, timeout_heap, active, canceling, messages, store} =
        let
          val (expired, remaining_heap) =
            Thread_Heap.upto (Time.now (), Thread.self ()) timeout_heap;
        in
          if not (null expired) orelse not (null canceling) then
            let
              val _ = List.app (fn (th, _) => Simple_Thread.interrupt th) canceling
              val still_canceling = filter (fn (th, _) => Thread.isActive th) canceling
              val state' =
                make_state manager remaining_heap active still_canceling messages store;
            in SOME (map #2 expired, state') end
          else NONE
        end;

      (*true while there is work left; otherwise retire the manager*)
      fun work_left () =
        Synchronized.change_result global_state
          (fn state as {timeout_heap, active, canceling, messages, store, ...} =>
            if null active andalso null canceling andalso null messages
            then (false, make_state NONE timeout_heap active canceling messages store)
            else (true, state));

      fun manager_loop () =
        while work_left () do
          (List.app (unregister verbose "Timed out.\n")
             (these
               (Synchronized.timed_access global_state
                 (fn ({timeout_heap, ...}: state) => SOME (next_wakeup timeout_heap))
                 sweep));
           print_new_messages ();
           (*give threads some time to respond to interrupt*)
           OS.Process.sleep min_wait_time);

      val manager_alive =
        (case manager of
          SOME thread => Thread.isActive thread
        | NONE => false);
    in
      if manager_alive then state
      else
        let val manager' = SOME (Toplevel.thread false manager_loop)
        in make_state manager' timeout_heap active canceling messages store end
    end)
|
157 |
|
158 |
|
159 (* register thread *) |
|
160 |
|
(*register tool verbose birth_time death_time desc thread:
  enter thread into the timeout heap (keyed by death_time) and into the
  active list, then make sure the manager thread is running*)
fun register tool verbose birth_time death_time desc thread =
  let
    fun add_thread {manager, timeout_heap, active, canceling, messages, store} =
      make_state manager
        (Thread_Heap.insert (death_time, thread) timeout_heap)
        (update_thread (thread, (tool, birth_time, death_time, desc)) active)
        canceling messages store
  in
    Synchronized.change global_state add_thread;
    check_thread_manager verbose
  end;
|
170 |
|
171 |
|
(*launch tool verbose birth_time death_time desc f:
  run f in a fresh thread; the thread registers itself before calling f and
  unregisters itself afterwards, using the result of f as its message*)
fun launch tool verbose birth_time death_time desc f =
  let
    fun worker () =
      let val me = Thread.self () in
        register tool verbose birth_time death_time desc me;
        unregister verbose (f ()) me
      end
    val _ = Toplevel.thread true worker
  in () end
|
181 |
|
182 |
|
183 (** user commands **) |
|
184 |
|
185 (* kill threads *) |
|
186 |
|
(*kill_threads tool workers:
  move all active threads registered for tool to the canceling list, stamped
  with the current time.  Active threads of other tools stay in the active
  list untouched.  If at least one thread was affected, print
  "Killed active <workers>.".*)
fun kill_threads tool workers = Synchronized.change global_state
  (fn {manager, timeout_heap, active, canceling, messages, store} =>
    let
      val now = Time.now ()
      (*only the threads of this tool are killed; the rest must remain
        registered as active*)
      val (doomed, survivors) =
        List.partition (fn (_, (tool', _, _, _)) => tool' = tool) active
      val killing =
        map (fn (th, (tool', _, _, desc)) => (th, (tool', now, desc))) doomed
      val state' =
        make_state manager timeout_heap survivors (killing @ canceling) messages store;
      val _ = if null killing then () else priority ("Killed active " ^ workers ^ ".")
    in state' end);
|
197 |
|
198 |
|
199 (* running threads *) |
|
200 |
|
(*render a time value as whole seconds followed by " s"*)
fun seconds time = (time |> Time.toSeconds |> string_of_int) ^ " s"
|
202 |
|
(*running_threads tool workers:
  print a report of the threads of tool found in the current state: first the
  active ones (age and remaining time to live), then the ones that are being
  interrupted (time since cancellation started)*)
fun running_threads tool workers =
  let
    val {active, canceling, ...} = Synchronized.value global_state;
    val now = Time.now ();

    fun describe_active (_, (tool', birth_time, death_time, desc)) =
      if tool' <> tool then NONE
      else
        SOME ("Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^
          seconds (Time.- (death_time, now)) ^ " to live:\n" ^ desc)

    (*the time stored in a canceling entry is when cancellation started*)
    fun describe_canceling (_, (tool', since, desc)) =
      if tool' <> tool then NONE
      else
        SOME ("Trying to interrupt thread since " ^
          seconds (Time.- (now, since)) ^ ":\n" ^ desc)

    val running_part =
      (case map_filter describe_active active of
        [] => ["No " ^ workers ^ " running."]
      | infos => ("Running " ^ workers ^ ":") :: infos)

    val interrupting_part =
      (case map_filter describe_canceling canceling of
        [] => []
      | infos => ("Trying to interrupt the following " ^ workers ^ ":") :: infos)
  in
    priority (space_implode "\n\n" (running_part @ interrupting_part))
  end
|
229 |
|
(*thread_messages tool worker opt_limit:
  print a header followed by the first limit stored messages of tool, where
  limit is opt_limit if given and message_display_limit otherwise; the header
  mentions the limit when more messages are stored than are shown*)
fun thread_messages tool worker opt_limit =
  let
    val limit =
      (case opt_limit of
        SOME n => n
      | NONE => message_display_limit);
    val {store, ...} = Synchronized.value global_state;
    val tool_store = filter (fn (tool', _) => tool = tool') store;
    val suffix =
      if length tool_store > limit then " (" ^ string_of_int limit ^ " displayed):"
      else ":";
    val header = "Recent " ^ worker ^ " messages" ^ suffix;
    val (shown, _) = chop limit tool_store;
  in List.app priority (header :: break_into_chunks shown) end
|
240 |
|
241 end; |
|