merged
author wenzelm
Thu, 05 Nov 2009 13:57:56 +0100
changeset 33438 e87ce1a03a11
parent 33437 c8bc8dc5869f (current diff)
parent 33416 13d00799fe49 (diff)
child 33439 f5d95787224f
merged
--- a/src/Pure/Concurrent/future.ML	Wed Nov 04 17:17:30 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Thu Nov 05 13:57:56 2009 +0100
@@ -64,7 +64,7 @@
 type group = Task_Queue.group;
 
 local
-  val tag = Universal.tag () : (string * task * group) option Universal.tag;
+  val tag = Universal.tag () : (task * group) option Universal.tag;
 in
   fun thread_data () = the_default NONE (Thread.getLocal tag);
   fun setmp_thread_data data f x =
@@ -72,8 +72,8 @@
 end;
 
 val is_worker = is_some o thread_data;
-val worker_task = Option.map #2 o thread_data;
-val worker_group = Option.map #3 o thread_data;
+val worker_task = Option.map #1 o thread_data;
+val worker_group = Option.map #2 o thread_data;
 
 
 (* datatype future *)
@@ -99,17 +99,6 @@
 
 (** scheduling **)
 
-(* global state *)
-
-val queue = Unsynchronized.ref Task_Queue.empty;
-val next = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
-val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val excessive = Unsynchronized.ref 0;
-val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
-val do_shutdown = Unsynchronized.ref false;
-
-
 (* synchronization *)
 
 val scheduler_event = ConditionVar.conditionVar ();
@@ -141,6 +130,24 @@
 end;
 
 
+(* global state *)
+
+val queue = Unsynchronized.ref Task_Queue.empty;
+val next = Unsynchronized.ref 0;
+val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
+val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
+val do_shutdown = Unsynchronized.ref false;
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+val worker_trend = Unsynchronized.ref 0;
+
+datatype worker_state = Working | Waiting | Sleeping;
+val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
+
+fun count_workers state = (*requires SYNCHRONIZED*)
+  fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
+
+
 (* execute future jobs *)
 
 fun future_job group (e: unit -> 'a) =
@@ -165,10 +172,10 @@
  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   broadcast scheduler_event);
 
-fun execute name (task, group, jobs) =
+fun execute (task, group, jobs) =
   let
     val valid = not (Task_Queue.is_canceled group);
-    val ok = setmp_thread_data (name, task, group) (fn () =>
+    val ok = setmp_thread_data (task, group) (fn () =>
       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
     val _ = SYNCHRONIZED "finish" (fn () =>
       let
@@ -178,99 +185,134 @@
           else if Task_Queue.cancel (! queue) group then ()
           else do_cancel group;
         val _ = broadcast work_finished;
-        val _ = if maximal then () else broadcast work_available;
+        val _ = if maximal then () else signal work_available;
       in () end);
   in () end;
 
 
-(* worker activity *)
-
-fun count_active () = (*requires SYNCHRONIZED*)
-  fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
-
-fun change_active active = (*requires SYNCHRONIZED*)
-  Unsynchronized.change workers
-    (AList.update Thread.equal (Thread.self (), active));
-
-
 (* worker threads *)
 
-fun worker_wait cond = (*requires SYNCHRONIZED*)
-  (change_active false; wait cond; change_active true);
+fun worker_wait active cond = (*requires SYNCHRONIZED*)
+  let
+    val state =
+      (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+        SOME state => state
+      | NONE => raise Fail "Unregistered worker thread");
+    val _ = state := (if active then Waiting else Sleeping);
+    val _ = wait cond;
+    val _ = state := Working;
+  in () end;
 
 fun worker_next () = (*requires SYNCHRONIZED*)
-  if ! excessive > 0 then
-    (Unsynchronized.dec excessive;
-     Unsynchronized.change workers
-      (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
-     broadcast scheduler_event;
+  if length (! workers) > ! max_workers then
+    (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
+     signal work_available;
      NONE)
-  else if count_active () > Multithreading.max_threads_value () then
-    (worker_wait scheduler_event; worker_next ())
+  else if count_workers Working > ! max_active then
+    (worker_wait false work_available; worker_next ())
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait work_available; worker_next ())
-    | some => some);
+      NONE => (worker_wait false work_available; worker_next ())
+    | some => (signal work_available; some));
 
 fun worker_loop name =
   (case SYNCHRONIZED name (fn () => worker_next ()) of
     NONE => ()
-  | SOME work => (execute name work; worker_loop name));
+  | SOME work => (execute work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
-  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
-     (broadcast scheduler_event; worker_loop name)), true));
+  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
+    Unsynchronized.ref Working));
 
 
 (* scheduler *)
 
-val last_status = Unsynchronized.ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 500;
+val status_ticks = Unsynchronized.ref 0;
+
+val last_round = Unsynchronized.ref Time.zeroTime;
 val next_round = Time.fromMilliseconds 50;
 
 fun scheduler_next () = (*requires SYNCHRONIZED*)
   let
