tuned tracing;
provide spare worker threads to saturate hardware while other workers wait during join;
--- a/src/Pure/Concurrent/future.ML Tue Jul 21 10:24:57 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Tue Jul 21 11:30:12 2009 +0200
@@ -134,16 +134,22 @@
(* worker activity *)
-fun trace_active () =
+fun count_active ws =
+ fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
+
+fun trace_active () = Multithreading.tracing 1 (fn () =>
let
val ws = ! workers;
val m = string_of_int (length ws);
- val n = string_of_int (length (filter #2 ws));
- in Multithreading.tracing 2 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
+ val n = string_of_int (count_active ws);
+ in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end);
fun change_active active = (*requires SYNCHRONIZED*)
change workers (AList.update Thread.equal (Thread.self (), active));
+fun overloaded () =
+ count_active (! workers) > Multithreading.max_threads_value ();
+
(* execute jobs *)
@@ -176,6 +182,7 @@
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
notify_all ();
NONE)
+ else if overloaded () then (worker_wait (); worker_next ())
else
(case change_result queue Task_Queue.dequeue of
NONE => (worker_wait (); worker_next ())
@@ -204,26 +211,31 @@
end);
(*worker threads*)
+ val ws = ! workers;
val _ =
- (case List.partition (Thread.isActive o #1) (! workers) of
- (_, []) => ()
- | (active, inactive) =>
- (workers := active; Multithreading.tracing 0 (fn () =>
- "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
+ if forall (Thread.isActive o #1) ws then ()
+ else
+ (case List.partition (Thread.isActive o #1) ws of
+ (_, []) => ()
+ | (active, inactive) =>
+ (workers := active; Multithreading.tracing 0 (fn () =>
+ "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
val _ = trace_active ();
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
- val l = length (! workers);
- val _ = excessive := l - m;
+ val mm = (m * 3) div 2;
+ val l = length ws;
+ val _ = excessive := l - mm;
val _ =
- if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
+ if mm > l then
+ funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
else ();
(*canceled groups*)
- val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
+ val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
(*shutdown*)
- val continue = not (! do_shutdown andalso null (! workers));
+ val continue = not (! do_shutdown andalso null ws);
val _ = if continue then () else scheduler := NONE;
val _ = notify_all ();
@@ -292,8 +304,12 @@
fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
+fun join_next deps = (*requires SYNCHRONIZED*)
+ if overloaded () then (worker_wait (); join_next deps)
+ else change_result queue (Task_Queue.dequeue_towards deps);
+
fun join_deps deps =
- (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of
+ (case SYNCHRONIZED "join" (fn () => join_next deps) of
NONE => ()
| SOME (work, deps') => (execute "join" work; join_deps deps'));