more robust Future.fulfill wrt. duplicate assignment and interrupt;
authorwenzelm
Wed, 11 Apr 2012 13:49:09 +0200
changeset 47423 8a179a0493e3
parent 47422 5832630f049a
child 47424 57ff63a52c53
more robust Future.fulfill wrt. duplicate assignment and interrupt;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/lazy.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/future.ML	Wed Apr 11 13:49:09 2012 +0200
@@ -598,7 +598,7 @@
             else reraise exn;
     fun job () =
       Multithreading.with_attributes Multithreading.no_interrupts
-        (fn _ => assign () before abort ());
+        (fn _ => Exn.release (Exn.capture assign () before abort ()));
     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   in Future {promised = true, task = task, result = result} end;
@@ -614,11 +614,16 @@
       val _ =
         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
           let
-            val still_passive =
+            val passive_job =
               SYNCHRONIZED "fulfill_result" (fn () =>
                 Unsynchronized.change_result queue
                   (Task_Queue.dequeue_passive (Thread.self ()) task));
-          in if still_passive then worker_exec (task, [job]) else () end);
+          in
+            (case passive_job of
+              SOME true => worker_exec (task, [job])
+            | SOME false => ()
+            | NONE => ignore (job (not (Task_Queue.is_canceled group))))
+          end);
       val _ =
         if is_some (Single_Assignment.peek result) then ()
         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
--- a/src/Pure/Concurrent/lazy.ML	Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/lazy.ML	Wed Apr 11 13:49:09 2012 +0200
@@ -61,15 +61,15 @@
             SOME e =>
               let
                 val res0 = Exn.capture (restore_attributes e) ();
-                val _ = Future.fulfill_result x res0;
-                val res = Future.join_result x;
+                val res1 = Exn.capture (fn () => Future.fulfill_result x res0) ();
+                val res2 = Future.join_result x;
+              in
                 (*semantic race: some other threads might see the same
                   interrupt, until there is a fresh start*)
-                val _ =
-                  if Exn.is_interrupt_exn res then
-                    Synchronized.change var (fn _ => Expr e)
-                  else ();
-              in res end
+                if Exn.is_interrupt_exn res1 orelse Exn.is_interrupt_exn res2 then
+                  (Synchronized.change var (fn _ => Expr e); Exn.interrupt ())
+                else res2
+              end
           | NONE => Exn.capture (restore_attributes (fn () => Future.join x)) ())
         end) ());
 
--- a/src/Pure/Concurrent/task_queue.ML	Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Apr 11 13:49:09 2012 +0200
@@ -38,7 +38,7 @@
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
   val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
-  val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
+  val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
   val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
   val dequeue_deps: Thread.thread -> task list -> queue ->
     (((task * (bool -> bool) list) option * task list) * queue)
@@ -317,8 +317,9 @@
   (case try (get_job jobs) task of
     SOME (Passive _) =>
       let val jobs' = set_job task (Running thread) jobs
-      in (true, make_queue groups jobs') end
-  | _ => (false, queue));
+      in (SOME true, make_queue groups jobs') end
+  | SOME _ => (SOME false, queue)
+  | NONE => (NONE, queue));
 
 fun dequeue thread (queue as Queue {groups, jobs}) =
   (case Task_Graph.get_first (uncurry ready_job) jobs of