# HG changeset patch # User immler@in.tum.de # Date 1238531020 -7200 # Node ID 263064c4d0c39040d9591026d1babe1561ecaba6 # Parent fe4331fb3806a8d7f79d139c0e61773d5480235a included managing_thread in state of AtpManager: synchronized termination and check for running managing_thread diff -r fe4331fb3806 -r 263064c4d0c3 src/HOL/Tools/atp_manager.ML --- a/src/HOL/Tools/atp_manager.ML Tue Mar 31 21:39:56 2009 +0200 +++ b/src/HOL/Tools/atp_manager.ML Tue Mar 31 22:23:40 2009 +0200 @@ -85,36 +85,25 @@ (* state of thread manager *) datatype T = State of - {timeout_heap: ThreadHeap.T, + {managing_thread: Thread.thread option, + timeout_heap: ThreadHeap.T, oldest_heap: ThreadHeap.T, active: (Thread.thread * (Time.time * Time.time * string)) list, cancelling: (Thread.thread * (Time.time * Time.time * string)) list, messages: string list, store: string list}; -fun make_state timeout_heap oldest_heap active cancelling messages store = - State {timeout_heap = timeout_heap, oldest_heap = oldest_heap, +fun make_state managing_thread timeout_heap oldest_heap active cancelling messages store = + State {managing_thread = managing_thread, timeout_heap = timeout_heap, oldest_heap = oldest_heap, active = active, cancelling = cancelling, messages = messages, store = store}; -fun empty_state state = - let - val State {active = active, cancelling = cancelling, messages = messages, ...} = - Synchronized.value state - in (null active) andalso (null cancelling) andalso (null messages) end; - -val state = Synchronized.var "atp_manager" (make_state ThreadHeap.empty ThreadHeap.empty [] [] [] []); - - -(* the managing thread *) - -(*watches over running threads and interrupts them if required*) -val managing_thread = ref (NONE: Thread.thread option); - +val state = Synchronized.var "atp_manager" + (make_state NONE ThreadHeap.empty ThreadHeap.empty [] [] [] []); (* unregister thread *) fun unregister (success, message) thread = Synchronized.change state - (fn state as State {timeout_heap, oldest_heap, active, cancelling, messages, store} => + (fn state as State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => (case lookup_thread active thread of SOME (birthtime, _, description) => let @@ -132,7 +121,9 @@ val store' = message' :: (if length store <= message_store_limit then store else #1 (chop message_store_limit store)) - in make_state timeout_heap oldest_heap active' cancelling' (message' :: messages) store' end + in make_state + managing_thread timeout_heap oldest_heap active' cancelling' (message' :: messages) store' + end | NONE => state)); @@ -147,12 +138,13 @@ fun kill_oldest () = let exception Unchanged in Synchronized.change_result state - (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} => + (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => if ThreadHeap.is_empty oldest_heap orelse not (excessive_atps active) then raise Unchanged else let val ((_, oldest_thread), oldest_heap') = ThreadHeap.min_elem oldest_heap - in (oldest_thread, make_state timeout_heap oldest_heap' active cancelling messages store) end) + in (oldest_thread, + make_state managing_thread timeout_heap oldest_heap' active cancelling messages store) end) |> unregister (false, "Interrupted (maximum number of ATPs exceeded)") handle Unchanged => () end; @@ -167,8 +159,8 @@ fun print_new_messages () = let val to_print = Synchronized.change_result state - (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} => - (messages, make_state timeout_heap oldest_heap active cancelling [] store)) + (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => + (messages, make_state managing_thread timeout_heap oldest_heap active cancelling [] store)) in if null to_print then () else priority ("Sledgehammer: " ^ space_implode "\n\n" to_print) @@ -177,55 +169,66 @@ (* start a watching thread -- only one may exist *) -fun check_thread_manager () = CRITICAL (fn () => - if (case ! managing_thread of SOME thread => Thread.isActive thread | NONE => false) - then () else managing_thread := SOME (SimpleThread.fork false (fn () => - let - val min_wait_time = Time.fromMilliseconds 300 - val max_wait_time = Time.fromSeconds 10 +fun check_thread_manager () = Synchronized.change state + (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => + if (case managing_thread of SOME thread => Thread.isActive thread | NONE => false) + then make_state managing_thread timeout_heap oldest_heap active cancelling messages store + else let val managing_thread = SOME (SimpleThread.fork false (fn () => + let + val min_wait_time = Time.fromMilliseconds 300 + val max_wait_time = Time.fromSeconds 10 - (* wait for next thread to cancel, or maximum*) - fun time_limit (State {timeout_heap, ...}) = - (case try ThreadHeap.min timeout_heap of - NONE => SOME (Time.+ (Time.now (), max_wait_time)) - | SOME (time, _) => SOME time) + (* wait for next thread to cancel, or maximum*) + fun time_limit (State {timeout_heap, ...}) = + (case try ThreadHeap.min timeout_heap of + NONE => SOME (Time.+ (Time.now (), max_wait_time)) + | SOME (time, _) => SOME time) - (* action: find threads whose timeout is reached, and interrupt cancelling threads *) - fun action (State {timeout_heap, oldest_heap, active, cancelling, messages, store}) = - let val (timeout_threads, timeout_heap') = - ThreadHeap.upto (Time.now (), Thread.self ()) timeout_heap - in - if null timeout_threads andalso null cancelling andalso not (excessive_atps active) - then NONE - else - let - val _ = List.app (SimpleThread.interrupt o #1) cancelling - val cancelling' = filter (Thread.isActive o #1) cancelling - val state' = make_state timeout_heap' oldest_heap active cancelling' messages store - in SOME (map #2 timeout_threads, state') end - end - in - while not (empty_state state) do - (Synchronized.timed_access state time_limit action - |> these - |> List.app (unregister (false, "Interrupted (reached timeout)")); - kill_excessive (); - print_new_messages (); - (*give threads time to respond to interrupt*) - OS.Process.sleep min_wait_time) - end))); + (* action: find threads whose timeout is reached, and interrupt cancelling threads *) + fun action (State {managing_thread, timeout_heap, oldest_heap, active, cancelling, + messages, store}) = + let val (timeout_threads, timeout_heap') = + ThreadHeap.upto (Time.now (), Thread.self ()) timeout_heap + in + if null timeout_threads andalso null cancelling andalso not (excessive_atps active) + then NONE + else + let + val _ = List.app (SimpleThread.interrupt o #1) cancelling + val cancelling' = filter (Thread.isActive o #1) cancelling + val state' = make_state + managing_thread timeout_heap' oldest_heap active cancelling' messages store + in SOME (map #2 timeout_threads, state') end + end + in + while Synchronized.change_result state + (fn st as + State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => + if (null active) andalso (null cancelling) andalso (null messages) + then (false, make_state NONE timeout_heap oldest_heap active cancelling messages store) + else (true, st)) + do + (Synchronized.timed_access state time_limit action + |> these + |> List.app (unregister (false, "Interrupted (reached timeout)")); + kill_excessive (); + print_new_messages (); + (*give threads time to respond to interrupt*) + OS.Process.sleep min_wait_time) + end)) + in make_state managing_thread timeout_heap oldest_heap active cancelling messages store end); (* thread is registered here by sledgehammer *) fun register birthtime deadtime (thread, desc) = (Synchronized.change state - (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} => + (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => let val timeout_heap' = ThreadHeap.insert (deadtime, thread) timeout_heap val oldest_heap' = ThreadHeap.insert (birthtime, thread) oldest_heap val active' = update_thread (thread, (birthtime, deadtime, desc)) active - in make_state timeout_heap' oldest_heap' active' cancelling messages store end); + in make_state managing_thread timeout_heap' oldest_heap' active' cancelling messages store end); check_thread_manager ()); @@ -235,9 +238,11 @@ (* kill: move all threads to cancelling *) fun kill () = Synchronized.change state - (fn State {timeout_heap, oldest_heap, active, cancelling, messages, store} => + (fn State {managing_thread, timeout_heap, oldest_heap, active, cancelling, messages, store} => let val formerly_active = map (fn (th, (tb, _, desc)) => (th, (tb, Time.now (), desc))) active - in make_state timeout_heap oldest_heap [] (formerly_active @ cancelling) messages store end); + in make_state + managing_thread timeout_heap oldest_heap [] (formerly_active @ cancelling) messages store + end); (* ATP info *)