# HG changeset patch # User wenzelm # Date 1334144949 -7200 # Node ID 8a179a0493e3e85a90b2493a819ef56188bebc26 # Parent 5832630f049a52932acbfba7277754c4ad7e9256 more robust Future.fulfill wrt. duplicate assignment and interrupt; diff -r 5832630f049a -r 8a179a0493e3 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Apr 11 13:37:46 2012 +0200 +++ b/src/Pure/Concurrent/future.ML Wed Apr 11 13:49:09 2012 +0200 @@ -598,7 +598,7 @@ else reraise exn; fun job () = Multithreading.with_attributes Multithreading.no_interrupts - (fn _ => assign () before abort ()); + (fn _ => Exn.release (Exn.capture assign () before abort ())); val task = SYNCHRONIZED "enqueue_passive" (fn () => Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job)); in Future {promised = true, task = task, result = result} end; @@ -614,11 +614,16 @@ val _ = Multithreading.with_attributes Multithreading.no_interrupts (fn _ => let - val still_passive = + val passive_job = SYNCHRONIZED "fulfill_result" (fn () => Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); - in if still_passive then worker_exec (task, [job]) else () end); + in + (case passive_job of + SOME true => worker_exec (task, [job]) + | SOME false => () + | NONE => ignore (job (not (Task_Queue.is_canceled group)))) + end); val _ = if is_some (Single_Assignment.peek result) then () else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); diff -r 5832630f049a -r 8a179a0493e3 src/Pure/Concurrent/lazy.ML --- a/src/Pure/Concurrent/lazy.ML Wed Apr 11 13:37:46 2012 +0200 +++ b/src/Pure/Concurrent/lazy.ML Wed Apr 11 13:49:09 2012 +0200 @@ -61,15 +61,15 @@ SOME e => let val res0 = Exn.capture (restore_attributes e) (); - val _ = Future.fulfill_result x res0; - val res = Future.join_result x; + val res1 = Exn.capture (fn () => Future.fulfill_result x res0) (); + val res2 = Future.join_result x; + in (*semantic race: some other threads might see the same interrupt, until there is a fresh start*) - val _ = - if Exn.is_interrupt_exn res then - Synchronized.change var (fn _ => Expr e) - else (); - in res end + if Exn.is_interrupt_exn res1 orelse Exn.is_interrupt_exn res2 then + (Synchronized.change var (fn _ => Expr e); Exn.interrupt ()) + else res2 + end | NONE => Exn.capture (restore_attributes (fn () => Future.join x)) ()) end) ()); diff -r 5832630f049a -r 8a179a0493e3 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Apr 11 13:37:46 2012 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Wed Apr 11 13:49:09 2012 +0200 @@ -38,7 +38,7 @@ val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue val extend: task -> (bool -> bool) -> queue -> queue option - val dequeue_passive: Thread.thread -> task -> queue -> bool * queue + val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue val dequeue_deps: Thread.thread -> task list -> queue -> (((task * (bool -> bool) list) option * task list) * queue) @@ -317,8 +317,9 @@ (case try (get_job jobs) task of SOME (Passive _) => let val jobs' = set_job task (Running thread) jobs - in (true, make_queue groups jobs') end - | _ => (false, queue)); + in (SOME true, make_queue groups jobs') end + | SOME _ => (SOME false, queue) + | NONE => (NONE, queue)); fun dequeue thread (queue as Queue {groups, jobs}) = (case Task_Graph.get_first (uncurry ready_job) jobs of