Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
--- a/src/Pure/Concurrent/future.ML Fri Sep 10 12:39:20 2010 +0200
+++ b/src/Pure/Concurrent/future.ML Fri Sep 10 14:54:08 2010 +0200
@@ -489,7 +489,11 @@
fun promise_group group : 'a future =
let
val result = Single_Assignment.var "promise" : 'a result;
- fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true;
+ fun abort () = assign_result group result Exn.interrupt_exn
+ handle Fail _ => true
+ | exn =>
+ if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
+ else reraise exn;
val task = SYNCHRONIZED "enqueue_passive" (fn () =>
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
in Future {promised = true, task = task, group = group, result = result} end;
@@ -497,11 +501,20 @@
fun promise () = promise_group (worker_subgroup ());
fun fulfill_result (Future {promised, task, group, result}) res =
- let
- val _ = promised orelse raise Fail "Not a promised future";
- fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
- val _ = execute (task, group, [job]);
- in () end;
+ if not promised then raise Fail "Not a promised future"
+ else
+ let
+ fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
+ val _ =
+ Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
+ let
+ val still_passive =
+ SYNCHRONIZED "fulfill_result" (fn () =>
+ Unsynchronized.change_result queue
+ (Task_Queue.dequeue_passive (Thread.self ()) task));
+ in if still_passive then execute (task, group, [job]) else () end);
+ val _ = Single_Assignment.await result;
+ in () end;
fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML Fri Sep 10 12:39:20 2010 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Fri Sep 10 14:54:08 2010 +0200
@@ -27,6 +27,7 @@
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
+ val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
val depend: task -> task list -> queue -> queue
val dequeue_towards: Thread.thread -> task list -> queue ->
@@ -216,12 +217,19 @@
(* dequeue *)
+fun dequeue_passive thread task (queue as Queue {groups, jobs}) =
+ (case try (get_job jobs) task of
+ SOME (Passive _) =>
+ let val jobs' = set_job task (Running thread) jobs
+ in (true, make_queue groups jobs') end
+ | _ => (false, queue));
+
fun dequeue thread (queue as Queue {groups, jobs}) =
(case Task_Graph.get_first (uncurry ready_job) jobs of
- NONE => (NONE, queue)
- | SOME (result as (task, _, _)) =>
+ SOME (result as (task, _, _)) =>
let val jobs' = set_job task (Running thread) jobs
- in (SOME result, make_queue groups jobs') end);
+ in (SOME result, make_queue groups jobs') end
+ | NONE => (NONE, queue));
(* dequeue_towards -- adhoc dependencies *)