src/Pure/Concurrent/future.ML
changeset 32295 400cc493d466
parent 32293 e0b8da3fae4d
child 32296 e6a8ed8aed3a
--- a/src/Pure/Concurrent/future.ML	Thu Jul 30 23:50:11 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Sat Aug 01 00:09:45 2009 +0200
@@ -120,11 +120,10 @@
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
 fun wait cond = (*requires SYNCHRONIZED*)
-  Multithreading.sync_wait NONE cond lock;
+  Multithreading.sync_wait NONE NONE cond lock;
 
-fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
-  interruptible (fn () =>
-    ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) ();
+fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
+  Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
 
 fun signal cond = (*requires SYNCHRONIZED*)
   ConditionVar.signal cond;
@@ -149,11 +148,11 @@
         val res =
           if ok then
             Exn.capture (fn () =>
-             (Thread.testInterrupt ();
-              Multithreading.with_attributes Multithreading.restricted_interrupts
-                (fn _ => fn () => e ())) ()) ()
+              Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
           else Exn.Exn Exn.Interrupt;
-        val _ = Synchronized.change result (K (SOME res));
+        val _ = Synchronized.change result
+          (fn NONE => SOME res
+            | SOME _ => raise Fail "Duplicate assignment of future value");
       in
         (case res of
           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
@@ -276,20 +275,24 @@
         broadcast_work ());
 
     (*delay loop*)
-    val _ = wait_interruptible next_round scheduler_event
-      handle Exn.Interrupt =>
-        (Multithreading.tracing 1 (fn () => "Interrupt");
-          List.app do_cancel (Task_Queue.cancel_all (! queue)));
+    val _ = Exn.release (wait_timeout next_round scheduler_event);
 
     (*shutdown*)
     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
     val _ = broadcast scheduler_event;
-  in continue end;
+  in continue end
+  handle Exn.Interrupt =>
+   (Multithreading.tracing 1 (fn () => "Interrupt");
+    List.app do_cancel (Task_Queue.cancel_all (! queue));
+    scheduler_next ());
+
 
 fun scheduler_loop () =
-  while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
+  Multithreading.with_attributes
+    (Multithreading.sync_interrupts Multithreading.public_interrupts)
+    (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
 
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
@@ -393,12 +396,11 @@
 
 fun interruptible_task f x =
   if Multithreading.available then
-   (Thread.testInterrupt ();
     Multithreading.with_attributes
       (if is_worker ()
-       then Multithreading.restricted_interrupts
-       else Multithreading.regular_interrupts)
-      (fn _ => fn x => f x) x)
+       then Multithreading.private_interrupts
+       else Multithreading.public_interrupts)
+      (fn _ => f x)
   else interruptible f x;
 
 (*cancel: present and future group members will be interrupted eventually*)