worker activity: distinguish between waiting (formerly active) and sleeping;
authorwenzelm
Wed, 04 Nov 2009 11:58:29 +0100
changeset 33410 e351f4c1f18c
parent 33409 0a1c0c1209ec
child 33411 a07558eb5029
worker activity: distinguish between waiting (formerly active) and sleeping; tuned;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Nov 04 11:37:06 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Nov 04 11:58:29 2009 +0100
@@ -99,18 +99,6 @@
 
 (** scheduling **)
 
-(* global state *)
-
-val queue = Unsynchronized.ref Task_Queue.empty;
-val next = Unsynchronized.ref 0;
-val max_workers = Unsynchronized.ref 0;
-val max_active = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
-val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
-val do_shutdown = Unsynchronized.ref false;
-
-
 (* synchronization *)
 
 val scheduler_event = ConditionVar.conditionVar ();
@@ -142,6 +130,23 @@
 end;
 
 
+(* global state *)
+
+val queue = Unsynchronized.ref Task_Queue.empty;
+val next = Unsynchronized.ref 0;
+val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
+val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
+val do_shutdown = Unsynchronized.ref false;
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+
+datatype worker_state = Working | Waiting | Sleeping;
+val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
+
+fun count_workers state = (*requires SYNCHRONIZED*)
+  fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
+
+
 (* execute future jobs *)
 
 fun future_job group (e: unit -> 'a) =
@@ -184,25 +189,17 @@
   in () end;
 
 
-(* worker activity *)
-
-fun count_active () = (*requires SYNCHRONIZED*)
-  fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
-
-fun find_active () = (*requires SYNCHRONIZED*)
-  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
-    SOME active => active
-  | NONE => raise Fail "Unregistered worker thread");
-
-
 (* worker threads *)
 
-fun worker_wait cond = (*requires SYNCHRONIZED*)
+fun worker_wait active cond = (*requires SYNCHRONIZED*)
   let
-    val active = find_active ();
-    val _ = active := false;
+    val state =
+      (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+        SOME state => state
+      | NONE => raise Fail "Unregistered worker thread");
+    val _ = state := (if active then Waiting else Sleeping);
     val _ = wait cond;
-    val _ = active := true;
+    val _ = state := Working;
   in () end;
 
 fun worker_next have_work = (*requires SYNCHRONIZED*)
@@ -211,13 +208,13 @@
      if have_work then signal work_available else ();
      broadcast scheduler_event;
      NONE)
-  else if count_active () > ! max_active then
+  else if count_workers Working > ! max_active then
     (if have_work then signal work_available else ();
-     worker_wait scheduler_event;
+     worker_wait false scheduler_event;
      worker_next false)
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait work_available; worker_next true)
+      NONE => (worker_wait true work_available; worker_next true)
     | some => some);
 
 fun worker_loop name =
@@ -227,7 +224,7 @@
 
 fun worker_start name = (*requires SYNCHRONIZED*)
   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
-    Unsynchronized.ref true));
+    Unsynchronized.ref Working));
 
 
 (* scheduler *)
@@ -252,14 +249,16 @@
           let
             val {ready, pending, running} = Task_Queue.status (! queue);
             val total = length (! workers);
-            val active = count_active ();
+            val active = count_workers Working;
+            val waiting = count_workers Waiting;
           in
             "SCHEDULE " ^ Time.toString now ^ ": " ^
               string_of_int ready ^ " ready, " ^
               string_of_int pending ^ " pending, " ^
               string_of_int running ^ " running; " ^
               string_of_int total ^ " workers, " ^
-              string_of_int active ^ " active "
+              string_of_int active ^ " active, " ^
+              string_of_int waiting ^ " waiting "
           end)
       else ();
 
@@ -373,7 +372,7 @@
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
       (NONE, []) => NONE
-    | (NONE, deps') => (worker_wait work_finished; join_next deps')
+    | (NONE, deps') => (worker_wait true work_finished; join_next deps')
     | (SOME work, deps') => SOME (work, deps'));
 
 fun execute_work NONE = ()