src/Pure/Concurrent/future.ML
changeset 32055 6a46898aa805
parent 32053 257eac3946e9
child 32058 c76fd93b3b99
equal deleted inserted replaced
32054:db50e76b0046 32055:6a46898aa805
   283 
   283 
   284 local
   284 local
   285 
   285 
   286 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
   286 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
   287 
   287 
   288 fun join_wait x =
   288 fun join_deps deps =
   289   if SYNCHRONIZED "join_wait" (fn () => is_finished x orelse (wait (); false))
   289   (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of
   290   then () else join_wait x;
       
   291 
       
   292 fun join_next x = (*requires SYNCHRONIZED*)
       
   293   if is_finished x then NONE
       
   294   else
       
   295     (case change_result queue Task_Queue.dequeue of
       
   296       NONE => (worker_wait (); join_next x)
       
   297     | some => some);
       
   298 
       
   299 fun join_loop x =
       
   300   (case SYNCHRONIZED "join" (fn () => join_next x) of
       
   301     NONE => ()
   290     NONE => ()
   302   | SOME work => (execute "join" work; join_loop x));
   291   | SOME (work, deps') => (execute "join" work; join_deps deps'));
   303 
   292 
   304 in
   293 in
   305 
   294 
   306 fun join_results xs =
   295 fun join_results xs =
   307   if forall is_finished xs then map get_result xs
   296   if forall is_finished xs then map get_result xs
   308   else uninterruptible (fn _ => fn () =>
   297   else uninterruptible (fn _ => fn () =>
   309     let
   298     let
   310       val _ = scheduler_check "join check";
   299       val _ = scheduler_check "join check";
   311       val _ = Multithreading.self_critical () andalso
   300       val _ = Multithreading.self_critical () andalso
   312         error "Cannot join future values within critical section";
   301         error "Cannot join future values within critical section";
   313       val _ =
   302 
   314         if is_some (thread_data ())
   303       val is_worker = is_some (thread_data ());
   315         then List.app join_loop xs   (*proper task -- continue work*)
   304       fun join_wait x =
   316         else List.app join_wait xs;  (*alien thread -- refrain from contending for resources*)
   305         if SYNCHRONIZED "join_wait" (fn () =>
       
   306           is_finished x orelse (if is_worker then worker_wait () else wait (); false))
       
   307         then () else join_wait x;
       
   308 
       
   309       val _ = if is_worker then join_deps (map task_of xs) else ();
       
   310       val _ = List.app join_wait xs;
       
   311 
   317     in map get_result xs end) ();
   312     in map get_result xs end) ();
   318 
   313 
   319 end;
   314 end;
   320 
   315 
   321 fun join_result x = singleton join_results x;
   316 fun join_result x = singleton join_results x;