more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
more specific signal vs. broadcast;
execute/finish: more careful notification based on minimal/maximal status;
tuned shutdown;
--- a/src/Pure/Concurrent/future.ML Mon Jul 27 12:00:02 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Mon Jul 27 12:11:18 2009 +0200
@@ -114,20 +114,26 @@
(* synchronization *)
+val scheduler_event = ConditionVar.conditionVar ();
+val work_available = ConditionVar.conditionVar ();
+val work_finished = ConditionVar.conditionVar ();
+
local
val lock = Mutex.mutex ();
- val cond = ConditionVar.conditionVar ();
in
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
-fun wait () = (*requires SYNCHRONIZED*)
+fun wait cond = (*requires SYNCHRONIZED*)
ConditionVar.wait (cond, lock);
-fun wait_timeout timeout = (*requires SYNCHRONIZED*)
+fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
-fun notify_all () = (*requires SYNCHRONIZED*)
+fun signal cond = (*requires SYNCHRONIZED*)
+ ConditionVar.signal cond;
+
+fun broadcast cond = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
end;
@@ -183,29 +189,35 @@
val ok = setmp_thread_data (name, task, group) (fn () =>
fold (fn job => fn ok => job valid andalso ok) jobs true) ();
val _ = SYNCHRONIZED "execute" (fn () =>
- (change queue (Task_Queue.finish task);
- if ok then ()
- else if Task_Queue.cancel (! queue) group then ()
- else do_cancel group;
- notify_all ()));
+ let
+ val maximal = change_result queue (Task_Queue.finish task);
+ val _ =
+ if ok then ()
+ else if Task_Queue.cancel (! queue) group then ()
+ else do_cancel group;
+ val _ = broadcast work_finished;
+ val _ = if maximal then () else broadcast work_available;
+ in () end);
in () end;
(* worker threads *)
-fun worker_wait () = (*requires SYNCHRONIZED*)
- (change_active false; wait (); change_active true);
+fun worker_wait cond = (*requires SYNCHRONIZED*)
+ (change_active false; broadcast scheduler_event;
+ wait cond;
+ change_active true; broadcast scheduler_event);
fun worker_next () = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
(dec excessive;
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
- notify_all ();
+ broadcast scheduler_event;
NONE)
- else if overloaded () then (worker_wait (); worker_next ())
+ else if overloaded () then (worker_wait scheduler_event; worker_next ())
else
(case change_result queue Task_Queue.dequeue of
- NONE => (worker_wait (); worker_next ())
+ NONE => (worker_wait work_available; worker_next ())
| some => some);
fun worker_loop name =
@@ -231,11 +243,10 @@
end);
(*worker threads*)
- val ws = ! workers;
val _ =
- if forall (Thread.isActive o #1) ws then ()
+ if forall (Thread.isActive o #1) (! workers) then ()
else
- (case List.partition (Thread.isActive o #1) ws of
+ (case List.partition (Thread.isActive o #1) (! workers) of
(_, []) => ()
| (active, inactive) =>
(workers := active; Multithreading.tracing 0 (fn () =>
@@ -244,24 +255,26 @@
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
val mm = (m * 3) div 2;
- val l = length ws;
+ val l = length (! workers);
val _ = excessive := l - mm;
val _ =
if mm > l then
- funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
+ (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
+ broadcast scheduler_event)
else ();
(*canceled groups*)
val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
- (*shutdown*)
- val continue = not (! do_shutdown andalso null ws);
- val _ = if continue then () else scheduler := NONE;
+ val timeout =
+ Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
+ val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
+ handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
- val _ = notify_all ();
- val _ = interruptible (fn () =>
- wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
- handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
+ (*shutdown*)
+ val continue = not (! do_shutdown andalso null (! workers));
+ val _ = if continue then () else scheduler := NONE;
+ val _ = broadcast scheduler_event;
in continue end;
fun scheduler_loop () =
@@ -272,7 +285,8 @@
fun scheduler_check name = SYNCHRONIZED name (fn () =>
if not (scheduler_active ()) then
- (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+ (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
+ broadcast scheduler_event)
else if ! do_shutdown then error "Scheduler shutdown in progress"
else ());
@@ -292,7 +306,10 @@
| NONE => Task_Queue.new_group (worker_group ()));
val (result, job) = future_job group e;
val task = SYNCHRONIZED "future" (fn () =>
- change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
+ let
+ val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
+ val _ = if minimal then signal work_available else ();
+ in task end);
in Future {task = task, group = group, result = result} end;
fun fork e = fork_future NONE [] 0 e;
@@ -313,7 +330,7 @@
| SOME res => res);
fun join_next deps = (*requires SYNCHRONIZED*)
- if overloaded () then (worker_wait (); join_next deps)
+ if overloaded () then (worker_wait scheduler_event; join_next deps)
else change_result queue (Task_Queue.dequeue_towards deps);
fun join_deps deps =
@@ -336,7 +353,7 @@
fun join_wait x =
if SYNCHRONIZED "join_wait" (fn () =>
- is_finished x orelse (if worker then worker_wait () else wait (); false))
+ is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
then () else join_wait x;
val _ = xs |> List.app (fn x =>
@@ -390,7 +407,7 @@
(*cancel: present and future group members will be interrupted eventually*)
fun cancel_group group =
(scheduler_check "cancel check";
- SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));
+ SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
fun cancel x = cancel_group (group_of x);
@@ -401,13 +418,13 @@
if Multithreading.available then
(scheduler_check "shutdown check";
SYNCHRONIZED "shutdown" (fn () =>
- (while not (scheduler_active ()) do wait ();
- while not (Task_Queue.is_empty (! queue)) do wait ();
+ (while not (scheduler_active ()) do wait scheduler_event;
+ while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
do_shutdown := true;
- notify_all ();
- while not (null (! workers)) do wait ();
- while scheduler_active () do wait ();
- OS.Process.sleep (Time.fromMilliseconds 300))))
+ while scheduler_active () do
+ (broadcast work_available;
+ broadcast scheduler_event;
+ wait scheduler_event))))
else ();