included managing_thread in state of AtpManager:
authorimmler@in.tum.de
Tue, 31 Mar 2009 22:23:40 +0200
changeset 30830 263064c4d0c3
parent 30827 fe4331fb3806
child 30831 7c6e1843fda5
included managing_thread in state of AtpManager: synchronized termination and check for running managing_thread
src/HOL/Tools/atp_manager.ML
--- a/src/HOL/Tools/atp_manager.ML	Tue Mar 31 21:39:56 2009 +0200
+++ b/src/HOL/Tools/atp_manager.ML	Tue Mar 31 22:23:40 2009 +0200
@@ -85,36 +85,25 @@
 (* state of thread manager *)
 
 datatype T = State of
- {timeout_heap: ThreadHeap.T,
+ {managing_thread: Thread.thread option,
+  timeout_heap: ThreadHeap.T,
   oldest_heap: ThreadHeap.T,
   active: (Thread.thread * (Time.time * Time.time * string)) list,
   cancelling: (Thread.thread * (Time.time * Time.time * string)) list,
   messages: string list,
   store: string list};
 
-fun make_state timeout_heap oldest_heap active cancelling messages store =
-  State {timeout_heap = timeout_heap, oldest_heap = oldest_heap,
+fun make_state managing_thread timeout_heap oldest_heap active cancelling messages store =
+  State {managing_thread = managing_thread, timeout_heap = timeout_heap, oldest_heap = oldest_heap,
     active = active, cancelling = cancelling, messages = messages, store = store};
 
-fun empty_state state =
-  let
-    val State {active = active, cancelling = cancelling, messages = messages, ...} =
-      Synchronized.value state
-  in (null active) andalso (null cancelling) andalso (null messages) end;
-
-val state = Synchronized.var "atp_manager" (make_state ThreadHeap.empty ThreadHeap.empty [] [] [] []);
-
-
-(* the managing thread *)
-
-(*watches over running threads and interrupts them if required*)
-val managing_thread = ref (NONE: Thread.thread option);
-
+val state = Synchronized.var "atp_manager"
+  (make_state NONE ThreadHeap.empty ThreadHeap.empty [] [] [] []);
 
 (* unregister thread *)
 
 fun unregister (success, message) thread = Synchronized.change state
-  (fn state as State {timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+  (fn state as State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
     (case lookup_thread active thread of
       SOME (birthtime, _, description) =>
         let
@@ -132,7 +121,9 @@
           val store' = message' ::
             (if length store <= message_store_limit then store
              else #1 (chop message_store_limit store))
-        in make_state timeout_heap oldest_heap active' cancelling' (message' :: messages) store' end
+        in make_state
+          managing_thread timeout_heap oldest_heap active' cancelling' (message' :: messages) store'
+        end
     | NONE => state));
 
 
@@ -147,12 +138,13 @@
 fun kill_oldest () =
   let exception Unchanged in
     Synchronized.change_result state
-      (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+      (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
         if ThreadHeap.is_empty oldest_heap orelse not (excessive_atps active)
         then raise Unchanged
         else
           let val ((_, oldest_thread), oldest_heap') = ThreadHeap.min_elem oldest_heap
-          in (oldest_thread, make_state timeout_heap oldest_heap' active cancelling messages store) end)
+          in (oldest_thread,
+          make_state managing_thread timeout_heap oldest_heap' active cancelling messages store) end)
       |> unregister (false, "Interrupted (maximum number of ATPs exceeded)")
     handle Unchanged => ()
   end;
@@ -167,8 +159,8 @@
 
 fun print_new_messages () =
   let val to_print = Synchronized.change_result state
-    (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} =>
-      (messages, make_state timeout_heap oldest_heap active cancelling [] store))
+    (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+      (messages, make_state managing_thread timeout_heap oldest_heap active cancelling [] store))
   in
     if null to_print then ()
     else priority ("Sledgehammer: " ^ space_implode "\n\n" to_print)
@@ -177,55 +169,66 @@
 
 (* start a watching thread -- only one may exist *)
 
-fun check_thread_manager () = CRITICAL (fn () =>
-  if (case ! managing_thread of SOME thread => Thread.isActive thread | NONE => false)
-  then () else managing_thread := SOME (SimpleThread.fork false (fn () =>
-    let
-      val min_wait_time = Time.fromMilliseconds 300
-      val max_wait_time = Time.fromSeconds 10
+fun check_thread_manager () = Synchronized.change state
+  (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+    if (case managing_thread of SOME thread => Thread.isActive thread | NONE => false)
+    then make_state managing_thread timeout_heap oldest_heap active cancelling messages store
+    else let val managing_thread = SOME (SimpleThread.fork false (fn () =>
+      let
+        val min_wait_time = Time.fromMilliseconds 300
+        val max_wait_time = Time.fromSeconds 10
 
-      (* wait for next thread to cancel, or maximum*)
-      fun time_limit (State {timeout_heap, ...}) =
-        (case try ThreadHeap.min timeout_heap of
-          NONE => SOME (Time.+ (Time.now (), max_wait_time))
-        | SOME (time, _) => SOME time)
+        (* wait for next thread to cancel, or maximum*)
+        fun time_limit (State {timeout_heap, ...}) =
+          (case try ThreadHeap.min timeout_heap of
+            NONE => SOME (Time.+ (Time.now (), max_wait_time))
+          | SOME (time, _) => SOME time)
 
-      (* action: find threads whose timeout is reached, and interrupt cancelling threads *)
-      fun action (State {timeout_heap, oldest_heap, active, cancelling, messages, store}) =
-        let val (timeout_threads, timeout_heap') =
-          ThreadHeap.upto (Time.now (), Thread.self ()) timeout_heap
-        in
-          if null timeout_threads andalso null cancelling andalso not (excessive_atps active)
-          then NONE
-          else
-            let
-              val _ = List.app (SimpleThread.interrupt o #1) cancelling
-              val cancelling' = filter (Thread.isActive o #1) cancelling
-              val state' = make_state timeout_heap' oldest_heap active cancelling' messages store
-            in SOME (map #2 timeout_threads, state') end
-        end
-    in
-      while not (empty_state state) do
-       (Synchronized.timed_access state time_limit action
-        |> these
-        |> List.app (unregister (false, "Interrupted (reached timeout)"));
-        kill_excessive ();
-        print_new_messages ();
-        (*give threads time to respond to interrupt*)
-        OS.Process.sleep min_wait_time)
-    end)));
+        (* action: find threads whose timeout is reached, and interrupt cancelling threads *)
+        fun action (State {managing_thread, timeout_heap, oldest_heap, active, cancelling,
+                           messages, store}) =
+          let val (timeout_threads, timeout_heap') =
+            ThreadHeap.upto (Time.now (), Thread.self ()) timeout_heap
+          in
+            if null timeout_threads andalso null cancelling andalso not (excessive_atps active)
+            then NONE
+            else
+              let
+                val _ = List.app (SimpleThread.interrupt o #1) cancelling
+                val cancelling' = filter (Thread.isActive o #1) cancelling
+                val state' = make_state
+                  managing_thread timeout_heap' oldest_heap active cancelling' messages store
+              in SOME (map #2 timeout_threads, state') end
+          end
+      in
+        while Synchronized.change_result state
+          (fn st as
+            State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+            if (null active) andalso (null cancelling) andalso (null messages)
+            then (false, make_state NONE timeout_heap oldest_heap active cancelling messages store)
+            else (true, st))
+        do
+          (Synchronized.timed_access state time_limit action
+            |> these
+            |> List.app (unregister (false, "Interrupted (reached timeout)"));
+            kill_excessive ();
+            print_new_messages ();
+            (*give threads time to respond to interrupt*)
+            OS.Process.sleep min_wait_time)
+      end))
+    in make_state managing_thread timeout_heap oldest_heap active cancelling messages store end);
 
 
 (* thread is registered here by sledgehammer *)
 
 fun register birthtime deadtime (thread, desc) =
  (Synchronized.change state
-    (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+    (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
       let
         val timeout_heap' = ThreadHeap.insert (deadtime, thread) timeout_heap
         val oldest_heap' = ThreadHeap.insert (birthtime, thread) oldest_heap
         val active' = update_thread (thread, (birthtime, deadtime, desc)) active
-      in make_state timeout_heap' oldest_heap' active' cancelling messages store end);
+      in make_state managing_thread timeout_heap' oldest_heap' active' cancelling messages store end);
   check_thread_manager ());
 
 
@@ -235,9 +238,11 @@
 (* kill: move all threads to cancelling *)
 
 fun kill () = Synchronized.change state
-  (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} =>
+  (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} =>
     let val formerly_active = map (fn (th, (tb, _, desc)) => (th, (tb, Time.now (), desc))) active
-    in make_state timeout_heap oldest_heap [] (formerly_active @ cancelling) messages store end);
+    in make_state
+      managing_thread timeout_heap oldest_heap [] (formerly_active @ cancelling) messages store
+    end);
 
 
 (* ATP info *)