more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
--- a/src/Pure/Concurrent/future.ML Tue Jul 28 14:11:15 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Tue Jul 28 14:29:25 2009 +0200
@@ -137,9 +137,8 @@
fun broadcast cond = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
-fun broadcast_all () = (*requires SYNCHRONIZED*)
- (ConditionVar.broadcast scheduler_event;
- ConditionVar.broadcast work_available;
+fun broadcast_work () = (*requires SYNCHRONIZED*)
+ (ConditionVar.broadcast work_available;
ConditionVar.broadcast work_finished);
end;
@@ -200,9 +199,7 @@
(* worker threads *)
fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; broadcast scheduler_event;
- wait cond;
- change_active true; broadcast scheduler_event);
+ (change_active false; wait cond; change_active true);
fun worker_next () = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
@@ -223,13 +220,15 @@
| SOME work => (execute name work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
- change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
+ change workers (cons (SimpleThread.fork false (fn () =>
+ (broadcast scheduler_event; worker_loop name)), true));
(* scheduler *)
val last_status = ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 450;
+val next_status = Time.fromMilliseconds 500;
+val next_round = Time.fromMilliseconds 50;
fun scheduler_next () = (*requires SYNCHRONIZED*)
let
@@ -269,19 +268,16 @@
val _ = excessive := l - mm;
val _ =
if mm > l then
- (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
- broadcast scheduler_event)
+ funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
else ();
(*canceled groups*)
val _ =
if null (! canceled) then ()
- else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
+ else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ());
(*delay loop*)
- val delay =
- Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
- val _ = wait_interruptible scheduler_event delay
+ val _ = wait_interruptible scheduler_event next_round
handle Exn.Interrupt =>
(Multithreading.tracing 1 (fn () => "Interrupt");
List.app do_cancel (Task_Queue.cancel_all (! queue)));
@@ -301,9 +297,8 @@
fun scheduler_check () = (*requires SYNCHRONIZED*)
(do_shutdown := false;
- if not (scheduler_active ()) then
- (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
- else ());
+ if scheduler_active () then ()
+ else scheduler := SOME (SimpleThread.fork false scheduler_loop));
@@ -420,7 +415,7 @@
if Multithreading.available then
SYNCHRONIZED "shutdown" (fn () =>
while scheduler_active () do
- (wait scheduler_event; broadcast_all ()))
+ (wait scheduler_event; broadcast_work ()))
else ();