worker activity: distinguish between waiting (formerly active) and sleeping;
tuned;
--- a/src/Pure/Concurrent/future.ML Wed Nov 04 11:37:06 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Nov 04 11:58:29 2009 +0100
@@ -99,18 +99,6 @@
(** scheduling **)
-(* global state *)
-
-val queue = Unsynchronized.ref Task_Queue.empty;
-val next = Unsynchronized.ref 0;
-val max_workers = Unsynchronized.ref 0;
-val max_active = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
-val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
-val do_shutdown = Unsynchronized.ref false;
-
-
(* synchronization *)
val scheduler_event = ConditionVar.conditionVar ();
@@ -142,6 +130,23 @@
end;
+(* global state *)
+
+val queue = Unsynchronized.ref Task_Queue.empty;
+val next = Unsynchronized.ref 0;
+val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
+val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
+val do_shutdown = Unsynchronized.ref false;
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+
+datatype worker_state = Working | Waiting | Sleeping;
+val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
+
+fun count_workers state = (*requires SYNCHRONIZED*)
+ fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
+
+
(* execute future jobs *)
fun future_job group (e: unit -> 'a) =
@@ -184,25 +189,17 @@
in () end;
-(* worker activity *)
-
-fun count_active () = (*requires SYNCHRONIZED*)
- fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
-
-fun find_active () = (*requires SYNCHRONIZED*)
- (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
- SOME active => active
- | NONE => raise Fail "Unregistered worker thread");
-
-
(* worker threads *)
-fun worker_wait cond = (*requires SYNCHRONIZED*)
+fun worker_wait active cond = (*requires SYNCHRONIZED*)
let
- val active = find_active ();
- val _ = active := false;
+ val state =
+ (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+ SOME state => state
+ | NONE => raise Fail "Unregistered worker thread");
+ val _ = state := (if active then Waiting else Sleeping);
val _ = wait cond;
- val _ = active := true;
+ val _ = state := Working;
in () end;
fun worker_next have_work = (*requires SYNCHRONIZED*)
@@ -211,13 +208,13 @@
if have_work then signal work_available else ();
broadcast scheduler_event;
NONE)
- else if count_active () > ! max_active then
+ else if count_workers Working > ! max_active then
(if have_work then signal work_available else ();
- worker_wait scheduler_event;
+ worker_wait false scheduler_event;
worker_next false)
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
- NONE => (worker_wait work_available; worker_next true)
+ NONE => (worker_wait true work_available; worker_next true)
| some => some);
fun worker_loop name =
@@ -227,7 +224,7 @@
fun worker_start name = (*requires SYNCHRONIZED*)
Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
- Unsynchronized.ref true));
+ Unsynchronized.ref Working));
(* scheduler *)
@@ -252,14 +249,16 @@
let
val {ready, pending, running} = Task_Queue.status (! queue);
val total = length (! workers);
- val active = count_active ();
+ val active = count_workers Working;
+ val waiting = count_workers Waiting;
in
"SCHEDULE " ^ Time.toString now ^ ": " ^
string_of_int ready ^ " ready, " ^
string_of_int pending ^ " pending, " ^
string_of_int running ^ " running; " ^
string_of_int total ^ " workers, " ^
- string_of_int active ^ " active "
+ string_of_int active ^ " active, " ^
+ string_of_int waiting ^ " waiting "
end)
else ();
@@ -373,7 +372,7 @@
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
(NONE, []) => NONE
- | (NONE, deps') => (worker_wait work_finished; join_next deps')
+ | (NONE, deps') => (worker_wait true work_finished; join_next deps')
| (SOME work, deps') => SOME (work, deps'));
fun execute_work NONE = ()