more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
authorwenzelm
Tue, 28 Jul 2009 14:29:25 +0200
changeset 32248 0241916a5f06
parent 32247 3e7d1673f96e
child 32249 3e48bf962e05
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Jul 28 14:11:15 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Jul 28 14:29:25 2009 +0200
@@ -137,9 +137,8 @@
 fun broadcast cond = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
 
-fun broadcast_all () = (*requires SYNCHRONIZED*)
- (ConditionVar.broadcast scheduler_event;
-  ConditionVar.broadcast work_available;
+fun broadcast_work () = (*requires SYNCHRONIZED*)
+ (ConditionVar.broadcast work_available;
   ConditionVar.broadcast work_finished);
 
 end;
@@ -200,9 +199,7 @@
 (* worker threads *)
 
 fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; broadcast scheduler_event;
-  wait cond;
-  change_active true; broadcast scheduler_event);
+ (change_active false; wait cond; change_active true);
 
 fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
@@ -223,13 +220,15 @@
   | SOME work => (execute name work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
-  change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
+  change workers (cons (SimpleThread.fork false (fn () =>
+     (broadcast scheduler_event; worker_loop name)), true));
 
 
 (* scheduler *)
 
 val last_status = ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 450;
+val next_status = Time.fromMilliseconds 500;
+val next_round = Time.fromMilliseconds 50;
 
 fun scheduler_next () = (*requires SYNCHRONIZED*)
   let
@@ -269,19 +268,16 @@
     val _ = excessive := l - mm;
     val _ =
       if mm > l then
-       (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
-        broadcast scheduler_event)
+        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
       else ();
 
     (*canceled groups*)
     val _ =
       if null (! canceled) then ()
-      else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
+      else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ());
 
     (*delay loop*)
-    val delay =
-      Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
-    val _ = wait_interruptible scheduler_event delay
+    val _ = wait_interruptible scheduler_event next_round
       handle Exn.Interrupt =>
         (Multithreading.tracing 1 (fn () => "Interrupt");
           List.app do_cancel (Task_Queue.cancel_all (! queue)));
@@ -301,9 +297,8 @@
 
 fun scheduler_check () = (*requires SYNCHRONIZED*)
  (do_shutdown := false;
-  if not (scheduler_active ()) then
-   (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
-  else ());
+  if scheduler_active () then ()
+  else scheduler := SOME (SimpleThread.fork false scheduler_loop));
 
 
 
@@ -420,7 +415,7 @@
   if Multithreading.available then
     SYNCHRONIZED "shutdown" (fn () =>
      while scheduler_active () do
-      (wait scheduler_event; broadcast_all ()))
+      (wait scheduler_event; broadcast_work ()))
   else ();