Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
authorwenzelm
Fri, 10 Sep 2010 14:54:08 +0200
changeset 39243 307e3d07d19f
parent 39242 28d3809ff91f
child 39244 d31c03a34f76
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Fri Sep 10 12:39:20 2010 +0200
+++ b/src/Pure/Concurrent/future.ML	Fri Sep 10 14:54:08 2010 +0200
@@ -489,7 +489,11 @@
 fun promise_group group : 'a future =
   let
     val result = Single_Assignment.var "promise" : 'a result;
-    fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true;
+    fun abort () = assign_result group result Exn.interrupt_exn
+      handle Fail _ => true
+        | exn =>
+            if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
+            else reraise exn;
     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   in Future {promised = true, task = task, group = group, result = result} end;
@@ -497,11 +501,20 @@
 fun promise () = promise_group (worker_subgroup ());
 
 fun fulfill_result (Future {promised, task, group, result}) res =
-  let
-    val _ = promised orelse raise Fail "Not a promised future";
-    fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
-    val _ = execute (task, group, [job]);
-  in () end;
+  if not promised then raise Fail "Not a promised future"
+  else
+    let
+      fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
+      val _ =
+        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
+          let
+            val still_passive =
+              SYNCHRONIZED "fulfill_result" (fn () =>
+                Unsynchronized.change_result queue
+                  (Task_Queue.dequeue_passive (Thread.self ()) task));
+          in if still_passive then execute (task, group, [job]) else () end);
+      val _ = Single_Assignment.await result;
+    in () end;
 
 fun fulfill x res = fulfill_result x (Exn.Result res);
 
--- a/src/Pure/Concurrent/task_queue.ML	Fri Sep 10 12:39:20 2010 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Fri Sep 10 14:54:08 2010 +0200
@@ -27,6 +27,7 @@
   val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
   val extend: task -> (bool -> bool) -> queue -> queue option
+  val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
   val depend: task -> task list -> queue -> queue
   val dequeue_towards: Thread.thread -> task list -> queue ->
@@ -216,12 +217,19 @@
 
 (* dequeue *)
 
+fun dequeue_passive thread task (queue as Queue {groups, jobs}) =
+  (case try (get_job jobs) task of
+    SOME (Passive _) =>
+      let val jobs' = set_job task (Running thread) jobs
+      in (true, make_queue groups jobs') end
+  | _ => (false, queue));
+
 fun dequeue thread (queue as Queue {groups, jobs}) =
   (case Task_Graph.get_first (uncurry ready_job) jobs of
-    NONE => (NONE, queue)
-  | SOME (result as (task, _, _)) =>
+    SOME (result as (task, _, _)) =>
       let val jobs' = set_job task (Running thread) jobs
-      in (SOME result, make_queue groups jobs') end);
+      in (SOME result, make_queue groups jobs') end
+  | NONE => (NONE, queue));
 
 
 (* dequeue_towards -- adhoc dependencies *)