worker_next: plain signalling via work_available only, not scheduler_event;
scheduler: tuned worker pool adjustments;
--- a/src/Pure/Concurrent/future.ML Thu Nov 05 00:13:00 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Thu Nov 05 13:01:11 2009 +0100
@@ -203,23 +203,20 @@
val _ = state := Working;
in () end;
-fun worker_next have_work = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
if length (! workers) > ! max_workers then
(Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
- if have_work then signal work_available else ();
- broadcast scheduler_event;
+ signal work_available;
NONE)
else if count_workers Working > ! max_active then
- (if have_work then signal work_available else ();
- worker_wait false scheduler_event;
- worker_next false)
+ (worker_wait false work_available; worker_next ())
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
- NONE => (worker_wait false work_available; worker_next true)
+ NONE => (worker_wait false work_available; worker_next ())
| some => (signal work_available; some));
fun worker_loop name =
- (case SYNCHRONIZED name (fn () => worker_next false) of
+ (case SYNCHRONIZED name (fn () => worker_next ()) of
NONE => ()
| SOME work => (execute work; worker_loop name));
@@ -241,7 +238,9 @@
val tick = Time.<= (Time.+ (! last_round, next_round), now);
val _ = if tick then last_round := now else ();
- (*queue and worker status*)
+
+ (* queue and worker status *)
+
val _ =
if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
val _ =
@@ -263,7 +262,6 @@
end)
else ();
- (*worker threads*)
val _ =
if forall (Thread.isActive o #1) (! workers) then ()
else
@@ -275,6 +273,12 @@
"SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
end;
+
+ (* worker pool adjustments *)
+
+ val max_active0 = ! max_active;
+ val max_workers0 = ! max_workers;
+
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
val _ = max_active := m;
@@ -289,19 +293,26 @@
Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
else ();
val _ =
- if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then max_workers := mm
- else if ! worker_trend > 5 then max_workers := Int.min (mm, 2 * m)
+ if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
+ max_workers := mm
+ else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
+ max_workers := Int.min (mm, 2 * m)
else ();
val missing = ! max_workers - length (! workers);
val _ =
if missing > 0 then
- (funpow missing (fn () =>
- ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
- broadcast scheduler_event)
+ funpow missing (fn () =>
+ ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
else ();
- (*canceled groups*)
+ val _ =
+ if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
+ else signal work_available;
+
+
+ (* canceled groups *)
+
val _ =
if null (! canceled) then ()
else
@@ -310,13 +321,18 @@
Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
broadcast_work ());
- (*delay loop*)
+
+ (* delay loop *)
+
val _ = Exn.release (wait_timeout next_round scheduler_event);
- (*shutdown*)
+
+ (* shutdown *)
+
val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
+
val _ = broadcast scheduler_event;
in continue end
handle Exn.Interrupt =>