more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
--- a/src/Pure/Concurrent/future.ML Sat Jan 03 21:44:24 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Sat Jan 03 21:45:53 2009 +0100
@@ -116,7 +116,7 @@
ConditionVar.wait (cond, lock);
fun wait_timeout timeout = (*requires SYNCHRONIZED*)
- ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
+ ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
fun notify_all () = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
@@ -139,6 +139,9 @@
(* execute *)
+fun do_cancel group = (*requires SYNCHRONIZED*)
+ change canceled (insert Task_Queue.eq_group group);
+
fun execute name (task, group, run) =
let
val _ = trace_active ();
@@ -147,7 +150,7 @@
(change queue (Task_Queue.finish task);
if ok then ()
else if Task_Queue.cancel (! queue) group then ()
- else change canceled (cons group);
+ else do_cancel group;
notify_all ()));
in () end;
@@ -205,7 +208,8 @@
val _ = if continue then () else scheduler := NONE;
val _ = notify_all ();
- val _ = wait_timeout (Time.fromSeconds 3);
+ val _ = interruptible (fn () => wait_timeout (Time.fromSeconds 1)) ()
+ handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
in continue end;
fun scheduler_loop () =
@@ -306,7 +310,7 @@
(*cancel: present and future group members will be interrupted eventually*)
fun cancel x =
(scheduler_check "cancel check";
- SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
+ SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
(*global join and shutdown*)