# HG changeset patch # User wenzelm # Date 1221159845 -7200 # Node ID 88f18387f1c93df1718e670b6825b6939216e1c1 # Parent 23cb9a974630bb0257b11a664b9ee4a42cfd847f shutdown: global join-and-shutdown operation; reduced trace_active; scheduler_fork: always notify; tracing for thread exit; unique ids for workers; diff -r 23cb9a974630 -r 88f18387f1c9 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Sep 11 18:07:58 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Thu Sep 11 21:04:05 2008 +0200 @@ -32,7 +32,6 @@ type 'a T val task_of: 'a T -> task val group_of: 'a T -> group - val shutdown_request: unit -> unit val future: group option -> task list -> (unit -> 'a) -> 'a T val fork: (unit -> 'a) -> 'a T val join_results: 'a T list -> 'a Exn.result list @@ -40,6 +39,7 @@ val focus: task list -> unit val cancel: 'a T -> unit val interrupt_task: string -> unit + val shutdown: unit -> unit end; structure Future: FUTURE = @@ -80,13 +80,6 @@ val excessive = ref 0; -fun trace_active () = - let - val ws = ! workers; - val m = string_of_int (length ws); - val n = string_of_int (length (filter #2 ws)); - in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; - (* requests *) @@ -148,7 +141,13 @@ (* worker threads *) fun change_active active = (*requires SYNCHRONIZED*) - (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ()); + let + val _ = change workers (AList.update Thread.equal (Thread.self (), active)); + val ws = ! workers; + val m = string_of_int (length ws); + val n = string_of_int (length (filter #2 ws)); + in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; + fun worker_wait name = (*requires SYNCHRONIZED*) (change_active false; wait name; change_active true); @@ -157,6 +156,7 @@ if ! excessive > 0 then (dec excessive; change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); + notify_all (); NONE) else (case change_result queue TaskQueue.dequeue of @@ -165,7 +165,7 @@ fun worker_loop name = (case SYNCHRONIZED name (fn () => worker_next name) of - NONE => () + NONE => Multithreading.tracing 4 (fn () => name ^ ": exit") | SOME work => (execute name work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) @@ -177,7 +177,6 @@ fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () => let - val _ = trace_active (); val _ = (case List.partition (Thread.isActive o #1) (! workers) of (_, []) => () @@ -188,12 +187,16 @@ val m = if shutdown then 0 else Multithreading.max_threads_value (); val l = length (! workers); val _ = excessive := l - m; - val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1); - val _ = if shutdown then notify_all () else (); - in shutdown andalso null (! workers) end); + val _ = + if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) () + else (); + val terminate = shutdown andalso null (! workers); + val _ = if terminate then scheduler := NONE else (); + val _ = notify_all (); + in terminate end); fun scheduler_loop (shutdown, canceled) = - if scheduler_fork shutdown then () + if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit") else let val canceled' = SYNCHRONIZED "scheduler" @@ -205,6 +208,9 @@ | NONE => scheduler_loop (shutdown, canceled')) end; +fun scheduler_active () = (*requires SYNCHRONIZED*) + (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); + fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () => if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () else scheduler := @@ -246,20 +252,20 @@ (*alien thread -- refrain from contending for resources*) fun passive_join () = (*requires SYNCHRONIZED*) (case unfinished () of [] => () - | _ => (wait "join"; passive_join ())); + | _ => (wait "passive_join"; passive_join ())); (*proper worker thread -- actively work towards results*) fun active_join () = (*requires SYNCHRONIZED*) (case unfinished () of [] => () | tasks => (case change_result queue (TaskQueue.dequeue_towards tasks) of - NONE => (worker_wait "join"; active_join ()) - | SOME work => (execute "join" work; active_join ()))); + NONE => (worker_wait "active_join"; active_join ()) + | SOME work => (execute "active_join" work; active_join ()))); val _ = (case thread_data () of - NONE => SYNCHRONIZED "join" passive_join - | SOME (task, _) => SYNCHRONIZED "join" (fn () => + NONE => SYNCHRONIZED "passive_join" passive_join + | SOME (task, _) => SYNCHRONIZED "active_join" (fn () => (change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); in xs |> map (fn Future {result = ref (SOME res), ...} => res) end; @@ -280,4 +286,13 @@ fun interrupt_task id = SYNCHRONIZED "interrupt" (fn () => TaskQueue.interrupt_external (! queue) id); +(*global join and shutdown*) +fun shutdown () = + (scheduler_check (); + SYNCHRONIZED "shutdown" (fn () => + (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; + shutdown_request (); + while not (null (! workers)) do wait "shutdown: workers"; + while scheduler_active () do wait "shutdown: scheduler"))); + end;