shutdown: global join-and-shutdown operation;
reduced trace_active;
scheduler_fork: always notify;
tracing for thread exit;
unique ids for workers;
--- a/src/Pure/Concurrent/future.ML Thu Sep 11 18:07:58 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Sep 11 21:04:05 2008 +0200
@@ -32,7 +32,6 @@
type 'a T
val task_of: 'a T -> task
val group_of: 'a T -> group
- val shutdown_request: unit -> unit
val future: group option -> task list -> (unit -> 'a) -> 'a T
val fork: (unit -> 'a) -> 'a T
val join_results: 'a T list -> 'a Exn.result list
@@ -40,6 +39,7 @@
val focus: task list -> unit
val cancel: 'a T -> unit
val interrupt_task: string -> unit
+ val shutdown: unit -> unit
end;
structure Future: FUTURE =
@@ -80,13 +80,6 @@
val excessive = ref 0;
-fun trace_active () =
- let
- val ws = ! workers;
- val m = string_of_int (length ws);
- val n = string_of_int (length (filter #2 ws));
- in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
-
(* requests *)
@@ -148,7 +141,13 @@
(* worker threads *)
fun change_active active = (*requires SYNCHRONIZED*)
- (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ());
+ let
+ val _ = change workers (AList.update Thread.equal (Thread.self (), active));
+ val ws = ! workers;
+ val m = string_of_int (length ws);
+ val n = string_of_int (length (filter #2 ws));
+ in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
+
fun worker_wait name = (*requires SYNCHRONIZED*)
(change_active false; wait name; change_active true);
@@ -157,6 +156,7 @@
if ! excessive > 0 then
(dec excessive;
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
+ notify_all ();
NONE)
else
(case change_result queue TaskQueue.dequeue of
@@ -165,7 +165,7 @@
fun worker_loop name =
(case SYNCHRONIZED name (fn () => worker_next name) of
- NONE => ()
+ NONE => Multithreading.tracing 4 (fn () => name ^ ": exit")
| SOME work => (execute name work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
@@ -177,7 +177,6 @@
fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
let
- val _ = trace_active ();
val _ =
(case List.partition (Thread.isActive o #1) (! workers) of
(_, []) => ()
@@ -188,12 +187,16 @@
val m = if shutdown then 0 else Multithreading.max_threads_value ();
val l = length (! workers);
val _ = excessive := l - m;
- val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
- val _ = if shutdown then notify_all () else ();
- in shutdown andalso null (! workers) end);
+ val _ =
+ if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
+ else ();
+ val terminate = shutdown andalso null (! workers);
+ val _ = if terminate then scheduler := NONE else ();
+ val _ = notify_all ();
+ in terminate end);
fun scheduler_loop (shutdown, canceled) =
- if scheduler_fork shutdown then ()
+ if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit")
else
let
val canceled' = SYNCHRONIZED "scheduler"
@@ -205,6 +208,9 @@
| NONE => scheduler_loop (shutdown, canceled'))
end;
+fun scheduler_active () = (*requires SYNCHRONIZED*)
+ (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
+
fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
else scheduler :=
@@ -246,20 +252,20 @@
(*alien thread -- refrain from contending for resources*)
fun passive_join () = (*requires SYNCHRONIZED*)
(case unfinished () of [] => ()
- | _ => (wait "join"; passive_join ()));
+ | _ => (wait "passive_join"; passive_join ()));
(*proper worker thread -- actively work towards results*)
fun active_join () = (*requires SYNCHRONIZED*)
(case unfinished () of [] => ()
| tasks =>
(case change_result queue (TaskQueue.dequeue_towards tasks) of
- NONE => (worker_wait "join"; active_join ())
- | SOME work => (execute "join" work; active_join ())));
+ NONE => (worker_wait "active_join"; active_join ())
+ | SOME work => (execute "active_join" work; active_join ())));
val _ =
(case thread_data () of
- NONE => SYNCHRONIZED "join" passive_join
- | SOME (task, _) => SYNCHRONIZED "join" (fn () =>
+ NONE => SYNCHRONIZED "passive_join" passive_join
+ | SOME (task, _) => SYNCHRONIZED "active_join" (fn () =>
(change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end;
@@ -280,4 +286,13 @@
fun interrupt_task id = SYNCHRONIZED "interrupt"
(fn () => TaskQueue.interrupt_external (! queue) id);
+(*global join and shutdown*)
+fun shutdown () =
+ (scheduler_check ();
+ SYNCHRONIZED "shutdown" (fn () =>
+ (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+ shutdown_request ();
+ while not (null (! workers)) do wait "shutdown: workers";
+ while scheduler_active () do wait "shutdown: scheduler")));
+
end;