tuned tracing;
authorwenzelm
Tue, 21 Jul 2009 11:30:12 +0200
changeset 32095 ad4be204fdfe
parent 32094 89b9210c7506
child 32096 cb9adb13f892
tuned tracing; provide spare worker threads to saturate hardware while other workers wait during join;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Jul 21 10:24:57 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Jul 21 11:30:12 2009 +0200
@@ -134,16 +134,22 @@
 
 (* worker activity *)
 
-fun trace_active () =
+fun count_active ws =
+  fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
+
+fun trace_active () = Multithreading.tracing 1 (fn () =>
   let
     val ws = ! workers;
     val m = string_of_int (length ws);
-    val n = string_of_int (length (filter #2 ws));
-  in Multithreading.tracing 2 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
+    val n = string_of_int (count_active ws);
+  in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end);
 
 fun change_active active = (*requires SYNCHRONIZED*)
   change workers (AList.update Thread.equal (Thread.self (), active));
 
+fun overloaded () =
+  count_active (! workers) > Multithreading.max_threads_value ();
+
 
 (* execute jobs *)
 
@@ -176,6 +182,7 @@
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
      notify_all ();
      NONE)
+  else if overloaded () then (worker_wait (); worker_next ())
   else
     (case change_result queue Task_Queue.dequeue of
       NONE => (worker_wait (); worker_next ())
@@ -204,26 +211,31 @@
       end);
 
     (*worker threads*)
+    val ws = ! workers;
     val _ =
-      (case List.partition (Thread.isActive o #1) (! workers) of
-        (_, []) => ()
-      | (active, inactive) =>
-          (workers := active; Multithreading.tracing 0 (fn () =>
-            "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
+      if forall (Thread.isActive o #1) ws then ()
+      else
+        (case List.partition (Thread.isActive o #1) ws of
+          (_, []) => ()
+        | (active, inactive) =>
+            (workers := active; Multithreading.tracing 0 (fn () =>
+              "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
     val _ = trace_active ();
 
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
-    val l = length (! workers);
-    val _ = excessive := l - m;
+    val mm = (m * 3) div 2;
+    val l = length ws;
+    val _ = excessive := l - mm;
     val _ =
-      if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
+      if mm > l then
+        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
       else ();
 
     (*canceled groups*)
-    val _ =  change canceled (filter_out (Task_Queue.cancel (! queue)));
+    val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
 
     (*shutdown*)
-    val continue = not (! do_shutdown andalso null (! workers));
+    val continue = not (! do_shutdown andalso null ws);
     val _ = if continue then () else scheduler := NONE;
 
     val _ = notify_all ();
@@ -292,8 +304,12 @@
 
 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
 
+fun join_next deps = (*requires SYNCHRONIZED*)
+  if overloaded () then (worker_wait (); join_next deps)
+  else change_result queue (Task_Queue.dequeue_towards deps);
+
 fun join_deps deps =
-  (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of
+  (case SYNCHRONIZED "join" (fn () => join_next deps) of
     NONE => ()
   | SOME (work, deps') => (execute "join" work; join_deps deps'));