scheduler: shutdown spontaneously (after some delay) if queue is empty;
scheduler_check: critical, only performed after fork/enqueue;
shutdown: passively wait for termination;
--- a/src/Pure/Concurrent/future.ML Mon Jul 27 16:08:41 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Mon Jul 27 16:53:28 2009 +0200
@@ -277,12 +277,14 @@
if null (! canceled) then ()
else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
- val timeout =
+ (*delay loop*)
+ val delay =
Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
- val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
+ val _ = interruptible (fn () => wait_timeout scheduler_event delay) ()
handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
(*shutdown*)
+ val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
val _ = broadcast scheduler_event;
@@ -294,11 +296,10 @@
fun scheduler_active () = (*requires SYNCHRONIZED*)
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
-fun scheduler_check name = SYNCHRONIZED name (fn () =>
+fun scheduler_check () = (*requires SYNCHRONIZED*)
+ (do_shutdown := false;
if not (scheduler_active ()) then
- (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
- broadcast scheduler_event)
- else if ! do_shutdown then error "Scheduler shutdown in progress"
+ (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
else ());
@@ -309,8 +310,6 @@
fun fork_future opt_group deps pri e =
let
- val _ = scheduler_check "future check";
-
val group =
(case opt_group of
SOME group => group
@@ -320,6 +319,7 @@
let
val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
val _ = if minimal then signal work_available else ();
+ val _ = scheduler_check ();
in task end);
in Future {task = task, group = group, result = result} end;
@@ -364,7 +364,6 @@
if forall is_finished xs then map get_result xs
else uninterruptible (fn _ => fn () =>
let
- val _ = scheduler_check "join check";
val _ = Multithreading.self_critical () andalso
error "Cannot join future values within critical section";
val _ =
@@ -382,8 +381,6 @@
fun map_future f x =
let
- val _ = scheduler_check "map_future check";
-
val task = task_of x;
val group = Task_Queue.new_group (SOME (group_of x));
val (result, job) = future_job group (fn () => f (join x));
@@ -409,24 +406,18 @@
(fn _ => f) x
else interruptible f x;
-(*cancel_group: present and future group members will be interrupted eventually*)
-fun cancel_group group =
- (scheduler_check "cancel check";
- SYNCHRONIZED "cancel" (fn () => do_cancel group));
-
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
fun cancel x = cancel_group (group_of x);
-(** global join and shutdown **)
+(* shutdown *)
fun shutdown () =
if Multithreading.available then
- (scheduler_check "shutdown check";
SYNCHRONIZED "shutdown" (fn () =>
- (while not (scheduler_active ()) do wait scheduler_event;
- while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
- do_shutdown := true;
- while scheduler_active () do (broadcast_all (); wait scheduler_event))))
+ while scheduler_active () do
+ (wait scheduler_event; broadcast_all ()))
else ();