multiplexing
authorblanchet
Fri, 25 Jun 2010 23:35:14 +0200
changeset 37585 c2ed8112ce57
parent 37584 2e289dc13615
child 37586 a209fff74792
multiplexing
src/HOL/Tools/ATP_Manager/async_manager.ML
src/HOL/Tools/ATP_Manager/atp_manager.ML
--- a/src/HOL/Tools/ATP_Manager/async_manager.ML	Fri Jun 25 18:34:06 2010 +0200
+++ b/src/HOL/Tools/ATP_Manager/async_manager.ML	Fri Jun 25 23:35:14 2010 +0200
@@ -11,9 +11,9 @@
   val launch :
     string -> bool -> Time.time -> Time.time -> string -> (unit -> string)
     -> unit
-  val kill_threads : string -> unit
-  val running_threads : string -> unit
-  val thread_messages : string -> int option -> unit
+  val kill_threads : string -> string -> unit
+  val running_threads : string -> string -> unit
+  val thread_messages : string -> string -> int option -> unit
 end;
 
 structure Async_Manager : ASYNC_MANAGER =
@@ -43,12 +43,12 @@
 (* state of thread manager *)
 
 type state =
- {manager: Thread.thread option,
-  timeout_heap: Thread_Heap.T,
-  active: (Thread.thread * (Time.time * Time.time * string)) list,
-  canceling: (Thread.thread * (Time.time * string)) list,
-  messages: string list,
-  store: string list};
+  {manager: Thread.thread option,
+   timeout_heap: Thread_Heap.T,
+   active: (Thread.thread * (string * Time.time * Time.time * string)) list,
+   canceling: (Thread.thread * (string * Time.time * string)) list,
+   messages: (string * string) list,
+   store: (string * string) list}
 
 fun make_state manager timeout_heap active canceling messages store : state =
   {manager = manager, timeout_heap = timeout_heap, active = active,
@@ -64,11 +64,11 @@
   Synchronized.change global_state
   (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
     (case lookup_thread active thread of
-      SOME (birth_time, _, desc) =>
+      SOME (tool, birth_time, _, desc) =>
         let
           val active' = delete_thread thread active;
           val now = Time.now ()
-          val canceling' = (thread, (now, desc)) :: canceling
+          val canceling' = (thread, (tool, now, desc)) :: canceling
           val message' =
             desc ^ "\n" ^ message ^
             (if verbose then
@@ -76,8 +76,8 @@
                                           (Time.- (now, birth_time))) ^ " ms.\n"
              else
                "")
-          val messages' = message' :: messages;
-          val store' = message' ::
+          val messages' = (tool, message') :: messages;
+          val store' = (tool, message') ::
             (if length store <= message_store_limit then store
              else #1 (chop message_store_limit store));
         in make_state manager timeout_heap active' canceling' messages' store' end
@@ -102,20 +102,20 @@
 (* This is a workaround for Proof General's off-by-a-few sendback display bug,
    whereby "pr" in "proof" is not highlighted. *)
 val break_into_chunks =
-  map (replace_all "\n\n" "\000") #> maps (space_explode "\000")
+  maps (space_explode "\000" o replace_all "\n\n" "\000" o snd)
 
-fun print_new_messages tool =
+fun print_new_messages () =
   case Synchronized.change_result global_state
          (fn {manager, timeout_heap, active, canceling, messages, store} =>
              (messages, make_state manager timeout_heap active canceling []
                                    store)) of
     [] => ()
-  | msgs =>
-    msgs |> break_into_chunks
-         |> (fn msg :: msgs => tool ^ ": " ^ msg :: msgs)
-         |> List.app priority
+  | msgs as (tool, _) :: _ =>
+    let val ss = break_into_chunks msgs in
+      List.app priority (tool ^ ": " ^ hd ss :: tl ss)
+    end
 
-fun check_thread_manager tool verbose = Synchronized.change global_state
+fun check_thread_manager verbose = Synchronized.change global_state
   (fn state as {manager, timeout_heap, active, canceling, messages, store} =>
     if (case manager of SOME thread => Thread.isActive thread | NONE => false) then state
     else let val manager = SOME (Toplevel.thread false (fn () =>
@@ -130,8 +130,8 @@
           let val (timeout_threads, timeout_heap') =
             Thread_Heap.upto (Time.now (), Thread.self ()) timeout_heap;
           in
-            if null timeout_threads andalso null canceling
-            then NONE
+            if null timeout_threads andalso null canceling then
+              NONE
             else
               let
                 val _ = List.app (Simple_Thread.interrupt o #1) canceling
@@ -149,7 +149,7 @@
           (Synchronized.timed_access global_state (SOME o time_limit o #timeout_heap) action
             |> these
             |> List.app (unregister verbose "Timed out.\n");
-            print_new_messages tool;
+            print_new_messages ();
             (*give threads some time to respond to interrupt*)
             OS.Process.sleep min_wait_time)
       end))
@@ -163,10 +163,10 @@
     (fn {manager, timeout_heap, active, canceling, messages, store} =>
       let
         val timeout_heap' = Thread_Heap.insert (death_time, thread) timeout_heap;
-        val active' = update_thread (thread, (birth_time, death_time, desc)) active;
+        val active' = update_thread (thread, (tool, birth_time, death_time, desc)) active;
         val state' = make_state manager timeout_heap' active' canceling messages store;
       in state' end);
-  check_thread_manager tool verbose);
+  check_thread_manager verbose);
 
 
 fun launch tool verbose birth_time death_time desc f =
@@ -184,47 +184,58 @@
 
 (* kill threads *)
 
-fun kill_threads tools = Synchronized.change global_state
+fun kill_threads tool workers = Synchronized.change global_state
   (fn {manager, timeout_heap, active, canceling, messages, store} =>
     let
-      val killing = map (fn (th, (_, _, desc)) => (th, (Time.now (), desc))) active;
+      val killing =
+        map_filter (fn (th, (tool', _, _, desc)) =>
+                       if tool' = tool then SOME (th, (tool', Time.now (), desc))
+                       else NONE) active
       val state' = make_state manager timeout_heap [] (killing @ canceling) messages store;
-      val _ = if null active then () else priority ("Killed active " ^ tools ^ ".")
+      val _ = if null killing then () else priority ("Killed active " ^ workers ^ ".")
     in state' end);
 
 
 (* running threads *)
 
-fun seconds time = string_of_int (Time.toSeconds time) ^ "s";
+fun seconds time = string_of_int (Time.toSeconds time) ^ " s"
 
-fun running_threads tools =
+fun running_threads tool workers =
   let
     val {active, canceling, ...} = Synchronized.value global_state;
 
     val now = Time.now ();
-    fun running_info (_, (birth_time, death_time, desc)) =
-      "Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^
-        seconds (Time.- (death_time, now)) ^ " to live:\n" ^ desc;
-    fun canceling_info (_, (deadth_time, desc)) =
-      "Trying to interrupt thread since " ^ seconds (Time.- (now, deadth_time)) ^ ":\n" ^ desc;
-
+    fun running_info (_, (tool', birth_time, death_time, desc)) =
+      if tool' = tool then
+        SOME ("Running: " ^ seconds (Time.- (now, birth_time)) ^ " -- " ^
+              seconds (Time.- (death_time, now)) ^ " to live:\n" ^ desc)
+      else
+        NONE
+    fun canceling_info (_, (tool', death_time, desc)) =
+      if tool' = tool then
+        SOME ("Trying to interrupt thread since " ^
+              seconds (Time.- (now, death_time)) ^ ":\n" ^ desc)
+      else
+        NONE
     val running =
-      case map running_info active of
-        [] => ["No " ^ tools ^ " running."]
-      | ss => "Running " ^ tools ^ ":" :: ss
+      case map_filter running_info active of
+        [] => ["No " ^ workers ^ " running."]
+      | ss => "Running " ^ workers ^ ":" :: ss
     val interrupting =
-      case map canceling_info canceling of
+      case map_filter canceling_info canceling of
         [] => []
-      | ss => "Trying to interrupt the following " ^ tools ^ ":" :: ss
+      | ss => "Trying to interrupt the following " ^ workers ^ ":" :: ss
   in priority (space_implode "\n\n" (running @ interrupting)) end
 
-fun thread_messages tool opt_limit =
+fun thread_messages tool worker opt_limit =
   let
     val limit = the_default message_display_limit opt_limit;
-    val {store, ...} = Synchronized.value global_state;
+    val tool_store = Synchronized.value global_state
+                     |> #store |> filter (curry (op =) tool o fst)
     val header =
-      "Recent " ^ tool ^ " messages" ^
-        (if length store <= limit then ":" else " (" ^ string_of_int limit ^ " displayed):");
-  in List.app priority (header :: break_into_chunks (#1 (chop limit store))) end
+      "Recent " ^ worker ^ " messages" ^
+        (if length tool_store <= limit then ":"
+         else " (" ^ string_of_int limit ^ " displayed):");
+  in List.app priority (header :: break_into_chunks (#1 (chop limit tool_store))) end
 
 end;
--- a/src/HOL/Tools/ATP_Manager/atp_manager.ML	Fri Jun 25 18:34:06 2010 +0200
+++ b/src/HOL/Tools/ATP_Manager/atp_manager.ML	Fri Jun 25 23:35:14 2010 +0200
@@ -70,9 +70,11 @@
 
 (** The Sledgehammer **)
 
-fun kill_atps () = Async_Manager.kill_threads "ATPs"
-fun running_atps () = Async_Manager.running_threads "ATPs"
-val messages = Async_Manager.thread_messages "ATP"
+val das_Tool = "Sledgehammer"
+
+fun kill_atps () = Async_Manager.kill_threads das_Tool "ATPs"
+fun running_atps () = Async_Manager.running_threads das_Tool "ATPs"
+val messages = Async_Manager.thread_messages das_Tool "ATP"
 
 (** problems, results, provers, etc. **)
 
@@ -155,7 +157,7 @@
       "ATP " ^ quote name ^ " for subgoal " ^ string_of_int i ^ ":\n" ^
       Syntax.string_of_term ctxt (Thm.term_of (Thm.cprem_of goal i));
   in
-    Async_Manager.launch "Sledgehammer" verbose birth_time death_time desc
+    Async_Manager.launch das_Tool verbose birth_time death_time desc
         (fn () =>
             let
               val problem =