src/Pure/Concurrent/future.ML
changeset 33406 1ddcb8472bd2
parent 33061 e3e61133e0fc
child 33407 1427333220bc
--- a/src/Pure/Concurrent/future.ML	Tue Nov 03 10:36:20 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Nov 03 19:52:09 2009 +0100
@@ -103,9 +103,10 @@
 
 val queue = Unsynchronized.ref Task_Queue.empty;
 val next = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val excessive = Unsynchronized.ref 0;
 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
 val do_shutdown = Unsynchronized.ref false;
 
@@ -186,26 +187,30 @@
 (* worker activity *)
 
 fun count_active () = (*requires SYNCHRONIZED*)
-  fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
+  fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
 
-fun change_active active = (*requires SYNCHRONIZED*)
-  Unsynchronized.change workers
-    (AList.update Thread.equal (Thread.self (), active));
+fun find_active () = (*requires SYNCHRONIZED*)
+  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+    SOME active => active
+  | NONE => raise Fail "Unregistered worker thread");
 
 
 (* worker threads *)
 
 fun worker_wait cond = (*requires SYNCHRONIZED*)
-  (change_active false; wait cond; change_active true);
+  let
+    val active = find_active ();
+    val _ = active := false;
+    val _ = wait cond;
+    val _ = active := true;
+  in () end;
 
 fun worker_next () = (*requires SYNCHRONIZED*)
-  if ! excessive > 0 then
-    (Unsynchronized.dec excessive;
-     Unsynchronized.change workers
-      (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
+  if length (! workers) > ! max_workers then
+    (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
      broadcast scheduler_event;
      NONE)
-  else if count_active () > Multithreading.max_threads_value () then
+  else if count_active () > ! max_active then
     (worker_wait scheduler_event; worker_next ())
   else
     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
@@ -217,9 +222,12 @@
     NONE => ()
   | SOME work => (execute name work; worker_loop name));
 
-fun worker_start name = (*requires SYNCHRONIZED*)
-  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
-     (broadcast scheduler_event; worker_loop name)), true));
+fun worker_start name =
+  SimpleThread.fork false (fn () =>
+   (SYNCHRONIZED name (fn () =>
+      Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
+    broadcast scheduler_event;
+    worker_loop name));
 
 
 (* scheduler *)
@@ -261,13 +269,16 @@
               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
 
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
+    val _ = max_active := m;
+
     val mm = if m = 9999 then 1 else m * 2;
+    val _ = max_workers := mm;
+
     val l = length (! workers);
-    val _ = excessive := l - mm;
     val _ =
       if mm > l then
         funpow (mm - l) (fn () =>
-          worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
+          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
       else ();
 
     (*canceled groups*)