-    (*queue and worker status*)
+    val now = Time.now ();
+    val tick = Time.<= (Time.+ (! last_round, next_round), now);
+    val _ = if tick then last_round := now else ();
+
+
+    (* queue and worker status *)
+
+    val _ =
+      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
     val _ =
-      let val now = Time.now () in
-        if Time.> (Time.+ (! last_status, next_status), now) then ()
-        else
-         (last_status := now; Multithreading.tracing 1 (fn () =>
-            let
-              val {ready, pending, running} = Task_Queue.status (! queue);
-              val total = length (! workers);
-              val active = count_active ();
-            in
-              "SCHEDULE " ^ Time.toString now ^ ": " ^
-                string_of_int ready ^ " ready, " ^
-                string_of_int pending ^ " pending, " ^
-                string_of_int running ^ " running; " ^
-                string_of_int total ^ " workers, " ^
-                string_of_int active ^ " active"
-            end))
-      end;
+      if tick andalso ! status_ticks = 0 then
+        Multithreading.tracing 1 (fn () =>
+          let
+            val {ready, pending, running} = Task_Queue.status (! queue);
+            val total = length (! workers);
+            val active = count_workers Working;
+            val waiting = count_workers Waiting;
+          in
+            "SCHEDULE " ^ Time.toString now ^ ": " ^
+              string_of_int ready ^ " ready, " ^
+              string_of_int pending ^ " pending, " ^
+              string_of_int running ^ " running; " ^
+              string_of_int total ^ " workers, " ^
+              string_of_int active ^ " active, " ^
+              string_of_int waiting ^ " waiting "
+          end)
+      else ();
 
-    (*worker threads*)
     val _ =
       if forall (Thread.isActive o #1) (! workers) then ()
       else
-        (case List.partition (Thread.isActive o #1) (! workers) of
-          (_, []) => ()
-        | (alive, dead) =>
-            (workers := alive; Multithreading.tracing 0 (fn () =>
-              "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
+        let
+          val  (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
+          val _ = workers := alive;
+        in
+          Multithreading.tracing 0 (fn () =>
+            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
+        end;
+
+
+    (* worker pool adjustments *)
+
+    val max_active0 = ! max_active;
+    val max_workers0 = ! max_workers;
 
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
-    val mm = if m = 9999 then 1 else m * 2;
-    val l = length (! workers);
-    val _ = excessive := l - mm;
+    val _ = max_active := m;
+
+    val mm =
+      if ! do_shutdown then 0
+      else if m = 9999 then 1
+      else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
     val _ =
-      if mm > l then
-        funpow (mm - l) (fn () =>
-          worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
+      if tick andalso mm > ! max_workers then
+        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
+      else if tick andalso mm < ! max_workers then
+        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
+      else ();
+    val _ =
+      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
+        max_workers := mm
+      else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
+        max_workers := Int.min (mm, 2 * m)
       else ();
 
-    (*canceled groups*)
+    val missing = ! max_workers - length (! workers);
+    val _ =
+      if missing > 0 then
+        funpow missing (fn () =>
+          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
+      else ();
+
+    val _ =
+      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
+      else signal work_available;
+
+
+    (* canceled groups *)
+
     val _ =
       if null (! canceled) then ()
       else
@@ -279,24 +321,30 @@
         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
         broadcast_work ());
 
-    (*delay loop*)
+
+    (* delay loop *)
+
     val _ = Exn.release (wait_timeout next_round scheduler_event);
 
-    (*shutdown*)
+
+    (* shutdown *)
+
     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
+
     val _ = broadcast scheduler_event;
   in continue end
   handle Exn.Interrupt =>
    (Multithreading.tracing 1 (fn () => "Interrupt");
-    uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
-    scheduler_next ());
+    List.app do_cancel (Task_Queue.cancel_all (! queue)); true);
 
 fun scheduler_loop () =
-  Multithreading.with_attributes
-    (Multithreading.sync_interrupts Multithreading.public_interrupts)
-    (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
+  while
+    Multithreading.with_attributes
+      (Multithreading.sync_interrupts Multithreading.public_interrupts)
+      (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
+  do ();
 
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
@@ -346,7 +394,7 @@
       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   | SOME res => res);
 
-fun join_wait x =
+fun passive_wait x =
   Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
 
 fun join_next deps = (*requires SYNCHRONIZED*)
@@ -354,11 +402,11 @@
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
       (NONE, []) => NONE
-    | (NONE, deps') => (worker_wait work_finished; join_next deps')
+    | (NONE, deps') => (worker_wait true work_finished; join_next deps')
     | (SOME work, deps') => SOME (work, deps'));
 
 fun execute_work NONE = ()
-  | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
+  | execute_work (SOME (work, deps')) = (execute work; join_work deps')
 and join_work deps =
   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
 
@@ -375,7 +423,7 @@
   else
     (case worker_task () of
       SOME task => join_depend task (map task_of xs)
-    | NONE => List.app join_wait xs;
+    | NONE => List.app passive_wait xs;
     map get_result xs);
 
 end;