worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
worker_start: back to non-self version;
scheduler: status output based on ticks;
--- a/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Nov 04 00:29:58 2009 +0100
@@ -205,58 +205,63 @@
val _ = active := true;
in () end;
-fun worker_next () = (*requires SYNCHRONIZED*)
+fun worker_next has_work = (*requires SYNCHRONIZED*)
if length (! workers) > ! max_workers then
(Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
broadcast scheduler_event;
+ if has_work then signal work_available else ();
NONE)
else if count_active () > ! max_active then
- (worker_wait scheduler_event; worker_next ())
+ (if has_work then signal work_available else ();
+ worker_wait scheduler_event;
+ worker_next false)
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
- NONE => (worker_wait work_available; worker_next ())
+ NONE => (worker_wait work_available; worker_next true)
| some => some);
fun worker_loop name =
- (case SYNCHRONIZED name (fn () => worker_next ()) of
+ (case SYNCHRONIZED name (fn () => worker_next false) of
NONE => ()
| SOME work => (execute name work; worker_loop name));
-fun worker_start name =
- SimpleThread.fork false (fn () =>
- (SYNCHRONIZED name (fn () =>
- Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
- broadcast scheduler_event;
- worker_loop name));
+fun worker_start name = (*requires SYNCHRONIZED*)
+ Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
+ Unsynchronized.ref true));
(* scheduler *)
-val last_status = Unsynchronized.ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 500;
+val status_ticks = Unsynchronized.ref 0;
+
+val last_round = Unsynchronized.ref Time.zeroTime;
val next_round = Time.fromMilliseconds 50;
fun scheduler_next () = (*requires SYNCHRONIZED*)
let
+ val now = Time.now ();
+ val tick = Time.<= (Time.+ (! last_round, next_round), now);
+ val _ = if tick then last_round := now else ();
+
(*queue and worker status*)
val _ =
- let val now = Time.now () in
- if Time.> (Time.+ (! last_status, next_status), now) then ()
- else
- (last_status := now; Multithreading.tracing 1 (fn () =>
- let
- val {ready, pending, running} = Task_Queue.status (! queue);
- val total = length (! workers);
- val active = count_active ();
- in
- "SCHEDULE " ^ Time.toString now ^ ": " ^
- string_of_int ready ^ " ready, " ^
- string_of_int pending ^ " pending, " ^
- string_of_int running ^ " running; " ^
- string_of_int total ^ " workers, " ^
- string_of_int active ^ " active"
- end))
- end;
+ if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
+ val _ =
+ if tick andalso ! status_ticks = 0 then
+ Multithreading.tracing 1 (fn () =>
+ let
+ val {ready, pending, running} = Task_Queue.status (! queue);
+ val total = length (! workers);
+ val active = count_active ();
+ in
+ "SCHEDULE " ^ Time.toString now ^ ": " ^
+ string_of_int ready ^ " ready, " ^
+ string_of_int pending ^ " pending, " ^
+ string_of_int running ^ " running; " ^
+ string_of_int total ^ " workers, " ^
+ string_of_int active ^ " active "
+ end)
+ else ();
(*worker threads*)
val _ =
@@ -274,11 +279,12 @@
val mm = if m = 9999 then 1 else m * 2;
val _ = max_workers := mm;
- val l = length (! workers);
+ val missing = ! max_workers - length (! workers);
val _ =
- if mm > l then
- funpow (mm - l) (fn () =>
- ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
+ if missing > 0 then
+ (funpow missing (fn () =>
+ ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
+ broadcast scheduler_event)
else ();
(*canceled groups*)