# HG changeset patch # User wenzelm # Date 1257422471 -3600 # Node ID 352fe8e9162d56c3dae23edd9243f0daf7bc2313 # Parent 9348016909914941bc3412dd6bad7bacf5f98c9e worker_next: plain signalling via work_available only, not scheduler_event; scheduler: tuned worker pool adjustments; diff -r 934801690991 -r 352fe8e9162d src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Nov 05 00:13:00 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Thu Nov 05 13:01:11 2009 +0100 @@ -203,23 +203,20 @@ val _ = state := Working; in () end; -fun worker_next have_work = (*requires SYNCHRONIZED*) +fun worker_next () = (*requires SYNCHRONIZED*) if length (! workers) > ! max_workers then (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); - if have_work then signal work_available else (); - broadcast scheduler_event; + signal work_available; NONE) else if count_workers Working > ! max_active then - (if have_work then signal work_available else (); - worker_wait false scheduler_event; - worker_next false) + (worker_wait false work_available; worker_next ()) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of - NONE => (worker_wait false work_available; worker_next true) + NONE => (worker_wait false work_available; worker_next ()) | some => (signal work_available; some)); fun worker_loop name = - (case SYNCHRONIZED name (fn () => worker_next false) of + (case SYNCHRONIZED name (fn () => worker_next ()) of NONE => () | SOME work => (execute work; worker_loop name)); @@ -241,7 +238,9 @@ val tick = Time.<= (Time.+ (! last_round, next_round), now); val _ = if tick then last_round := now else (); - (*queue and worker status*) + + (* queue and worker status *) + val _ = if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); val _ = @@ -263,7 +262,6 @@ end) else (); - (*worker threads*) val _ = if forall (Thread.isActive o #1) (! workers) then () else @@ -275,6 +273,12 @@ "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads") end; + + (* worker pool adjustments *) + + val max_active0 = ! max_active; + val max_workers0 = ! max_workers; + val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); val _ = max_active := m; @@ -289,19 +293,26 @@ Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) else (); val _ = - if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then max_workers := mm - else if ! worker_trend > 5 then max_workers := Int.min (mm, 2 * m) + if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then + max_workers := mm + else if ! worker_trend > 5 andalso ! max_workers < 2 * m then + max_workers := Int.min (mm, 2 * m) else (); val missing = ! max_workers - length (! workers); val _ = if missing > 0 then - (funpow missing (fn () => - ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) (); - broadcast scheduler_event) + funpow missing (fn () => + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () else (); - (*canceled groups*) + val _ = + if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () + else signal work_available; + + + (* canceled groups *) + val _ = if null (! canceled) then () else @@ -310,13 +321,18 @@ Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ()); - (*delay loop*) + + (* delay loop *) + val _ = Exn.release (wait_timeout next_round scheduler_event); - (*shutdown*) + + (* shutdown *) + val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else (); val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; + val _ = broadcast scheduler_event; in continue end handle Exn.Interrupt =>