worker_next: treat wait for work_available as Sleeping, not Waiting;
authorwenzelm
Wed, 04 Nov 2009 20:31:36 +0100
changeset 33411 a07558eb5029
parent 33410 e351f4c1f18c
child 33412 4b403f72a511
worker_next: treat wait for work_available as Sleeping, not Waiting; max_threads: simple adaptive scheme between m and 2 * m;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Nov 04 11:58:29 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Nov 04 20:31:36 2009 +0100
@@ -139,6 +139,7 @@
 val do_shutdown = Unsynchronized.ref false;
 val max_workers = Unsynchronized.ref 0;
 val max_active = Unsynchronized.ref 0;
+val worker_trend = Unsynchronized.ref 0;
 
 datatype worker_state = Working | Waiting | Sleeping;
 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
@@ -214,7 +215,7 @@
      worker_next false)
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait true work_available; worker_next true)
+      NONE => (worker_wait false work_available; worker_next true)
     | some => some);
 
 fun worker_loop name =
@@ -277,8 +278,20 @@
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
     val _ = max_active := m;
 
-    val mm = if m = 9999 then 1 else m * 2;
-    val _ = max_workers := mm;
+    val mm =
+      if ! do_shutdown then 0
+      else if m = 9999 then 1
+      else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 2 * m);
+    val _ =
+      if tick andalso mm > ! max_workers then
+        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
+      else if tick andalso mm < ! max_workers then
+        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
+      else ();
+    val _ =
+      if mm = 0 orelse ! worker_trend > 5 orelse ! worker_trend < ~50
+      then max_workers := mm
+      else ();
 
     val missing = ! max_workers - length (! workers);
     val _ =