more robust Future.fulfill wrt. duplicate assignment and interrupt;
--- a/src/Pure/Concurrent/future.ML Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/future.ML Wed Apr 11 13:49:09 2012 +0200
@@ -598,7 +598,7 @@
else reraise exn;
fun job () =
Multithreading.with_attributes Multithreading.no_interrupts
- (fn _ => assign () before abort ());
+ (fn _ => Exn.release (Exn.capture assign () before abort ()));
val task = SYNCHRONIZED "enqueue_passive" (fn () =>
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
in Future {promised = true, task = task, result = result} end;
@@ -614,11 +614,16 @@
val _ =
Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
let
- val still_passive =
+ val passive_job =
SYNCHRONIZED "fulfill_result" (fn () =>
Unsynchronized.change_result queue
(Task_Queue.dequeue_passive (Thread.self ()) task));
- in if still_passive then worker_exec (task, [job]) else () end);
+ in
+ (case passive_job of
+ SOME true => worker_exec (task, [job])
+ | SOME false => ()
+ | NONE => ignore (job (not (Task_Queue.is_canceled group))))
+ end);
val _ =
if is_some (Single_Assignment.peek result) then ()
else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
--- a/src/Pure/Concurrent/lazy.ML Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/lazy.ML Wed Apr 11 13:49:09 2012 +0200
@@ -61,15 +61,15 @@
SOME e =>
let
val res0 = Exn.capture (restore_attributes e) ();
- val _ = Future.fulfill_result x res0;
- val res = Future.join_result x;
+ val res1 = Exn.capture (fn () => Future.fulfill_result x res0) ();
+ val res2 = Future.join_result x;
+ in
(*semantic race: some other threads might see the same
interrupt, until there is a fresh start*)
- val _ =
- if Exn.is_interrupt_exn res then
- Synchronized.change var (fn _ => Expr e)
- else ();
- in res end
+ if Exn.is_interrupt_exn res1 orelse Exn.is_interrupt_exn res2 then
+ (Synchronized.change var (fn _ => Expr e); Exn.interrupt ())
+ else res2
+ end
| NONE => Exn.capture (restore_attributes (fn () => Future.join x)) ())
end) ());
--- a/src/Pure/Concurrent/task_queue.ML Wed Apr 11 13:37:46 2012 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Wed Apr 11 13:49:09 2012 +0200
@@ -38,7 +38,7 @@
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
val extend: task -> (bool -> bool) -> queue -> queue option
- val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
+ val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
val dequeue_deps: Thread.thread -> task list -> queue ->
(((task * (bool -> bool) list) option * task list) * queue)
@@ -317,8 +317,9 @@
(case try (get_job jobs) task of
SOME (Passive _) =>
let val jobs' = set_job task (Running thread) jobs
- in (true, make_queue groups jobs') end
- | _ => (false, queue));
+ in (SOME true, make_queue groups jobs') end
+ | SOME _ => (SOME false, queue)
+ | NONE => (NONE, queue));
fun dequeue thread (queue as Queue {groups, jobs}) =
(case Task_Graph.get_first (uncurry ready_job) jobs of