worker_next: plain signalling via work_available only, not scheduler_event;
authorwenzelm
Thu, 05 Nov 2009 13:01:11 +0100
changeset 33415 352fe8e9162d
parent 33414 934801690991
child 33416 13d00799fe49
worker_next: plain signalling via work_available only, not scheduler_event; scheduler: tuned worker pool adjustments;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Thu Nov 05 00:13:00 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Thu Nov 05 13:01:11 2009 +0100
@@ -203,23 +203,20 @@
     val _ = state := Working;
   in () end;
 
-fun worker_next have_work = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
   if length (! workers) > ! max_workers then
     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
-     if have_work then signal work_available else ();
-     broadcast scheduler_event;
+     signal work_available;
      NONE)
   else if count_workers Working > ! max_active then
-    (if have_work then signal work_available else ();
-     worker_wait false scheduler_event;
-     worker_next false)
+    (worker_wait false work_available; worker_next ())
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait false work_available; worker_next true)
+      NONE => (worker_wait false work_available; worker_next ())
     | some => (signal work_available; some));
 
 fun worker_loop name =
-  (case SYNCHRONIZED name (fn () => worker_next false) of
+  (case SYNCHRONIZED name (fn () => worker_next ()) of
     NONE => ()
   | SOME work => (execute work; worker_loop name));
 
@@ -241,7 +238,9 @@
     val tick = Time.<= (Time.+ (! last_round, next_round), now);
     val _ = if tick then last_round := now else ();
 
-    (*queue and worker status*)
+
+    (* queue and worker status *)
+
     val _ =
       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
     val _ =
@@ -263,7 +262,6 @@
           end)
       else ();
 
-    (*worker threads*)
     val _ =
       if forall (Thread.isActive o #1) (! workers) then ()
       else
@@ -275,6 +273,12 @@
             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
         end;
 
+
+    (* worker pool adjustments *)
+
+    val max_active0 = ! max_active;
+    val max_workers0 = ! max_workers;
+
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
     val _ = max_active := m;
 
@@ -289,19 +293,26 @@
         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
       else ();
     val _ =
-      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then max_workers := mm
-      else if ! worker_trend > 5 then max_workers := Int.min (mm, 2 * m)
+      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
+        max_workers := mm
+      else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
+        max_workers := Int.min (mm, 2 * m)
       else ();
 
     val missing = ! max_workers - length (! workers);
     val _ =
       if missing > 0 then
-       (funpow missing (fn () =>
-          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
-        broadcast scheduler_event)
+        funpow missing (fn () =>
+          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
       else ();
 
-    (*canceled groups*)
+    val _ =
+      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
+      else signal work_available;
+
+
+    (* canceled groups *)
+
     val _ =
       if null (! canceled) then ()
       else
@@ -310,13 +321,18 @@
         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
         broadcast_work ());
 
-    (*delay loop*)
+
+    (* delay loop *)
+
     val _ = Exn.release (wait_timeout next_round scheduler_event);
 
-    (*shutdown*)
+
+    (* shutdown *)
+
     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
+
     val _ = broadcast scheduler_event;
   in continue end
   handle Exn.Interrupt =>