# HG changeset patch # User wenzelm # Date 1257425876 -3600 # Node ID e87ce1a03a11ef244f1cbbfbb4ec275d9d353cda # Parent c8bc8dc5869ffe574f00835a06f8e39130ec9610# Parent 13d00799fe49d8f8972235f27cb219865237275b merged diff -r c8bc8dc5869f -r e87ce1a03a11 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Nov 04 17:17:30 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Thu Nov 05 13:57:56 2009 +0100 @@ -64,7 +64,7 @@ type group = Task_Queue.group; local - val tag = Universal.tag () : (string * task * group) option Universal.tag; + val tag = Universal.tag () : (task * group) option Universal.tag; in fun thread_data () = the_default NONE (Thread.getLocal tag); fun setmp_thread_data data f x = @@ -72,8 +72,8 @@ end; val is_worker = is_some o thread_data; -val worker_task = Option.map #2 o thread_data; -val worker_group = Option.map #3 o thread_data; +val worker_task = Option.map #1 o thread_data; +val worker_group = Option.map #2 o thread_data; (* datatype future *) @@ -99,17 +99,6 @@ (** scheduling **) -(* global state *) - -val queue = Unsynchronized.ref Task_Queue.empty; -val next = Unsynchronized.ref 0; -val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list); -val scheduler = Unsynchronized.ref (NONE: Thread.thread option); -val excessive = Unsynchronized.ref 0; -val canceled = Unsynchronized.ref ([]: Task_Queue.group list); -val do_shutdown = Unsynchronized.ref false; - - (* synchronization *) val scheduler_event = ConditionVar.conditionVar (); @@ -141,6 +130,24 @@ end; +(* global state *) + +val queue = Unsynchronized.ref Task_Queue.empty; +val next = Unsynchronized.ref 0; +val scheduler = Unsynchronized.ref (NONE: Thread.thread option); +val canceled = Unsynchronized.ref ([]: Task_Queue.group list); +val do_shutdown = Unsynchronized.ref false; +val max_workers = Unsynchronized.ref 0; +val max_active = Unsynchronized.ref 0; +val worker_trend = Unsynchronized.ref 0; + +datatype worker_state = Working | Waiting | Sleeping; +val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); + +fun count_workers state = (*requires SYNCHRONIZED*) + fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; + + (* execute future jobs *) fun future_job group (e: unit -> 'a) = @@ -165,10 +172,10 @@ (Unsynchronized.change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); -fun execute name (task, group, jobs) = +fun execute (task, group, jobs) = let val valid = not (Task_Queue.is_canceled group); - val ok = setmp_thread_data (name, task, group) (fn () => + val ok = setmp_thread_data (task, group) (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) (); val _ = SYNCHRONIZED "finish" (fn () => let @@ -178,99 +185,134 @@ else if Task_Queue.cancel (! queue) group then () else do_cancel group; val _ = broadcast work_finished; - val _ = if maximal then () else broadcast work_available; + val _ = if maximal then () else signal work_available; in () end); in () end; -(* worker activity *) - -fun count_active () = (*requires SYNCHRONIZED*) - fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; - -fun change_active active = (*requires SYNCHRONIZED*) - Unsynchronized.change workers - (AList.update Thread.equal (Thread.self (), active)); - - (* worker threads *) -fun worker_wait cond = (*requires SYNCHRONIZED*) - (change_active false; wait cond; change_active true); +fun worker_wait active cond = (*requires SYNCHRONIZED*) + let + val state = + (case AList.lookup Thread.equal (! workers) (Thread.self ()) of + SOME state => state + | NONE => raise Fail "Unregistered worker thread"); + val _ = state := (if active then Waiting else Sleeping); + val _ = wait cond; + val _ = state := Working; + in () end; fun worker_next () = (*requires SYNCHRONIZED*) - if ! excessive > 0 then - (Unsynchronized.dec excessive; - Unsynchronized.change workers - (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); - broadcast scheduler_event; + if length (! workers) > ! max_workers then + (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); + signal work_available; NONE) - else if count_active () > Multithreading.max_threads_value () then - (worker_wait scheduler_event; worker_next ()) + else if count_workers Working > ! max_active then + (worker_wait false work_available; worker_next ()) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of - NONE => (worker_wait work_available; worker_next ()) - | some => some); + NONE => (worker_wait false work_available; worker_next ()) + | some => (signal work_available; some)); fun worker_loop name = (case SYNCHRONIZED name (fn () => worker_next ()) of NONE => () - | SOME work => (execute name work; worker_loop name)); + | SOME work => (execute work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) - Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => - (broadcast scheduler_event; worker_loop name)), true)); + Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name), + Unsynchronized.ref Working)); (* scheduler *) -val last_status = Unsynchronized.ref Time.zeroTime; -val next_status = Time.fromMilliseconds 500; +val status_ticks = Unsynchronized.ref 0; + +val last_round = Unsynchronized.ref Time.zeroTime; val next_round = Time.fromMilliseconds 50; fun scheduler_next () = (*requires SYNCHRONIZED*) let - (*queue and worker status*) + val now = Time.now (); + val tick = Time.<= (Time.+ (! last_round, next_round), now); + val _ = if tick then last_round := now else (); + + + (* queue and worker status *) + + val _ = + if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); val _ = - let val now = Time.now () in - if Time.> (Time.+ (! last_status, next_status), now) then () - else - (last_status := now; Multithreading.tracing 1 (fn () => - let - val {ready, pending, running} = Task_Queue.status (! queue); - val total = length (! workers); - val active = count_active (); - in - "SCHEDULE " ^ Time.toString now ^ ": " ^ - string_of_int ready ^ " ready, " ^ - string_of_int pending ^ " pending, " ^ - string_of_int running ^ " running; " ^ - string_of_int total ^ " workers, " ^ - string_of_int active ^ " active" - end)) - end; + if tick andalso ! status_ticks = 0 then + Multithreading.tracing 1 (fn () => + let + val {ready, pending, running} = Task_Queue.status (! queue); + val total = length (! workers); + val active = count_workers Working; + val waiting = count_workers Waiting; + in + "SCHEDULE " ^ Time.toString now ^ ": " ^ + string_of_int ready ^ " ready, " ^ + string_of_int pending ^ " pending, " ^ + string_of_int running ^ " running; " ^ + string_of_int total ^ " workers, " ^ + string_of_int active ^ " active, " ^ + string_of_int waiting ^ " waiting " + end) + else (); - (*worker threads*) val _ = if forall (Thread.isActive o #1) (! workers) then () else - (case List.partition (Thread.isActive o #1) (! workers) of - (_, []) => () - | (alive, dead) => - (workers := alive; Multithreading.tracing 0 (fn () => - "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))); + let + val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); + val _ = workers := alive; + in + Multithreading.tracing 0 (fn () => + "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads") + end; + + + (* worker pool adjustments *) + + val max_active0 = ! max_active; + val max_workers0 = ! max_workers; val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); - val mm = if m = 9999 then 1 else m * 2; - val l = length (! workers); - val _ = excessive := l - mm; + val _ = max_active := m; + + val mm = + if ! do_shutdown then 0 + else if m = 9999 then 1 + else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m); val _ = - if mm > l then - funpow (mm - l) (fn () => - worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) () + if tick andalso mm > ! max_workers then + Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1) + else if tick andalso mm < ! max_workers then + Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) + else (); + val _ = + if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then + max_workers := mm + else if ! worker_trend > 5 andalso ! max_workers < 2 * m then + max_workers := Int.min (mm, 2 * m) else (); - (*canceled groups*) + val missing = ! max_workers - length (! workers); + val _ = + if missing > 0 then + funpow missing (fn () => + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () + else (); + + val _ = + if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () + else signal work_available; + + + (* canceled groups *) + val _ = if null (! canceled) then () else @@ -279,24 +321,30 @@ Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ()); - (*delay loop*) + + (* delay loop *) + val _ = Exn.release (wait_timeout next_round scheduler_event); - (*shutdown*) + + (* shutdown *) + val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else (); val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; + val _ = broadcast scheduler_event; in continue end handle Exn.Interrupt => (Multithreading.tracing 1 (fn () => "Interrupt"); - uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) (); - scheduler_next ()); + List.app do_cancel (Task_Queue.cancel_all (! queue)); true); fun scheduler_loop () = - Multithreading.with_attributes - (Multithreading.sync_interrupts Multithreading.public_interrupts) - (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ()); + while + Multithreading.with_attributes + (Multithreading.sync_interrupts Multithreading.public_interrupts) + (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) + do (); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); @@ -346,7 +394,7 @@ Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) | SOME res => res); -fun join_wait x = +fun passive_wait x = Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ()); fun join_next deps = (*requires SYNCHRONIZED*) @@ -354,11 +402,11 @@ else (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of (NONE, []) => NONE - | (NONE, deps') => (worker_wait work_finished; join_next deps') + | (NONE, deps') => (worker_wait true work_finished; join_next deps') | (SOME work, deps') => SOME (work, deps')); fun execute_work NONE = () - | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps') + | execute_work (SOME (work, deps')) = (execute work; join_work deps') and join_work deps = execute_work (SYNCHRONIZED "join" (fn () => join_next deps)); @@ -375,7 +423,7 @@ else (case worker_task () of SOME task => join_depend task (map task_of xs) - | NONE => List.app join_wait xs; + | NONE => List.app passive_wait xs; map get_result xs); end; diff -r c8bc8dc5869f -r e87ce1a03a11 src/Pure/proofterm.ML