# HG changeset patch # User wenzelm # Date 1257274329 -3600 # Node ID 1ddcb8472bd244f17811a9c27fb49abe62c08b37 # Parent 5c1928d5db38c309b03aef388f2a0e07b740691e slightly leaner and more direct control of worker activity etc.; diff -r 5c1928d5db38 -r 1ddcb8472bd2 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Nov 03 10:36:20 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100 @@ -103,9 +103,10 @@ val queue = Unsynchronized.ref Task_Queue.empty; val next = Unsynchronized.ref 0; -val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list); +val max_workers = Unsynchronized.ref 0; +val max_active = Unsynchronized.ref 0; +val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list); val scheduler = Unsynchronized.ref (NONE: Thread.thread option); -val excessive = Unsynchronized.ref 0; val canceled = Unsynchronized.ref ([]: Task_Queue.group list); val do_shutdown = Unsynchronized.ref false; @@ -186,26 +187,30 @@ (* worker activity *) fun count_active () = (*requires SYNCHRONIZED*) - fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; + fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0; -fun change_active active = (*requires SYNCHRONIZED*) - Unsynchronized.change workers - (AList.update Thread.equal (Thread.self (), active)); +fun find_active () = (*requires SYNCHRONIZED*) + (case AList.lookup Thread.equal (! workers) (Thread.self ()) of + SOME active => active + | NONE => raise Fail "Unregistered worker thread"); (* worker threads *) fun worker_wait cond = (*requires SYNCHRONIZED*) - (change_active false; wait cond; change_active true); + let + val active = find_active (); + val _ = active := false; + val _ = wait cond; + val _ = active := true; + in () end; fun worker_next () = (*requires SYNCHRONIZED*) - if ! excessive > 0 then - (Unsynchronized.dec excessive; - Unsynchronized.change workers - (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); + if length (! workers) > ! max_workers then + (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); broadcast scheduler_event; NONE) - else if count_active () > Multithreading.max_threads_value () then + else if count_active () > ! max_active then (worker_wait scheduler_event; worker_next ()) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of @@ -217,9 +222,12 @@ NONE => () | SOME work => (execute name work; worker_loop name)); -fun worker_start name = (*requires SYNCHRONIZED*) - Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => - (broadcast scheduler_event; worker_loop name)), true)); +fun worker_start name = + SimpleThread.fork false (fn () => + (SYNCHRONIZED name (fn () => + Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true))); + broadcast scheduler_event; + worker_loop name)); (* scheduler *) @@ -261,13 +269,16 @@ "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))); val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); + val _ = max_active := m; + val mm = if m = 9999 then 1 else m * 2; + val _ = max_workers := mm; + val l = length (! workers); - val _ = excessive := l - mm; val _ = if mm > l then funpow (mm - l) (fn () => - worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) () + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () else (); (*canceled groups*)