src/Pure/Concurrent/future.ML
changeset 41672 2f70b1ddd09f
parent 41670 74010c6af0a4
child 41673 1c191a39549f
equal deleted inserted replaced
41671:5ffa2cf4cced 41672:2f70b1ddd09f
    42   type 'a future
    42   type 'a future
    43   val task_of: 'a future -> task
    43   val task_of: 'a future -> task
    44   val group_of: 'a future -> group
    44   val group_of: 'a future -> group
    45   val peek: 'a future -> 'a Exn.result option
    45   val peek: 'a future -> 'a Exn.result option
    46   val is_finished: 'a future -> bool
    46   val is_finished: 'a future -> bool
    47   val fork_group: group -> (unit -> 'a) -> 'a future
    47   val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
    48   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
       
    49   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
       
    50   val fork_pri: int -> (unit -> 'a) -> 'a future
    48   val fork_pri: int -> (unit -> 'a) -> 'a future
    51   val fork: (unit -> 'a) -> 'a future
    49   val fork: (unit -> 'a) -> 'a future
    52   val join_results: 'a future list -> 'a Exn.result list
    50   val join_results: 'a future list -> 'a Exn.result list
    53   val join_result: 'a future -> 'a Exn.result
    51   val join_result: 'a future -> 'a Exn.result
    54   val join: 'a future -> 'a
    52   val join: 'a future -> 'a
   399 
   397 
   400 (** futures **)
   398 (** futures **)
   401 
   399 
   402 (* fork *)
   400 (* fork *)
   403 
   401 
   404 fun fork_future opt_group deps pri e =
   402 fun bulk {group, deps, pri} es =
   405   let
   403   let
   406     val group =
   404     val grp =
   407       (case opt_group of
   405       (case group of
   408         NONE => worker_subgroup ()
   406         NONE => worker_subgroup ()
   409       | SOME group => group);
   407       | SOME grp => grp);
   410     val (result, job) = future_job group e;
   408     fun enqueue e (minimal, queue) =
   411     val task = SYNCHRONIZED "enqueue" (fn () =>
       
   412       let
   409       let
   413         val (task, minimal) =
   410         val (result, job) = future_job grp e;
   414           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   411         val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
       
   412         val future = Future {promised = false, task = task, group = grp, result = result};
       
   413       in (future, (minimal orelse minimal', queue')) end;
       
   414   in
       
   415     SYNCHRONIZED "enqueue" (fn () =>
       
   416       let
       
   417         val (futures, minimal) =
       
   418           Unsynchronized.change_result queue (fn q =>
       
   419             let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
       
   420             in ((futures, minimal), q') end);
   415         val _ = if minimal then signal work_available else ();
   421         val _ = if minimal then signal work_available else ();
   416         val _ = scheduler_check ();
   422         val _ = scheduler_check ();
   417       in task end);
   423       in futures end)
   418   in Future {promised = false, task = task, group = group, result = result} end;
   424   end;
   419 
   425 
   420 fun fork_group group e = fork_future (SOME group) [] 0 e;
   426 fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
   421 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   427 fun fork e = fork_pri 0 e;
   422 fun fork_deps deps e = fork_deps_pri deps 0 e;
       
   423 fun fork_pri pri e = fork_deps_pri [] pri e;
       
   424 fun fork e = fork_deps [] e;
       
   425 
   428 
   426 
   429 
   427 (* join *)
   430 (* join *)
   428 
   431 
   429 local
   432 local
   493       (case Task_Queue.extend task job (! queue) of
   496       (case Task_Queue.extend task job (! queue) of
   494         SOME queue' => (queue := queue'; true)
   497         SOME queue' => (queue := queue'; true)
   495       | NONE => false));
   498       | NONE => false));
   496   in
   499   in
   497     if extended then Future {promised = false, task = task, group = group, result = result}
   500     if extended then Future {promised = false, task = task, group = group, result = result}
   498     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   501     else
       
   502       singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
       
   503         (fn () => f (join x))
   499   end;
   504   end;
   500 
   505 
   501 
   506 
   502 (* promised futures -- fulfilled by external means *)
   507 (* promised futures -- fulfilled by external means *)
   503 
   508