# HG changeset patch # User wenzelm # Date 1248168612 -7200 # Node ID ad4be204fdfe3d42db554fc409dbe432877dfdbb # Parent 89b9210c7506455e934d98df705e7d10b615fb53 tuned tracing; provide spare worker threads to saturate hardware while other workers wait during join; diff -r 89b9210c7506 -r ad4be204fdfe src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 21 10:24:57 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 21 11:30:12 2009 +0200 @@ -134,16 +134,22 @@ (* worker activity *) -fun trace_active () = +fun count_active ws = + fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0; + +fun trace_active () = Multithreading.tracing 1 (fn () => let val ws = ! workers; val m = string_of_int (length ws); - val n = string_of_int (length (filter #2 ws)); - in Multithreading.tracing 2 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; + val n = string_of_int (count_active ws); + in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end); fun change_active active = (*requires SYNCHRONIZED*) change workers (AList.update Thread.equal (Thread.self (), active)); +fun overloaded () = + count_active (! workers) > Multithreading.max_threads_value (); + (* execute jobs *) @@ -176,6 +182,7 @@ change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); notify_all (); NONE) + else if overloaded () then (worker_wait (); worker_next ()) else (case change_result queue Task_Queue.dequeue of NONE => (worker_wait (); worker_next ()) @@ -204,26 +211,31 @@ end); (*worker threads*) + val ws = ! workers; val _ = - (case List.partition (Thread.isActive o #1) (! workers) of - (_, []) => () - | (active, inactive) => - (workers := active; Multithreading.tracing 0 (fn () => - "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); + if forall (Thread.isActive o #1) ws then () + else + (case List.partition (Thread.isActive o #1) ws of + (_, []) => () + | (active, inactive) => + (workers := active; Multithreading.tracing 0 (fn () => + "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); val _ = trace_active (); val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); - val l = length (! workers); - val _ = excessive := l - m; + val mm = (m * 3) div 2; + val l = length ws; + val _ = excessive := l - mm; val _ = - if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () + if mm > l then + funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () else (); (*canceled groups*) - val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); + val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); (*shutdown*) - val continue = not (! do_shutdown andalso null (! workers)); + val continue = not (! do_shutdown andalso null ws); val _ = if continue then () else scheduler := NONE; val _ = notify_all (); @@ -292,8 +304,12 @@ fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); +fun join_next deps = (*requires SYNCHRONIZED*) + if overloaded () then (worker_wait (); join_next deps) + else change_result queue (Task_Queue.dequeue_towards deps); + fun join_deps deps = - (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of + (case SYNCHRONIZED "join" (fn () => join_next deps) of NONE => () | SOME (work, deps') => (execute "join" work; join_deps deps'));