src/Pure/Concurrent/future.ML
changeset 29551 95e469919c3e
parent 29431 0ebe652bfd5a
child 30612 cb6421b6a18f
equal deleted inserted replaced
29550:67ec51c032cb 29551:95e469919c3e
   268 fun fork_pri pri e = fork_future NONE [] pri e;
       (* fork_pri: forwards pri and e to fork_future, fixing its first two arguments to NONE and [].
          NOTE(review): pri is presumably a priority and e the computation to fork; the meaning of the
          NONE and [] arguments is defined by fork_future, which is outside this view -- confirm there. *)
   268 fun fork_pri pri e = fork_future NONE [] pri e;
   269 
   269 
   270 
   270 
   271 (* join *)
   271 (* join *)
   272 
   272 
       
   273 local
       
   274 
   273 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
       (* get_result: yields what peek x holds; if peek x has nothing (presumably NONE for a future that
          has not finished -- confirm against peek), falls back to Exn.Exn (SYS_ERROR "unfinished future"). *)
   275 fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
       
   276 
       
       (* join_next: select the next piece of queued work while waiting for the futures in pending.
          Returns NONE as soon as every element of pending satisfies is_finished.  Otherwise it applies
          Task_Queue.dequeue to the shared queue via change_result: a non-NONE answer is returned as is,
          a NONE answer leads to worker_wait () followed by a retry.
          As the inline marker says, the caller must already be inside SYNCHRONIZED. *)
   277 fun join_next pending = (*requires SYNCHRONIZED*)
       
   278   if forall is_finished pending then NONE
       
   279   else
       
   280     (case change_result queue Task_Queue.dequeue of
       
       (*queue offered nothing: call worker_wait (), then re-check pending and the queue from the top*)
   281       NONE => (worker_wait (); join_next pending)
       
   282     | some => some);
       
   283 
       
       (* join_loop: keep executing queued work until join_next reports nothing left to do.
          Each round calls join_next pending inside SYNCHRONIZED name.  NONE ends the loop with ();
          SOME work is passed to execute name work after the SYNCHRONIZED call has returned, and the
          loop then repeats with the same pending list.
          name is handed unchanged to both SYNCHRONIZED and execute. *)
   284 fun join_loop name pending =
       
   285   (case SYNCHRONIZED name (fn () => join_next pending) of
       
   286     NONE => ()
       
   287   | SOME work => (execute name work; join_loop name pending));
       
   288 
       
   289 in
   274 
   290 
   275 fun join_results xs =
       (* join_results: return the results of all futures in xs, in order, via map get_result.
          Fast path: if every element of xs already satisfies is_finished, the results are read directly.
          Otherwise the remaining work runs inside uninterruptible: it performs scheduler_check "join check",
          raises an error when Multithreading.self_critical () holds, then dispatches on thread_data ()
          before finally reading the results. *)
   291 fun join_results xs =
   276   if forall is_finished xs then map get_result xs
   292   if forall is_finished xs then map get_result xs
   277   else uninterruptible (fn _ => fn () =>
   293   else uninterruptible (fn _ => fn () =>
   278     let
   294     let
   279       val _ = scheduler_check "join check";
   295       val _ = scheduler_check "join check";
   280       val _ = Multithreading.self_critical () andalso
   296       val _ = Multithreading.self_critical () andalso
   281         error "Cannot join future values within critical section";
   297         error "Cannot join future values within critical section";
   282 
   298 
       (* Local helper, called join_loop on the old side and join_deps on the new side (the new name avoids
          clashing with the outer join_loop).  With an empty deps list it returns () at once.  Otherwise it
          applies Task_Queue.dequeue_towards deps to the queue inside SYNCHRONIZED name: NONE stops the loop,
          SOME (work, deps') executes work and continues with the updated list deps'. *)
   283       fun join_loop _ [] = ()
   299       fun join_deps _ [] = ()
   284         | join_loop name deps =
   300         | join_deps name deps =
   285             (case SYNCHRONIZED name (fn () =>
   301             (case SYNCHRONIZED name (fn () =>
   286                 change_result queue (Task_Queue.dequeue_towards deps)) of
   302                 change_result queue (Task_Queue.dequeue_towards deps)) of
   287               NONE => ()
   303               NONE => ()
   288             | SOME (work, deps') => (execute name work; join_loop name deps'));
   304             | SOME (work, deps') => (execute name work; join_deps name deps'));
       
   305 
   289       val _ =
   306       val _ =
   290         (case thread_data () of
   307         (case thread_data () of
   291           NONE =>
   308           NONE =>
   292             (*alien thread -- refrain from contending for resources*)
   309             (*alien thread -- refrain from contending for resources*)
   293             while not (forall is_finished xs)
   310             while not (forall is_finished xs)
       (* NOTE(review): this comparison view skips old lines 294-296 / new lines 311-313 at this point.
          The body of the while loop and the head of the second thread_data branch are therefore not
          visible; the names task and name used below are presumably bound by that hidden branch pattern. *)
   297             let
   314             let
   298               val pending = filter_out is_finished xs;
   315               val pending = filter_out is_finished xs;
   299               val deps = map task_of pending;
   316               val deps = map task_of pending;
       (*record in the queue that task depends on deps (Task_Queue.depend), then notify_all ()*)
   300               val _ = SYNCHRONIZED "join" (fn () =>
   317               val _ = SYNCHRONIZED "join" (fn () =>
   301                 (change queue (Task_Queue.depend deps task); notify_all ()));
   318                 (change queue (Task_Queue.depend deps task); notify_all ()));
   302               val _ = join_loop ("join_loop: " ^ name) deps;
   319               val _ = join_deps ("join_deps: " ^ name) deps;
       (* Old side: after the dependency loop, only call worker_wait () under SYNCHRONIZED "join_task"
          until all of pending is finished.  New side: hand the still-unfinished part of pending to the
          outer join_loop, which executes queued work obtained through join_next instead of only waiting. *)
   303               val _ =
   320               val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
   304                 while not (forall is_finished pending)
       
   305                 do SYNCHRONIZED "join_task" (fn () => worker_wait ());
       
   306             in () end);
   321             in () end);
   307 
   322 
   308     in map get_result xs end) ();
   323     in map get_result xs end) ();
       
   324 
       
   325 end;
   309 
   326 
   310 fun join_result x = singleton join_results x;
       (* join_result: single-future variant of join_results, obtained by passing join_results and x to
          singleton (presumably the library combinator that runs a list function on one element and
          returns its one result -- confirm against the library). *)
   327 fun join_result x = singleton join_results x;
   311 fun join x = Exn.release (join_result x);
       (* join: applies Exn.release to join_result x.
          NOTE(review): Exn.release presumably returns the plain value or re-raises the exception stored
          in the result, so callers of join see exceptions directly -- confirm against the Exn structure. *)
   328 fun join x = Exn.release (join_result x);
   312 
   329 
   313 
   330