scheduler: shutdown spontaneously (after some delay) if queue is empty;
authorwenzelm
Mon, 27 Jul 2009 16:53:28 +0200
changeset 32228 7622c03141b0
parent 32227 a7e901209005
child 32229 abdc0f6214c8
scheduler: shutdown spontaneously (after some delay) if queue is empty; scheduler_check: critical, only performed after fork/enqueue; shutdown: passively wait for termination;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Mon Jul 27 16:08:41 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Mon Jul 27 16:53:28 2009 +0200
@@ -277,12 +277,14 @@
       if null (! canceled) then ()
       else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
 
-    val timeout =
+    (*delay loop*)
+    val delay =
       Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
-    val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
+    val _ = interruptible (fn () => wait_timeout scheduler_event delay) ()
       handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
 
     (*shutdown*)
+    val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
     val _ = broadcast scheduler_event;
@@ -294,11 +296,10 @@
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
 
-fun scheduler_check name = SYNCHRONIZED name (fn () =>
+fun scheduler_check () = (*requires SYNCHRONIZED*)
+ (do_shutdown := false;
   if not (scheduler_active ()) then
-   (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
-    broadcast scheduler_event)
-  else if ! do_shutdown then error "Scheduler shutdown in progress"
+   (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
   else ());
 
 
@@ -309,8 +310,6 @@
 
 fun fork_future opt_group deps pri e =
   let
-    val _ = scheduler_check "future check";
-
     val group =
       (case opt_group of
         SOME group => group
@@ -320,6 +319,7 @@
       let
         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
         val _ = if minimal then signal work_available else ();
+        val _ = scheduler_check ();
       in task end);
   in Future {task = task, group = group, result = result} end;
 
@@ -364,7 +364,6 @@
   if forall is_finished xs then map get_result xs
   else uninterruptible (fn _ => fn () =>
     let
-      val _ = scheduler_check "join check";
       val _ = Multithreading.self_critical () andalso
         error "Cannot join future values within critical section";
       val _ =
@@ -382,8 +381,6 @@
 
 fun map_future f x =
   let
-    val _ = scheduler_check "map_future check";
-
     val task = task_of x;
     val group = Task_Queue.new_group (SOME (group_of x));
     val (result, job) = future_job group (fn () => f (join x));
@@ -409,24 +406,18 @@
       (fn _ => f) x
   else interruptible f x;
 
-(*cancel_group: present and future group members will be interrupted eventually*)
-fun cancel_group group =
- (scheduler_check "cancel check";
-  SYNCHRONIZED "cancel" (fn () => do_cancel group));
-
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
 fun cancel x = cancel_group (group_of x);
 
 
-(** global join and shutdown **)
+(* shutdown *)
 
 fun shutdown () =
   if Multithreading.available then
-   (scheduler_check "shutdown check";
     SYNCHRONIZED "shutdown" (fn () =>
-     (while not (scheduler_active ()) do wait scheduler_event;
-      while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
-      do_shutdown := true;
-      while scheduler_active () do (broadcast_all (); wait scheduler_event))))
+     while scheduler_active () do
+      (wait scheduler_event; broadcast_all ()))
   else ();