src/Pure/Concurrent/future.ML
changeset 32102 81d03a29980c
parent 32099 5382c93108db
child 32107 47d0da617fcc
equal deleted inserted replaced
32101:e25107ff4f56 32102:81d03a29980c
    28 sig
    28 sig
    29   val enabled: unit -> bool
    29   val enabled: unit -> bool
    30   type task = Task_Queue.task
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    32   val is_worker: unit -> bool
       
    33   val worker_group: unit -> Task_Queue.group option
    33   type 'a future
    34   type 'a future
    34   val task_of: 'a future -> task
    35   val task_of: 'a future -> task
    35   val group_of: 'a future -> group
    36   val group_of: 'a future -> group
    36   val peek: 'a future -> 'a Exn.result option
    37   val peek: 'a future -> 'a Exn.result option
    37   val is_finished: 'a future -> bool
    38   val is_finished: 'a future -> bool
    38   val value: 'a -> 'a future
    39   val value: 'a -> 'a future
    39   val fork: (unit -> 'a) -> 'a future
    40   val fork: (unit -> 'a) -> 'a future
    40   val fork_group: group -> (unit -> 'a) -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    41   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    42   val fork_pri: int -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    43   val fork_local: int -> (unit -> 'a) -> 'a future
       
    44   val join_results: 'a future list -> 'a Exn.result list
    44   val join_results: 'a future list -> 'a Exn.result list
    45   val join_result: 'a future -> 'a Exn.result
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a
    46   val join: 'a future -> 'a
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    74   fun setmp_thread_data data f x =
    74   fun setmp_thread_data data f x =
    75     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    75     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    76 end;
    76 end;
    77 
    77 
    78 val is_worker = is_some o thread_data;
    78 val is_worker = is_some o thread_data;
       
    79 val worker_group = Option.map #3 o thread_data;
    79 
    80 
    80 
    81 
    81 (* datatype future *)
    82 (* datatype future *)
    82 
    83 
    83 datatype 'a future = Future of
    84 datatype 'a future = Future of
    91 fun peek (Future {result, ...}) = ! result;
    92 fun peek (Future {result, ...}) = ! result;
    92 fun is_finished x = is_some (peek x);
    93 fun is_finished x = is_some (peek x);
    93 
    94 
    94 fun value x = Future
    95 fun value x = Future
    95  {task = Task_Queue.new_task 0,
    96  {task = Task_Queue.new_task 0,
    96   group = Task_Queue.new_group (),
    97   group = Task_Queue.new_group NONE,
    97   result = ref (SOME (Exn.Result x))};
    98   result = ref (SOME (Exn.Result x))};
    98 
    99 
    99 
   100 
   100 
   101 
   101 (** scheduling **)
   102 (** scheduling **)
   170   change canceled (insert Task_Queue.eq_group group);
   171   change canceled (insert Task_Queue.eq_group group);
   171 
   172 
   172 fun execute name (task, group, jobs) =
   173 fun execute name (task, group, jobs) =
   173   let
   174   let
   174     val _ = trace_active ();
   175     val _ = trace_active ();
   175     val valid = null (Task_Queue.group_exns group);
   176     val valid = not (Task_Queue.is_canceled group);
   176     val ok = setmp_thread_data (name, task, group) (fn () =>
   177     val ok = setmp_thread_data (name, task, group) (fn () =>
   177       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   178       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   178     val _ = SYNCHRONIZED "execute" (fn () =>
   179     val _ = SYNCHRONIZED "execute" (fn () =>
   179      (change queue (Task_Queue.finish task);
   180      (change queue (Task_Queue.finish task);
   180       if ok then ()
   181       if ok then ()
   277 
   278 
   278 fun fork_future opt_group deps pri e =
   279 fun fork_future opt_group deps pri e =
   279   let
   280   let
   280     val _ = scheduler_check "future check";
   281     val _ = scheduler_check "future check";
   281 
   282 
   282     val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
   283     val group =
       
   284       (case opt_group of
       
   285         SOME group => group
       
   286       | NONE => Task_Queue.new_group (worker_group ()));
   283     val (result, job) = future_job group e;
   287     val (result, job) = future_job group e;
   284     val task = SYNCHRONIZED "future" (fn () =>
   288     val task = SYNCHRONIZED "future" (fn () =>
   285       change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   289       change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   286   in Future {task = task, group = group, result = result} end;
   290   in Future {task = task, group = group, result = result} end;
   287 
   291 
   288 fun fork e = fork_future NONE [] 0 e;
   292 fun fork e = fork_future NONE [] 0 e;
   289 fun fork_group group e = fork_future (SOME group) [] 0 e;
   293 fun fork_group group e = fork_future (SOME group) [] 0 e;
   290 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   294 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   291 fun fork_pri pri e = fork_future NONE [] pri e;
   295 fun fork_pri pri e = fork_future NONE [] pri e;
   292 fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e;
       
   293 
   296 
   294 
   297 
   295 (* join *)
   298 (* join *)
   296 
   299 
   297 local
   300 local
   298 
   301 
   299 fun get_result x =
   302 fun get_result x =
   300   (case peek x of
   303   (case peek x of
   301     SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x)))
   304     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   302   | SOME res => res
   305   | SOME (Exn.Exn Exn.Interrupt) =>
   303   | NONE => Exn.Exn (SYS_ERROR "unfinished future"));
   306       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
       
   307   | SOME res => res);
   304 
   308 
   305 fun join_next deps = (*requires SYNCHRONIZED*)
   309 fun join_next deps = (*requires SYNCHRONIZED*)
   306   if overloaded () then (worker_wait (); join_next deps)
   310   if overloaded () then (worker_wait (); join_next deps)
   307   else change_result queue (Task_Queue.dequeue_towards deps);
   311   else change_result queue (Task_Queue.dequeue_towards deps);
   308 
   312 
   343 fun map_future f x =
   347 fun map_future f x =
   344   let
   348   let
   345     val _ = scheduler_check "map_future check";
   349     val _ = scheduler_check "map_future check";
   346 
   350 
   347     val task = task_of x;
   351     val task = task_of x;
   348     val group = Task_Queue.new_group ();
   352     val group = Task_Queue.new_group (SOME (group_of x));
   349     val (result, job) = future_job group (fn () => f (join x));
   353     val (result, job) = future_job group (fn () => f (join x));
   350 
   354 
   351     val extended = SYNCHRONIZED "map_future" (fn () =>
   355     val extended = SYNCHRONIZED "map_future" (fn () =>
   352       (case Task_Queue.extend task job (! queue) of
   356       (case Task_Queue.extend task job (! queue) of
   353         SOME queue' => (queue := queue'; true)
   357         SOME queue' => (queue := queue'; true)