--- a/src/Pure/Concurrent/future.ML Tue Nov 03 10:36:20 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100
@@ -103,9 +103,10 @@
val queue = Unsynchronized.ref Task_Queue.empty;
val next = Unsynchronized.ref 0;
-val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
+val max_workers = Unsynchronized.ref 0;
+val max_active = Unsynchronized.ref 0;
+val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
-val excessive = Unsynchronized.ref 0;
val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
val do_shutdown = Unsynchronized.ref false;
@@ -186,26 +187,30 @@
(* worker activity *)
fun count_active () = (*requires SYNCHRONIZED*)
- fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
+ fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
-fun change_active active = (*requires SYNCHRONIZED*)
- Unsynchronized.change workers
- (AList.update Thread.equal (Thread.self (), active));
+fun find_active () = (*requires SYNCHRONIZED*)
+ (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
+ SOME active => active
+ | NONE => raise Fail "Unregistered worker thread");
(* worker threads *)
fun worker_wait cond = (*requires SYNCHRONIZED*)
- (change_active false; wait cond; change_active true);
+ let
+ val active = find_active ();
+ val _ = active := false;
+ val _ = wait cond;
+ val _ = active := true;
+ in () end;
fun worker_next () = (*requires SYNCHRONIZED*)
- if ! excessive > 0 then
- (Unsynchronized.dec excessive;
- Unsynchronized.change workers
- (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
+ if length (! workers) > ! max_workers then
+ (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
broadcast scheduler_event;
NONE)
- else if count_active () > Multithreading.max_threads_value () then
+ else if count_active () > ! max_active then
(worker_wait scheduler_event; worker_next ())
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
@@ -217,9 +222,12 @@
NONE => ()
| SOME work => (execute name work; worker_loop name));
-fun worker_start name = (*requires SYNCHRONIZED*)
- Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
- (broadcast scheduler_event; worker_loop name)), true));
+fun worker_start name =
+ SimpleThread.fork false (fn () =>
+ (SYNCHRONIZED name (fn () =>
+ Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
+ broadcast scheduler_event;
+ worker_loop name));
(* scheduler *)
@@ -261,13 +269,16 @@
"SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
+ val _ = max_active := m;
+
val mm = if m = 9999 then 1 else m * 2;
+ val _ = max_workers := mm;
+
val l = length (! workers);
- val _ = excessive := l - mm;
val _ =
if mm > l then
funpow (mm - l) (fn () =>
- worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
+ ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
else ();
(*canceled groups*)