src/Pure/Concurrent/future.ML
changeset 29551 95e469919c3e
parent 29431 0ebe652bfd5a
child 30612 cb6421b6a18f
     1.1 --- a/src/Pure/Concurrent/future.ML	Sun Jan 18 16:33:09 2009 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Sun Jan 18 16:42:43 2009 +0100
     1.3 @@ -270,8 +270,24 @@
     1.4  
     1.5  (* join *)
     1.6  
     1.7 +local
     1.8 +
     1.9  fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
    1.10  
    1.11 +fun join_next pending = (*requires SYNCHRONIZED*)
    1.12 +  if forall is_finished pending then NONE
    1.13 +  else
    1.14 +    (case change_result queue Task_Queue.dequeue of
    1.15 +      NONE => (worker_wait (); join_next pending)
    1.16 +    | some => some);
    1.17 +
    1.18 +fun join_loop name pending =
    1.19 +  (case SYNCHRONIZED name (fn () => join_next pending) of
    1.20 +    NONE => ()
    1.21 +  | SOME work => (execute name work; join_loop name pending));
    1.22 +
    1.23 +in
    1.24 +
    1.25  fun join_results xs =
    1.26    if forall is_finished xs then map get_result xs
    1.27    else uninterruptible (fn _ => fn () =>
    1.28 @@ -280,12 +296,13 @@
    1.29        val _ = Multithreading.self_critical () andalso
    1.30          error "Cannot join future values within critical section";
    1.31  
    1.32 -      fun join_loop _ [] = ()
    1.33 -        | join_loop name deps =
    1.34 +      fun join_deps _ [] = ()
    1.35 +        | join_deps name deps =
    1.36              (case SYNCHRONIZED name (fn () =>
    1.37                  change_result queue (Task_Queue.dequeue_towards deps)) of
    1.38                NONE => ()
    1.39 -            | SOME (work, deps') => (execute name work; join_loop name deps'));
    1.40 +            | SOME (work, deps') => (execute name work; join_deps name deps'));
    1.41 +
    1.42        val _ =
    1.43          (case thread_data () of
    1.44            NONE =>
    1.45 @@ -299,14 +316,14 @@
    1.46                val deps = map task_of pending;
    1.47                val _ = SYNCHRONIZED "join" (fn () =>
    1.48                  (change queue (Task_Queue.depend deps task); notify_all ()));
    1.49 -              val _ = join_loop ("join_loop: " ^ name) deps;
    1.50 -              val _ =
    1.51 -                while not (forall is_finished pending)
    1.52 -                do SYNCHRONIZED "join_task" (fn () => worker_wait ());
    1.53 +              val _ = join_deps ("join_deps: " ^ name) deps;
    1.54 +              val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
    1.55              in () end);
    1.56  
    1.57      in map get_result xs end) ();
    1.58  
    1.59 +end;
    1.60 +
    1.61  fun join_result x = singleton join_results x;
    1.62  fun join x = Exn.release (join_result x);
    1.63