worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
authorwenzelm
Wed, 04 Nov 2009 00:29:58 +0100
changeset 33407 1427333220bc
parent 33406 1ddcb8472bd2
child 33408 a69ddd7dce95
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock); worker_start: back to non-self version; scheduler: status output based on ticks;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Nov 03 19:52:09 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Nov 04 00:29:58 2009 +0100
@@ -205,58 +205,63 @@
     val _ = active := true;
   in () end;
 
-fun worker_next () = (*requires SYNCHRONIZED*)
+fun worker_next has_work = (*requires SYNCHRONIZED*)
   if length (! workers) > ! max_workers then
     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
      broadcast scheduler_event;
+     if has_work then signal work_available else ();
      NONE)
   else if count_active () > ! max_active then
-    (worker_wait scheduler_event; worker_next ())
+    (if has_work then signal work_available else ();
+     worker_wait scheduler_event;
+     worker_next false)
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
-      NONE => (worker_wait work_available; worker_next ())
+      NONE => (worker_wait work_available; worker_next true)
     | some => some);
 
 fun worker_loop name =
-  (case SYNCHRONIZED name (fn () => worker_next ()) of
+  (case SYNCHRONIZED name (fn () => worker_next false) of
     NONE => ()
   | SOME work => (execute name work; worker_loop name));
 
-fun worker_start name =
-  SimpleThread.fork false (fn () =>
-   (SYNCHRONIZED name (fn () =>
-      Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
-    broadcast scheduler_event;
-    worker_loop name));
+fun worker_start name = (*requires SYNCHRONIZED*)
+  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
+    Unsynchronized.ref true));
 
 
 (* scheduler *)
 
-val last_status = Unsynchronized.ref Time.zeroTime;
-val next_status = Time.fromMilliseconds 500;
+val status_ticks = Unsynchronized.ref 0;
+
+val last_round = Unsynchronized.ref Time.zeroTime;
 val next_round = Time.fromMilliseconds 50;
 
 fun scheduler_next () = (*requires SYNCHRONIZED*)
   let
+    val now = Time.now ();
+    val tick = Time.<= (Time.+ (! last_round, next_round), now);
+    val _ = if tick then last_round := now else ();
+
     (*queue and worker status*)
     val _ =
-      let val now = Time.now () in
-        if Time.> (Time.+ (! last_status, next_status), now) then ()
-        else
-         (last_status := now; Multithreading.tracing 1 (fn () =>
-            let
-              val {ready, pending, running} = Task_Queue.status (! queue);
-              val total = length (! workers);
-              val active = count_active ();
-            in
-              "SCHEDULE " ^ Time.toString now ^ ": " ^
-                string_of_int ready ^ " ready, " ^
-                string_of_int pending ^ " pending, " ^
-                string_of_int running ^ " running; " ^
-                string_of_int total ^ " workers, " ^
-                string_of_int active ^ " active"
-            end))
-      end;
+      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
+    val _ =
+      if tick andalso ! status_ticks = 0 then
+        Multithreading.tracing 1 (fn () =>
+          let
+            val {ready, pending, running} = Task_Queue.status (! queue);
+            val total = length (! workers);
+            val active = count_active ();
+          in
+            "SCHEDULE " ^ Time.toString now ^ ": " ^
+              string_of_int ready ^ " ready, " ^
+              string_of_int pending ^ " pending, " ^
+              string_of_int running ^ " running; " ^
+              string_of_int total ^ " workers, " ^
+              string_of_int active ^ " active "
+          end)
+      else ();
 
     (*worker threads*)
     val _ =
@@ -274,11 +279,12 @@
     val mm = if m = 9999 then 1 else m * 2;
     val _ = max_workers := mm;
 
-    val l = length (! workers);
+    val missing = ! max_workers - length (! workers);
     val _ =
-      if mm > l then
-        funpow (mm - l) (fn () =>
-          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
+      if missing > 0 then
+       (funpow missing (fn () =>
+          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
+        broadcast scheduler_event)
       else ();
 
     (*canceled groups*)