# HG changeset patch # User wenzelm # Date 1248784165 -7200 # Node ID 0241916a5f06cf14ad9095197c3aeaa9f56849f0 # Parent 3e7d1673f96e5397a64c5b49470f158735141376 more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads; diff -r 3e7d1673f96e -r 0241916a5f06 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 28 14:11:15 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 28 14:29:25 2009 +0200 @@ -137,9 +137,8 @@ fun broadcast cond = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; -fun broadcast_all () = (*requires SYNCHRONIZED*) - (ConditionVar.broadcast scheduler_event; - ConditionVar.broadcast work_available; +fun broadcast_work () = (*requires SYNCHRONIZED*) + (ConditionVar.broadcast work_available; ConditionVar.broadcast work_finished); end; @@ -200,9 +199,7 @@ (* worker threads *) fun worker_wait cond = (*requires SYNCHRONIZED*) - (change_active false; broadcast scheduler_event; - wait cond; - change_active true; broadcast scheduler_event); + (change_active false; wait cond; change_active true); fun worker_next () = (*requires SYNCHRONIZED*) if ! excessive > 0 then @@ -223,13 +220,15 @@ | SOME work => (execute name work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) - change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true)); + change workers (cons (SimpleThread.fork false (fn () => + (broadcast scheduler_event; worker_loop name)), true)); (* scheduler *) val last_status = ref Time.zeroTime; -val next_status = Time.fromMilliseconds 450; +val next_status = Time.fromMilliseconds 500; +val next_round = Time.fromMilliseconds 50; fun scheduler_next () = (*requires SYNCHRONIZED*) let @@ -269,19 +268,16 @@ val _ = excessive := l - mm; val _ = if mm > l then - (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) (); - broadcast scheduler_event) + funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () else (); (*canceled groups*) val _ = if null (! canceled) then () - else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ()); + else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ()); (*delay loop*) - val delay = - Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50); - val _ = wait_interruptible scheduler_event delay + val _ = wait_interruptible scheduler_event next_round handle Exn.Interrupt => (Multithreading.tracing 1 (fn () => "Interrupt"); List.app do_cancel (Task_Queue.cancel_all (! queue))); @@ -301,9 +297,8 @@ fun scheduler_check () = (*requires SYNCHRONIZED*) (do_shutdown := false; - if not (scheduler_active ()) then - (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event) - else ()); + if scheduler_active () then () + else scheduler := SOME (SimpleThread.fork false scheduler_loop)); @@ -420,7 +415,7 @@ if Multithreading.available then SYNCHRONIZED "shutdown" (fn () => while scheduler_active () do - (wait scheduler_event; broadcast_all ())) + (wait scheduler_event; broadcast_work ())) else ();