join_results: when dependencies are resulved (but not finished yet),
authorwenzelm
Sun, 18 Jan 2009 16:42:43 +0100
changeset 29551 95e469919c3e
parent 29550 67ec51c032cb
child 29552 5b21c79785b0
join_results: when dependencies are resulved (but not finished yet), always continue execution as worker thread -- improved parallelism at the cost of some reactivity;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Sun Jan 18 16:33:09 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Sun Jan 18 16:42:43 2009 +0100
@@ -270,8 +270,24 @@
 
 (* join *)
 
+local
+
 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
 
+fun join_next pending = (*requires SYNCHRONIZED*)
+  if forall is_finished pending then NONE
+  else
+    (case change_result queue Task_Queue.dequeue of
+      NONE => (worker_wait (); join_next pending)
+    | some => some);
+
+fun join_loop name pending =
+  (case SYNCHRONIZED name (fn () => join_next pending) of
+    NONE => ()
+  | SOME work => (execute name work; join_loop name pending));
+
+in
+
 fun join_results xs =
   if forall is_finished xs then map get_result xs
   else uninterruptible (fn _ => fn () =>
@@ -280,12 +296,13 @@
       val _ = Multithreading.self_critical () andalso
         error "Cannot join future values within critical section";
 
-      fun join_loop _ [] = ()
-        | join_loop name deps =
+      fun join_deps _ [] = ()
+        | join_deps name deps =
             (case SYNCHRONIZED name (fn () =>
                 change_result queue (Task_Queue.dequeue_towards deps)) of
               NONE => ()
-            | SOME (work, deps') => (execute name work; join_loop name deps'));
+            | SOME (work, deps') => (execute name work; join_deps name deps'));
+
       val _ =
         (case thread_data () of
           NONE =>
@@ -299,14 +316,14 @@
               val deps = map task_of pending;
               val _ = SYNCHRONIZED "join" (fn () =>
                 (change queue (Task_Queue.depend deps task); notify_all ()));
-              val _ = join_loop ("join_loop: " ^ name) deps;
-              val _ =
-                while not (forall is_finished pending)
-                do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+              val _ = join_deps ("join_deps: " ^ name) deps;
+              val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
             in () end);
 
     in map get_result xs end) ();
 
+end;
+
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);