src/HOL/Tools/ATP/async_manager.ML
changeset 39013 c79e6d536267
parent 38978 4bf80c23320e
parent 39012 96d97d1c676f
child 39014 e820beaf7d8c
equal deleted inserted replaced
38978:4bf80c23320e 39013:c79e6d536267
     1 (*  Title:      HOL/Tools/ATP/async_manager.ML
       
     2     Author:     Fabian Immler, TU Muenchen
       
     3     Author:     Makarius
       
     4     Author:     Jasmin Blanchette, TU Muenchen
       
     5 
       
     6 Central manager for asynchronous diagnosis tool threads.
       
     7 *)
       
     8 
       
     9 signature ASYNC_MANAGER =
       
    10 sig
       
    11   val launch :
       
    12     string -> bool -> Time.time -> Time.time -> string -> (unit -> string)
       
    13     -> unit
       
    14   val kill_threads : string -> string -> unit
       
    15   val running_threads : string -> string -> unit
       
    16   val thread_messages : string -> string -> int option -> unit
       
    17 end;
       
    18 
       
    19 structure Async_Manager : ASYNC_MANAGER =
       
    20 struct
       
    21 
       
    22 (** preferences **)
       
    23 
       
    24 val message_store_limit = 20;
       
    25 val message_display_limit = 5;
       
    26 
       
    27 
       
    28 (** thread management **)
       
    29 
       
    30 (* data structures over threads *)
       
    31 
       
    32 structure Thread_Heap = Heap
       
    33 (
       
    34   type elem = Time.time * Thread.thread;
       
    35   fun ord ((a, _), (b, _)) = Time.compare (a, b);
       
    36 );
       
    37 
       
    38 fun lookup_thread xs = AList.lookup Thread.equal xs;
       
    39 fun delete_thread xs = AList.delete Thread.equal xs;
       
    40 fun update_thread xs = AList.update Thread.equal xs;
       
    41 
       
    42 
       
    43 (* state of thread manager *)
       
    44 
       
    45 type state =
       
    46   {manager: Thread.thread option,
       
    47    timeout_heap: Thread_Heap.T,
       
    48    active: (Thread.thread * (string * Time.time * Time.time * string)) list,
       
    49    canceling: (Thread.thread * (string * Time.time * string)) list,
       
    50    messages: (string * string) list,
       
    51    store: (string * string) list}
       
    52 
       
    53 fun make_state manager timeout_heap active canceling messages store : state =
       
    54   {manager = manager, timeout_heap = timeout_heap, active = active,
       
    55    canceling = canceling, messages = messages, store = store}
       
    56 
       
    57 val global_state = Synchronized.var "async_manager"
       
    58   (make_state NONE Thread_Heap.empty [] [] [] []);
       
    59 
       
    60 
       
    61 (* unregister thread *)
       
    62 
       
    63 fun unregister verbose message thread =
       
    64   Synchronized.change global_state
       
    65   (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
       
    66     (case lookup_thread active thread of
       
    67       SOME (tool, birth_time, _, desc) =>
       
    68         let
       
    69           val active' = delete_thread thread active;
       
    70           val now = Time.now ()
       
    71           val canceling' = (thread, (tool, now, desc)) :: canceling
       
    72           val message' =
       
    73             desc ^ "\n" ^ message ^
       
    74             (if verbose then
       
    75                "\nTotal time: " ^ Int.toString (Time.toMilliseconds
       
    76                                             (Time.- (now, birth_time))) ^ " ms."
       
    77              else
       
    78                "")
       
    79           val messages' = (tool, message') :: messages;
       
    80           val store' = (tool, message') ::
       
    81             (if length store <= message_store_limit then store
       
    82              else #1 (chop message_store_limit store));
       
    83         in make_state manager timeout_heap active' canceling' messages' store' end
       
    84     | NONE => state));
       
    85 
       
    86 
       
    87 (* main manager thread -- only one may exist *)
       
    88 
       
    89 val min_wait_time = Time.fromMilliseconds 300;
       
    90 val max_wait_time = Time.fromSeconds 10;
       
    91 
       
    92 fun replace_all bef aft =
       
    93   let
       
    94     fun aux seen "" = String.implode (rev seen)
       
    95       | aux seen s =
       
    96         if String.isPrefix bef s then
       
    97           aux seen "" ^ aft ^ aux [] (unprefix bef s)
       
    98         else
       
    99           aux (String.sub (s, 0) :: seen) (String.extract (s, 1, NONE))
       
   100   in aux [] end
       
   101 
       
   102 (* This is a workaround for Proof General's off-by-a-few sendback display bug,
       
   103    whereby "pr" in "proof" is not highlighted. *)
       
   104 fun break_into_chunks xs =
       
   105   maps (space_explode "\000" o replace_all "\n\n" "\000" o snd) xs
       
   106 
       
   107 fun print_new_messages () =
       
   108   case Synchronized.change_result global_state
       
   109          (fn {manager, timeout_heap, active, canceling, messages, store} =>
       
   110              (messages, make_state manager timeout_heap active canceling []
       
   111                                    store)) of
       
   112     [] => ()
       
   113   | msgs as (tool, _) :: _ =>
       
   114     let val ss = break_into_chunks msgs in
       
   115       List.app priority (tool ^ ": " ^ hd ss :: tl ss)
       
   116     end
       
   117 
       
   118 fun check_thread_manager verbose = Synchronized.change global_state
       
   119   (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
       
   120     if (case manager of SOME thread => Thread.isActive thread | NONE => false) then state
       
   121     else let val manager = SOME (Toplevel.thread false (fn () =>
       
   122       let
       
   123         fun time_limit timeout_heap =
       
   124           (case try Thread_Heap.min timeout_heap of
       
   125             NONE => Time.+ (Time.now (), max_wait_time)
       
   126           | SOME (time, _) => time);
       
   127 
       
   128         (*action: find threads whose timeout is reached, and interrupt canceling threads*)
       
   129         fun action {manager, timeout_heap, active, canceling, messages, store} =
       
   130           let val (timeout_threads, timeout_heap') =
       
   131             Thread_Heap.upto (Time.now (), Thread.self ()) timeout_heap;
       
   132           in
       
   133             if null timeout_threads andalso null canceling then
       
   134               NONE
       
   135             else
       
   136               let
       
   137                 val _ = List.app (Simple_Thread.interrupt o #1) canceling
       
   138                 val canceling' = filter (Thread.isActive o #1) canceling
       
   139                 val state' = make_state manager timeout_heap' active canceling' messages store;
       
   140               in SOME (map #2 timeout_threads, state') end
       
   141           end;
       
   142       in
       
   143         while Synchronized.change_result global_state
       
   144           (fn state as {timeout_heap, active, canceling, messages, store, ...} =>
       
   145             if null active andalso null canceling andalso null messages
       
   146             then (false, make_state NONE timeout_heap active canceling messages store)
       
   147             else (true, state))
       
   148         do
       
   149           (Synchronized.timed_access global_state (SOME o time_limit o #timeout_heap) action
       
   150             |> these
       
   151             |> List.app (unregister verbose "Timed out.\n");
       
   152             print_new_messages ();
       
   153             (*give threads some time to respond to interrupt*)
       
   154             OS.Process.sleep min_wait_time)
       
   155       end))
       
   156     in make_state manager timeout_heap active canceling messages store end)
       
   157 
       
   158 
       
   159 (* register thread *)
       
   160 
       
   161 fun register tool verbose birth_time death_time desc thread =
       
   162  (Synchronized.change global_state
       
   163     (fn {manager, timeout_heap, active, canceling, messages, store} =>
       
   164       let
       
   165         val timeout_heap' = Thread_Heap.insert (death_time, thread) timeout_heap;
       
   166         val active' = update_thread (thread, (tool, birth_time, death_time, desc)) active;
       
   167         val state' = make_state manager timeout_heap' active' canceling messages store;
       
   168       in state' end);
       
   169   check_thread_manager verbose);
       
   170 
       
   171 
       
   172 fun launch tool verbose birth_time death_time desc f =
       
   173   (Toplevel.thread true
       
   174        (fn () =>
       
   175            let
       
   176              val self = Thread.self ()
       
   177              val _ = register tool verbose birth_time death_time desc self
       
   178              val message = f ()
       
   179            in unregister verbose message self end);
       
   180    ())
       
   181 
       
   182 
       
   183 (** user commands **)
       
   184 
       
   185 (* kill threads *)
       
   186 
       
   187 fun kill_threads tool workers = Synchronized.change global_state
       
   188   (fn {manager, timeout_heap, active, canceling, messages, store} =>
       
   189     let
       
   190       val killing =
       
   191         map_filter (fn (th, (tool', _, _, desc)) =>
       
   192                        if tool' = tool then SOME (th, (tool', Time.now (), desc))
       
   193                        else NONE) active
       
   194       val state' = make_state manager timeout_heap [] (killing @ canceling) messages store;
       
   195       val _ = if null killing then () else priority ("Killed active " ^ workers ^ ".")
       
   196     in state' end);
       
   197 
       
   198 
       
   199 (* running threads *)
       
   200 
       
   201 fun seconds time = string_of_int (Time.toSeconds time) ^ " s"
       
   202 
       
   203 fun running_threads tool workers =
       
   204   let
       
   205     val {active, canceling, ...} = Synchronized.value global_state;
       
   206 
       
   207     val now = Time.now ();
       
   208     fun running_info (_, (tool', birth_time, death_time, desc)) =
       
   209       if tool' = tool then
       
   210         SOME ("Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^
       
   211               seconds (Time.- (death_time, now)) ^ " to live:\n" ^ desc)
       
   212       else
       
   213         NONE
       
   214     fun canceling_info (_, (tool', death_time, desc)) =
       
   215       if tool' = tool then
       
   216         SOME ("Trying to interrupt thread since " ^
       
   217               seconds (Time.- (now, death_time)) ^ ":\n" ^ desc)
       
   218       else
       
   219         NONE
       
   220     val running =
       
   221       case map_filter running_info active of
       
   222         [] => ["No " ^ workers ^ " running."]
       
   223       | ss => "Running " ^ workers ^ ":" :: ss
       
   224     val interrupting =
       
   225       case map_filter canceling_info canceling of
       
   226         [] => []
       
   227       | ss => "Trying to interrupt the following " ^ workers ^ ":" :: ss
       
   228   in priority (space_implode "\n\n" (running @ interrupting)) end
       
   229 
       
   230 fun thread_messages tool worker opt_limit =
       
   231   let
       
   232     val limit = the_default message_display_limit opt_limit;
       
   233     val tool_store = Synchronized.value global_state
       
   234                      |> #store |> filter (curry (op =) tool o fst)
       
   235     val header =
       
   236       "Recent " ^ worker ^ " messages" ^
       
   237         (if length tool_store <= limit then ":"
       
   238          else " (" ^ string_of_int limit ^ " displayed):");
       
   239   in List.app priority (header :: break_into_chunks (#1 (chop limit tool_store))) end
       
   240 
       
   241 end;