--- a/src/Pure/Concurrent/future.ML Thu Sep 11 21:04:09 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Sep 11 21:53:53 2008 +0200
@@ -37,8 +37,8 @@
val join_results: 'a T list -> 'a Exn.result list
val join: 'a T -> 'a
val focus: task list -> unit
+ val interrupt_task: string -> unit
val cancel: 'a T -> unit
- val interrupt_task: string -> unit
val shutdown: unit -> unit
end;
@@ -77,17 +77,9 @@
val queue = ref TaskQueue.empty;
val workers = ref ([]: (Thread.thread * bool) list);
val scheduler = ref (NONE: Thread.thread option);
-
val excessive = ref 0;
-
-
-(* requests *)
-
-datatype request = Shutdown | Cancel of group;
-val requests = Mailbox.create () : request Mailbox.T;
-
-fun shutdown_request () = Mailbox.send requests Shutdown;
-fun cancel_request group = Mailbox.send requests (Cancel group);
+val canceled = ref ([]: TaskQueue.group list);
+val do_shutdown = ref false;
(* synchronization *)
@@ -108,11 +100,10 @@
in Exn.release result end) ();
fun wait name = (*requires SYNCHRONIZED*)
- let
- val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting");
- val _ = ConditionVar.wait (cond, lock);
- val _ = Multithreading.tracing 4 (fn () => name ^ ": notified");
- in () end;
+ ConditionVar.wait (cond, lock);
+
+fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
+ ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
fun notify_all () = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
@@ -133,7 +124,7 @@
(change queue (TaskQueue.finish task);
if ok then ()
else if TaskQueue.cancel (! queue) group then ()
- else cancel_request group;
+ else change canceled (cons group);
notify_all ()));
in () end;
@@ -175,8 +166,9 @@
(* scheduler *)
-fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
+fun scheduler_next () = (*requires SYNCHRONIZED*)
let
+ (*worker threads*)
val _ =
(case List.partition (Thread.isActive o #1) (! workers) of
(_, []) => ()
@@ -184,37 +176,37 @@
(workers := active; Multithreading.tracing 0 (fn () =>
"SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
- val m = if shutdown then 0 else Multithreading.max_threads_value ();
+ val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
val l = length (! workers);
val _ = excessive := l - m;
val _ =
if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
else ();
- val terminate = shutdown andalso null (! workers);
- val _ = if terminate then scheduler := NONE else ();
- val _ = notify_all ();
- in terminate end);
+
+ (*canceled groups*)
+ val _ = change canceled (filter_out (TaskQueue.cancel (! queue)));
+
+ (*shutdown*)
+ val continue = not (! do_shutdown andalso null (! workers));
+ val _ = if continue then () else scheduler := NONE;
-fun scheduler_loop (shutdown, canceled) =
- if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit")
- else
- let
- val canceled' = SYNCHRONIZED "scheduler"
- (fn () => filter_out (TaskQueue.cancel (! queue)) canceled);
- in
- (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
- SOME Shutdown => scheduler_loop (true, canceled')
- | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
- | NONE => scheduler_loop (shutdown, canceled'))
- end;
+ val _ = notify_all ();
+ val _ = wait_timeout "scheduler" (Time.fromSeconds 1);
+ in continue end;
+
+fun scheduler_loop () =
+ (while SYNCHRONIZED "scheduler" scheduler_next do ();
+ Multithreading.tracing 4 (fn () => "scheduler: exit"));
fun scheduler_active () = (*requires SYNCHRONIZED*)
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
- if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
- else scheduler :=
- SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
+ if not (scheduler_active ()) then
+ (do_shutdown := false;
+ scheduler := SOME (Thread.fork (scheduler_loop, Multithreading.no_interrupts)))
+ else if ! do_shutdown then error "Scheduler shutdown in progress"
+ else ());
(* future values: fork independent computation *)
@@ -242,9 +234,9 @@
fun join_results xs =
let
+ val _ = scheduler_check ();
val _ = Multithreading.self_critical () andalso
error "Cannot join future values within critical section";
- val _ = scheduler_check ();
fun unfinished () =
xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
@@ -279,20 +271,23 @@
fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
change queue (TaskQueue.focus tasks));
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel x = (scheduler_check (); cancel_request (group_of x));
-
(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
(fn () => TaskQueue.interrupt_external (! queue) id);
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel x =
+ (scheduler_check (); SYNCHRONIZED "cancel" (fn () => change canceled (cons (group_of x))));
+
+
(*global join and shutdown*)
fun shutdown () =
(scheduler_check ();
SYNCHRONIZED "shutdown" (fn () =>
- (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
- shutdown_request ();
+ (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
+ while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+ do_shutdown := true;
while not (null (! workers)) do wait "shutdown: workers";
- while scheduler_active () do wait "shutdown: scheduler")));
+ while scheduler_active () do wait "shutdown: scheduler still active")));
end;