src/Pure/Concurrent/future.ML
changeset 32246 d4f7934e9555
parent 32230 9f6461b1c9cc
child 32247 3e7d1673f96e
equal deleted inserted replaced
32245:0c1cb95a434d 32246:d4f7934e9555
     1 (*  Title:      Pure/Concurrent/future.ML
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     2     Author:     Makarius
     3 
     3 
     4 Future values.
     4 Future values, see also
       
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     5 
     6 
     6 Notes:
     7 Notes:
     7 
     8 
     8   * Futures are similar to delayed evaluation, i.e. delay/force is
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
     9     generalized to fork/join (and variants).  The idea is to model
    10     generalized to fork/join (and variants).  The idea is to model
   142   ConditionVar.broadcast work_finished);
   143   ConditionVar.broadcast work_finished);
   143 
   144 
   144 end;
   145 end;
   145 
   146 
   146 
   147 
   147 (* worker activity *)
       
   148 
       
   149 fun count_active ws =
       
   150   fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
       
   151 
       
   152 fun change_active active = (*requires SYNCHRONIZED*)
       
   153   change workers (AList.update Thread.equal (Thread.self (), active));
       
   154 
       
   155 fun overloaded () =
       
   156   count_active (! workers) > Multithreading.max_threads_value ();
       
   157 
       
   158 
       
   159 (* execute future jobs *)
   148 (* execute future jobs *)
   160 
   149 
   161 fun future_job group (e: unit -> 'a) =
   150 fun future_job group (e: unit -> 'a) =
   162   let
   151   let
   163     val result = ref (NONE: 'a Exn.result option);
   152     val result = ref (NONE: 'a Exn.result option);
   184 fun execute name (task, group, jobs) =
   173 fun execute name (task, group, jobs) =
   185   let
   174   let
   186     val valid = not (Task_Queue.is_canceled group);
   175     val valid = not (Task_Queue.is_canceled group);
   187     val ok = setmp_thread_data (name, task, group) (fn () =>
   176     val ok = setmp_thread_data (name, task, group) (fn () =>
   188       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   177       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   189     val _ = SYNCHRONIZED "execute" (fn () =>
   178     val _ = SYNCHRONIZED "finish" (fn () =>
   190       let
   179       let
   191         val maximal = change_result queue (Task_Queue.finish task);
   180         val maximal = change_result queue (Task_Queue.finish task);
   192         val _ =
   181         val _ =
   193           if ok then ()
   182           if ok then ()
   194           else if Task_Queue.cancel (! queue) group then ()
   183           else if Task_Queue.cancel (! queue) group then ()
   197         val _ = if maximal then () else broadcast work_available;
   186         val _ = if maximal then () else broadcast work_available;
   198       in () end);
   187       in () end);
   199   in () end;
   188   in () end;
   200 
   189 
   201 
   190 
       
   191 (* worker activity *)
       
   192 
       
   193 fun count_active () = (*requires SYNCHRONIZED*)
       
   194   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
       
   195 
       
   196 fun change_active active = (*requires SYNCHRONIZED*)
       
   197   change workers (AList.update Thread.equal (Thread.self (), active));
       
   198 
       
   199 
   202 (* worker threads *)
   200 (* worker threads *)
   203 
   201 
   204 fun worker_wait cond = (*requires SYNCHRONIZED*)
   202 fun worker_wait cond = (*requires SYNCHRONIZED*)
   205  (change_active false; broadcast scheduler_event;
   203  (change_active false; broadcast scheduler_event;
   206   wait cond;
   204   wait cond;
   210   if ! excessive > 0 then
   208   if ! excessive > 0 then
   211     (dec excessive;
   209     (dec excessive;
   212      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   210      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   213      broadcast scheduler_event;
   211      broadcast scheduler_event;
   214      NONE)
   212      NONE)
   215   else if overloaded () then (worker_wait scheduler_event; worker_next ())
   213   else if count_active () > Multithreading.max_threads_value () then
       
   214     (worker_wait scheduler_event; worker_next ())
   216   else
   215   else
   217     (case change_result queue Task_Queue.dequeue of
   216     (case change_result queue Task_Queue.dequeue of
   218       NONE => (worker_wait work_available; worker_next ())
   217       NONE => (worker_wait work_available; worker_next ())
   219     | some => some);
   218     | some => some);
   220 
   219 
   241         else
   240         else
   242          (last_status := now; Multithreading.tracing 1 (fn () =>
   241          (last_status := now; Multithreading.tracing 1 (fn () =>
   243             let
   242             let
   244               val {ready, pending, running} = Task_Queue.status (! queue);
   243               val {ready, pending, running} = Task_Queue.status (! queue);
   245               val total = length (! workers);
   244               val total = length (! workers);
   246               val active = count_active (! workers);
   245               val active = count_active ();
   247             in
   246             in
   248               "SCHEDULE: " ^
   247               "SCHEDULE: " ^
   249                 string_of_int ready ^ " ready, " ^
   248                 string_of_int ready ^ " ready, " ^
   250                 string_of_int pending ^ " pending, " ^
   249                 string_of_int pending ^ " pending, " ^
   251                 string_of_int running ^ " running; " ^
   250                 string_of_int running ^ " running; " ^
   317     val group =
   316     val group =
   318       (case opt_group of
   317       (case opt_group of
   319         SOME group => group
   318         SOME group => group
   320       | NONE => Task_Queue.new_group (worker_group ()));
   319       | NONE => Task_Queue.new_group (worker_group ()));
   321     val (result, job) = future_job group e;
   320     val (result, job) = future_job group e;
   322     val task = SYNCHRONIZED "future" (fn () =>
   321     val task = SYNCHRONIZED "enqueue" (fn () =>
   323       let
   322       let
   324         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   323         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   325         val _ = if minimal then signal work_available else ();
   324         val _ = if minimal then signal work_available else ();
   326         val _ = scheduler_check ();
   325         val _ = scheduler_check ();
   327       in task end);
   326       in task end);
   364 
   363 
   365 in
   364 in
   366 
   365 
   367 fun join_results xs =
   366 fun join_results xs =
   368   if forall is_finished xs then map get_result xs
   367   if forall is_finished xs then map get_result xs
       
   368   else if Multithreading.self_critical () then
       
   369     error "Cannot join future values within critical section"
   369   else uninterruptible (fn _ => fn () =>
   370   else uninterruptible (fn _ => fn () =>
   370     let
   371      (if is_worker ()
   371       val _ = Multithreading.self_critical () andalso
   372       then join_work (map task_of xs)
   372         error "Cannot join future values within critical section";
   373       else List.app join_wait xs;
   373       val _ =
   374       map get_result xs)) ();
   374         if is_worker () then join_work (map task_of xs)
       
   375         else List.app join_wait xs;
       
   376     in map get_result xs end) ();
       
   377 
   375 
   378 end;
   376 end;
   379 
   377 
   380 fun join_result x = singleton join_results x;
   378 fun join_result x = singleton join_results x;
   381 fun join x = Exn.release (join_result x);
   379 fun join x = Exn.release (join_result x);
   387   let
   385   let
   388     val task = task_of x;
   386     val task = task_of x;
   389     val group = Task_Queue.new_group (SOME (group_of x));
   387     val group = Task_Queue.new_group (SOME (group_of x));
   390     val (result, job) = future_job group (fn () => f (join x));
   388     val (result, job) = future_job group (fn () => f (join x));
   391 
   389 
   392     val extended = SYNCHRONIZED "map_future" (fn () =>
   390     val extended = SYNCHRONIZED "extend" (fn () =>
   393       (case Task_Queue.extend task job (! queue) of
   391       (case Task_Queue.extend task job (! queue) of
   394         SOME queue' => (queue := queue'; true)
   392         SOME queue' => (queue := queue'; true)
   395       | NONE => false));
   393       | NONE => false));
   396   in
   394   in
   397     if extended then Future {task = task, group = group, result = result}
   395     if extended then Future {task = task, group = group, result = result}