# HG changeset patch # User wenzelm # Date 1232293363 -3600 # Node ID 95e469919c3e46229e7a81650bbc5f6d14ef603c # Parent 67ec51c032cbe0c3f70be2bbd1bf3ba931fa3347 join_results: when dependencies are resulved (but not finished yet), always continue execution as worker thread -- improved parallelism at the cost of some reactivity; diff -r 67ec51c032cb -r 95e469919c3e src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Sun Jan 18 16:33:09 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Sun Jan 18 16:42:43 2009 +0100 @@ -270,8 +270,24 @@ (* join *) +local + fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); +fun join_next pending = (*requires SYNCHRONIZED*) + if forall is_finished pending then NONE + else + (case change_result queue Task_Queue.dequeue of + NONE => (worker_wait (); join_next pending) + | some => some); + +fun join_loop name pending = + (case SYNCHRONIZED name (fn () => join_next pending) of + NONE => () + | SOME work => (execute name work; join_loop name pending)); + +in + fun join_results xs = if forall is_finished xs then map get_result xs else uninterruptible (fn _ => fn () => @@ -280,12 +296,13 @@ val _ = Multithreading.self_critical () andalso error "Cannot join future values within critical section"; - fun join_loop _ [] = () - | join_loop name deps = + fun join_deps _ [] = () + | join_deps name deps = (case SYNCHRONIZED name (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of NONE => () - | SOME (work, deps') => (execute name work; join_loop name deps')); + | SOME (work, deps') => (execute name work; join_deps name deps')); + val _ = (case thread_data () of NONE => @@ -299,14 +316,14 @@ val deps = map task_of pending; val _ = SYNCHRONIZED "join" (fn () => (change queue (Task_Queue.depend deps task); notify_all ())); - val _ = join_loop ("join_loop: " ^ name) deps; - val _ = - while not (forall is_finished pending) - do SYNCHRONIZED "join_task" (fn () => worker_wait ()); + val _ = join_deps ("join_deps: " ^ name) deps; + val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending); in () end); in map get_result xs end) (); +end; + fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x);