--- a/src/Pure/Concurrent/future.ML Wed Nov 04 17:17:30 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Thu Nov 05 13:57:56 2009 +0100
@@ -64,7 +64,7 @@
type group = Task_Queue.group;
local
- val tag = Universal.tag () : (string * task * group) option Universal.tag;
+ val tag = Universal.tag () : (task * group) option Universal.tag;
in
fun thread_data () = the_default NONE (Thread.getLocal tag);
fun setmp_thread_data data f x =
@@ -72,8 +72,8 @@
end;
val is_worker = is_some o thread_data;
-val worker_task = Option.map #2 o thread_data;
-val worker_group = Option.map #3 o thread_data;
+val worker_task = Option.map #1 o thread_data;
+val worker_group = Option.map #2 o thread_data;
(* datatype future *)
@@ -99,17 +99,6 @@
(** scheduling **)
-(* global state *)
-
-val queue = Unsynchronized.ref Task_Queue.empty;
-val next = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
-val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val excessive = Unsynchronized.ref 0;
-val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
-val do_shutdown = Unsynchronized.ref false;
-
-
(* synchronization *)
val scheduler_event = ConditionVar.conditionVar ();
@@ -141,6 +130,24 @@
end;
+(* global state *)
+
+val queue = Unsynchronized.ref Task_Queue.empty;
+val next = Unsynchronized.ref 0;
+val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
+val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
+val do_shutdown = Unsynchronized.ref false;
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+val worker_trend = Unsynchronized.ref 0;
+
+datatype worker_state = Working | Waiting | Sleeping;
+val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
+
+fun count_workers state = (*requires SYNCHRONIZED*)
+ fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
+
+
(* execute future jobs *)
fun future_job group (e: unit -> 'a) =
@@ -165,10 +172,10 @@
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);
broadcast scheduler_event);
-fun execute name (task, group, jobs) =
+fun execute (task, group, jobs) =
let
val valid = not (Task_Queue.is_canceled group);
- val ok = setmp_thread_data (name, task, group) (fn () =>
+ val ok = setmp_thread_data (task, group) (fn () =>
fold (fn job => fn ok => job valid andalso ok) jobs true) ();
val _ = SYNCHRONIZED "finish" (fn () =>
let
@@ -178,99 +185,134 @@
else if Task_Queue.cancel (! queue) group then ()
else do_cancel group;
val _ = broadcast work_finished;
- val _ = if maximal then () else broadcast work_available;
+ val _ = if maximal then () else signal work_available;
in () end);
in () end;
-(* worker activity *)
-
-fun count_active () = (*requires SYNCHRONIZED*)
- fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
-
-fun change_active active = (*requires SYNCHRONIZED*)
- Unsynchronized.change workers
- (AList.update Thread.equal (Thread.self (), active));
-
-
(* worker threads *)
-fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; wait cond; change_active true);
+fun worker_wait active cond = (*requires SYNCHRONIZED*)
+ let
+ val state =
+ (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+ SOME state => state
+ | NONE => raise Fail "Unregistered worker thread");
+ val _ = state := (if active then Waiting else Sleeping);
+ val _ = wait cond;
+ val _ = state := Working;
+ in () end;
fun worker_next () = (*requires SYNCHRONIZED*)
- if ! excessive > 0 then
- (Unsynchronized.dec excessive;
- Unsynchronized.change workers
- (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
- broadcast scheduler_event;
+ if length (! workers) > ! max_workers then
+ (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
+ signal work_available;
NONE)
- else if count_active () > Multithreading.max_threads_value () then
- (worker_wait scheduler_event; worker_next ())
+ else if count_workers Working > ! max_active then
+ (worker_wait false work_available; worker_next ())
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
- NONE => (worker_wait work_available; worker_next ())
- | some => some);
+ NONE => (worker_wait false work_available; worker_next ())
+ | some => (signal work_available; some));
fun worker_loop name =
(case SYNCHRONIZED name (fn () => worker_next ()) of
NONE => ()
- | SOME work => (execute name work; worker_loop name));
+ | SOME work => (execute work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
- Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
- (broadcast scheduler_event; worker_loop name)), true));
+ Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
+ Unsynchronized.ref Working));
(* scheduler *)
-val last_status = Unsynchronized.ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 500;
+val status_ticks = Unsynchronized.ref 0;
+
+val last_round = Unsynchronized.ref Time.zeroTime;
val next_round = Time.fromMilliseconds 50;
fun scheduler_next () = (*requires SYNCHRONIZED*)
let
- (*queue and worker status*)
+ val now = Time.now ();
+ val tick = Time.<= (Time.+ (! last_round, next_round), now);
+ val _ = if tick then last_round := now else ();
+
+
+ (* queue and worker status *)
+
+ val _ =
+ if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
val _ =
- let val now = Time.now () in
- if Time.> (Time.+ (! last_status, next_status), now) then ()
- else
- (last_status := now; Multithreading.tracing 1 (fn () =>
- let
- val {ready, pending, running} = Task_Queue.status (! queue);
- val total = length (! workers);
- val active = count_active ();
- in
- "SCHEDULE " ^ Time.toString now ^ ": " ^
- string_of_int ready ^ " ready, " ^
- string_of_int pending ^ " pending, " ^
- string_of_int running ^ " running; " ^
- string_of_int total ^ " workers, " ^
- string_of_int active ^ " active"
- end))
- end;
+ if tick andalso ! status_ticks = 0 then
+ Multithreading.tracing 1 (fn () =>
+ let
+ val {ready, pending, running} = Task_Queue.status (! queue);
+ val total = length (! workers);
+ val active = count_workers Working;
+ val waiting = count_workers Waiting;
+ in
+ "SCHEDULE " ^ Time.toString now ^ ": " ^
+ string_of_int ready ^ " ready, " ^
+ string_of_int pending ^ " pending, " ^
+ string_of_int running ^ " running; " ^
+ string_of_int total ^ " workers, " ^
+ string_of_int active ^ " active, " ^
+ string_of_int waiting ^ " waiting "
+ end)
+ else ();
- (*worker threads*)
val _ =
if forall (Thread.isActive o #1) (! workers) then ()
else
- (case List.partition (Thread.isActive o #1) (! workers) of
- (_, []) => ()
- | (alive, dead) =>
- (workers := alive; Multithreading.tracing 0 (fn () =>
- "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
+ let
+ val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
+ val _ = workers := alive;
+ in
+ Multithreading.tracing 0 (fn () =>
+ "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
+ end;
+
+
+ (* worker pool adjustments *)
+
+ val max_active0 = ! max_active;
+ val max_workers0 = ! max_workers;
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
- val mm = if m = 9999 then 1 else m * 2;
- val l = length (! workers);
- val _ = excessive := l - mm;
+ val _ = max_active := m;
+
+ val mm =
+ if ! do_shutdown then 0
+ else if m = 9999 then 1
+ else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
val _ =
- if mm > l then
- funpow (mm - l) (fn () =>
- worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
+ if tick andalso mm > ! max_workers then
+ Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
+ else if tick andalso mm < ! max_workers then
+ Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
+ else ();
+ val _ =
+ if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
+ max_workers := mm
+ else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
+ max_workers := Int.min (mm, 2 * m)
else ();
- (*canceled groups*)
+ val missing = ! max_workers - length (! workers);
+ val _ =
+ if missing > 0 then
+ funpow missing (fn () =>
+ ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
+ else ();
+
+ val _ =
+ if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
+ else signal work_available;
+
+
+ (* canceled groups *)
+
val _ =
if null (! canceled) then ()
else
@@ -279,24 +321,30 @@
Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
broadcast_work ());
- (*delay loop*)
+
+ (* delay loop *)
+
val _ = Exn.release (wait_timeout next_round scheduler_event);
- (*shutdown*)
+
+ (* shutdown *)
+
val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
+
val _ = broadcast scheduler_event;
in continue end
handle Exn.Interrupt =>
(Multithreading.tracing 1 (fn () => "Interrupt");
- uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
- scheduler_next ());
+ List.app do_cancel (Task_Queue.cancel_all (! queue)); true);
fun scheduler_loop () =
- Multithreading.with_attributes
- (Multithreading.sync_interrupts Multithreading.public_interrupts)
- (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
+ while
+ Multithreading.with_attributes
+ (Multithreading.sync_interrupts Multithreading.public_interrupts)
+ (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
+ do ();
fun scheduler_active () = (*requires SYNCHRONIZED*)
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
@@ -346,7 +394,7 @@
Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
| SOME res => res);
-fun join_wait x =
+fun passive_wait x =
Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
fun join_next deps = (*requires SYNCHRONIZED*)
@@ -354,11 +402,11 @@
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
(NONE, []) => NONE
- | (NONE, deps') => (worker_wait work_finished; join_next deps')
+ | (NONE, deps') => (worker_wait true work_finished; join_next deps')
| (SOME work, deps') => SOME (work, deps'));
fun execute_work NONE = ()
- | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
+ | execute_work (SOME (work, deps')) = (execute work; join_work deps')
and join_work deps =
execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
@@ -375,7 +423,7 @@
else
(case worker_task () of
SOME task => join_depend task (map task_of xs)
- | NONE => List.app join_wait xs;
+ | NONE => List.app passive_wait xs;
map get_result xs);
end;