join_results: when dependencies are resulved (but not finished yet),
always continue execution as worker thread -- improved parallelism
at the cost of some reactivity;
--- a/src/Pure/Concurrent/future.ML Sun Jan 18 16:33:09 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Sun Jan 18 16:42:43 2009 +0100
@@ -270,8 +270,24 @@
(* join *)
+local
+
fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
+fun join_next pending = (*requires SYNCHRONIZED*)
+ if forall is_finished pending then NONE
+ else
+ (case change_result queue Task_Queue.dequeue of
+ NONE => (worker_wait (); join_next pending)
+ | some => some);
+
+fun join_loop name pending =
+ (case SYNCHRONIZED name (fn () => join_next pending) of
+ NONE => ()
+ | SOME work => (execute name work; join_loop name pending));
+
+in
+
fun join_results xs =
if forall is_finished xs then map get_result xs
else uninterruptible (fn _ => fn () =>
@@ -280,12 +296,13 @@
val _ = Multithreading.self_critical () andalso
error "Cannot join future values within critical section";
- fun join_loop _ [] = ()
- | join_loop name deps =
+ fun join_deps _ [] = ()
+ | join_deps name deps =
(case SYNCHRONIZED name (fn () =>
change_result queue (Task_Queue.dequeue_towards deps)) of
NONE => ()
- | SOME (work, deps') => (execute name work; join_loop name deps'));
+ | SOME (work, deps') => (execute name work; join_deps name deps'));
+
val _ =
(case thread_data () of
NONE =>
@@ -299,14 +316,14 @@
val deps = map task_of pending;
val _ = SYNCHRONIZED "join" (fn () =>
(change queue (Task_Queue.depend deps task); notify_all ()));
- val _ = join_loop ("join_loop: " ^ name) deps;
- val _ =
- while not (forall is_finished pending)
- do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+ val _ = join_deps ("join_deps: " ^ name) deps;
+ val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
in () end);
in map get_result xs end) ();
+end;
+
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);