more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
authorwenzelm
Sat, 03 Jan 2009 21:45:53 +0100
changeset 29341 6bb007a0f9f2
parent 29340 057a30ee8570
child 29342 23504001c4fb
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Sat Jan 03 21:44:24 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Sat Jan 03 21:45:53 2009 +0100
@@ -116,7 +116,7 @@
   ConditionVar.wait (cond, lock);
 
 fun wait_timeout timeout = (*requires SYNCHRONIZED*)
-  ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
+  ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
 
 fun notify_all () = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
@@ -139,6 +139,9 @@
 
 (* execute *)
 
+fun do_cancel group = (*requires SYNCHRONIZED*)
+  change canceled (insert Task_Queue.eq_group group);
+
 fun execute name (task, group, run) =
   let
     val _ = trace_active ();
@@ -147,7 +150,7 @@
      (change queue (Task_Queue.finish task);
       if ok then ()
       else if Task_Queue.cancel (! queue) group then ()
-      else change canceled (cons group);
+      else do_cancel group;
       notify_all ()));
   in () end;
 
@@ -205,7 +208,8 @@
     val _ = if continue then () else scheduler := NONE;
 
     val _ = notify_all ();
-    val _ = wait_timeout (Time.fromSeconds 3);
+    val _ = interruptible (fn () => wait_timeout (Time.fromSeconds 1)) ()
+      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   in continue end;
 
 fun scheduler_loop () =
@@ -306,7 +310,7 @@
 (*cancel: present and future group members will be interrupted eventually*)
 fun cancel x =
  (scheduler_check "cancel check";
-  SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
+  SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
 
 
 (*global join and shutdown*)