src/Pure/Concurrent/future.ML
changeset 32224 a46f5e9b1718
parent 32222 572b92362496
child 32225 d5d6f47fb018
equal deleted inserted replaced
32223:f5f46d6eb95b 32224:a46f5e9b1718
   326     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   326     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   327   | SOME (Exn.Exn Exn.Interrupt) =>
   327   | SOME (Exn.Exn Exn.Interrupt) =>
   328       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   328       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   329   | SOME res => res);
   329   | SOME res => res);
   330 
   330 
       
   331 fun join_wait x =
       
   332   if SYNCHRONIZED "join_wait" (fn () =>
       
   333     is_finished x orelse (wait work_finished; false))
       
   334   then () else join_wait x;
       
   335 
   331 fun join_next deps = (*requires SYNCHRONIZED*)
   336 fun join_next deps = (*requires SYNCHRONIZED*)
   332   if overloaded () then (worker_wait scheduler_event; join_next deps)
   337   if null deps then NONE
   333   else change_result queue (Task_Queue.dequeue_towards deps);
   338   else if overloaded () then (worker_wait scheduler_event; join_next deps)
   334 
   339   else
   335 fun join_deps deps =
   340     (case change_result queue (Task_Queue.dequeue_towards deps) of
       
   341       (NONE, []) => NONE
       
   342     | (NONE, deps') => (worker_wait work_finished; join_next deps')
       
   343     | (SOME work, deps') => SOME (work, deps'));
       
   344 
       
   345 fun join_work deps =
   336   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   346   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   337     NONE => ()
   347     NONE => ()
   338   | SOME (work, deps') => (execute "join" work; join_deps deps'));
   348   | SOME (work, deps') => (execute "join" work; join_work deps'));
   339 
   349 
   340 in
   350 in
   341 
   351 
   342 fun join_results xs =
   352 fun join_results xs =
   343   if forall is_finished xs then map get_result xs
   353   if forall is_finished xs then map get_result xs
   344   else uninterruptible (fn _ => fn () =>
   354   else uninterruptible (fn _ => fn () =>
   345     let
   355     let
   346       val _ = scheduler_check "join check";
   356       val _ = scheduler_check "join check";
   347       val _ = Multithreading.self_critical () andalso
   357       val _ = Multithreading.self_critical () andalso
   348         error "Cannot join future values within critical section";
   358         error "Cannot join future values within critical section";
   349 
   359       val _ =
   350       val worker = is_worker ();
   360         if is_worker () then join_work (map task_of xs)
   351       val _ = if worker then join_deps (map task_of xs) else ();
   361         else List.app join_wait xs;
   352 
       
   353       fun join_wait x =
       
   354         if SYNCHRONIZED "join_wait" (fn () =>
       
   355           is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
       
   356         then () else join_wait x;
       
   357 
       
   358       val _ = xs |> List.app (fn x =>
       
   359         let val time = Multithreading.real_time join_wait x in
       
   360           Multithreading.tracing_time true time
       
   361             (fn () => "joined after " ^ Time.toString time)
       
   362         end);
       
   363     in map get_result xs end) ();
   362     in map get_result xs end) ();
   364 
   363 
   365 end;
   364 end;
   366 
   365 
   367 fun join_result x = singleton join_results x;
   366 fun join_result x = singleton join_results x;