# HG changeset patch # User wenzelm # Date 1284123248 -7200 # Node ID 307e3d07d19f65d6d07bd4b53445642d729985c8 # Parent 28d3809ff91f116ec7f6166898f6df13d99e39fd Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455); diff -r 28d3809ff91f -r 307e3d07d19f src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Fri Sep 10 12:39:20 2010 +0200 +++ b/src/Pure/Concurrent/future.ML Fri Sep 10 14:54:08 2010 +0200 @@ -489,7 +489,11 @@ fun promise_group group : 'a future = let val result = Single_Assignment.var "promise" : 'a result; - fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true; + fun abort () = assign_result group result Exn.interrupt_exn + handle Fail _ => true + | exn => + if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise" + else reraise exn; val task = SYNCHRONIZED "enqueue_passive" (fn () => Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); in Future {promised = true, task = task, group = group, result = result} end; @@ -497,11 +501,20 @@ fun promise () = promise_group (worker_subgroup ()); fun fulfill_result (Future {promised, task, group, result}) res = - let - val _ = promised orelse raise Fail "Not a promised future"; - fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); - val _ = execute (task, group, [job]); - in () end; + if not promised then raise Fail "Not a promised future" + else + let + fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); + val _ = + Multithreading.with_attributes Multithreading.no_interrupts (fn _ => + let + val still_passive = + SYNCHRONIZED "fulfill_result" (fn () => + Unsynchronized.change_result queue + (Task_Queue.dequeue_passive (Thread.self ()) task)); + in if still_passive then execute (task, group, [job]) else () end); + val _ = Single_Assignment.await result; + in () end; fun fulfill x res = fulfill_result x (Exn.Result res); diff -r 28d3809ff91f -r 307e3d07d19f src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Fri Sep 10 12:39:20 2010 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Fri Sep 10 14:54:08 2010 +0200 @@ -27,6 +27,7 @@ val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option + val dequeue_passive: Thread.thread -> task -> queue -> bool * queue val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue val depend: task -> task list -> queue -> queue val dequeue_towards: Thread.thread -> task list -> queue -> @@ -216,12 +217,19 @@ (* dequeue *) +fun dequeue_passive thread task (queue as Queue {groups, jobs}) = + (case try (get_job jobs) task of + SOME (Passive _) => + let val jobs' = set_job task (Running thread) jobs + in (true, make_queue groups jobs') end + | _ => (false, queue)); + fun dequeue thread (queue as Queue {groups, jobs}) = (case Task_Graph.get_first (uncurry ready_job) jobs of - NONE => (NONE, queue) - | SOME (result as (task, _, _)) => + SOME (result as (task, _, _)) => let val jobs' = set_job task (Running thread) jobs - in (SOME result, make_queue groups jobs') end); + in (SOME result, make_queue groups jobs') end + | NONE => (NONE, queue)); (* dequeue_towards -- adhoc dependencies *)