src/Pure/Concurrent/future.ML
changeset 28186 6a8417f36837
parent 28177 8c0335bc9336
child 28191 9e5f556409c6
equal deleted inserted replaced
28185:0f20cbce4935 28186:6a8417f36837
    11   type group = TaskQueue.group
    11   type group = TaskQueue.group
    12   type 'a T
    12   type 'a T
    13   val task_of: 'a T -> task
    13   val task_of: 'a T -> task
    14   val group_of: 'a T -> group
    14   val group_of: 'a T -> group
    15   val shutdown_request: unit -> unit
    15   val shutdown_request: unit -> unit
    16   val cancel: 'a T -> unit
       
    17   val future: bool -> task list -> (unit -> 'a) -> 'a T
    16   val future: bool -> task list -> (unit -> 'a) -> 'a T
    18   val fork: (unit -> 'a) -> 'a T
    17   val fork: (unit -> 'a) -> 'a T
       
    18   val cancel: 'a T -> unit
       
    19   val join_all: 'a T list -> 'a list
    19   val join: 'a T -> 'a
    20   val join: 'a T -> 'a
    20 end;
    21 end;
    21 
    22 
    22 structure Future: FUTURE =
    23 structure Future: FUTURE =
    23 struct
    24 struct
    67 datatype request = Shutdown | Cancel of group;
    68 datatype request = Shutdown | Cancel of group;
    68 val requests = Mailbox.create () : request Mailbox.T;
    69 val requests = Mailbox.create () : request Mailbox.T;
    69 
    70 
    70 fun shutdown_request () = Mailbox.send requests Shutdown;
    71 fun shutdown_request () = Mailbox.send requests Shutdown;
    71 fun cancel_request group = Mailbox.send requests (Cancel group);
    72 fun cancel_request group = Mailbox.send requests (Cancel group);
    72 fun cancel x = cancel_request (group_of x);
       
    73 
    73 
    74 
    74 
    75 (* synchronization *)
    75 (* synchronization *)
    76 
    76 
    77 local
    77 local
    98 
    98 
    99 end;
    99 end;
   100 
   100 
   101 
   101 
   102 (* execute *)
   102 (* execute *)
   103 
       
   104 fun cancel_group group = (*requires SYNCHRONIZED*)
       
   105   (case change_result queue (TaskQueue.cancel group) of
       
   106     [] => true
       
   107   | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false));
       
   108 
   103 
   109 fun execute name (task, group, run) =
   104 fun execute name (task, group, run) =
   110   let
   105   let
   111     val _ = set_thread_data (SOME (task, group));
   106     val _ = set_thread_data (SOME (task, group));
   112     val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
   107     val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
   113     val ok = run ();
   108     val ok = run ();
   114     val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
   109     val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
   115     val _ = set_thread_data NONE;
   110     val _ = set_thread_data NONE;
   116     val _ = SYNCHRONIZED (fn () =>
   111     val _ = SYNCHRONIZED (fn () =>
   117      (change queue (TaskQueue.finish task);
   112      (change queue (TaskQueue.finish task);
   118       if ok then () else if cancel_group group then () else cancel_request group;
   113       if ok then ()
       
   114       else if change_result queue (TaskQueue.cancel group) then ()
       
   115       else cancel_request group;
   119       notify_all ()));
   116       notify_all ()));
   120   in () end;
   117   in () end;
   121 
   118 
   122 
   119 
   123 (* worker threads *)
   120 (* worker threads *)
   124 
   121 
   125 fun change_active b = (*requires SYNCHRONIZED*)
   122 fun change_active b = (*requires SYNCHRONIZED*)
   126  (change active (fn n => if b then n + 1 else n - 1); trace_active ());
   123   (change active (fn n => if b then n + 1 else n - 1); trace_active ());
       
   124 
       
   125 fun worker_wait name = (*requires SYNCHRONIZED*)
       
   126   (change_active false; wait name; change_active true);
   127 
   127 
   128 fun worker_next name = (*requires SYNCHRONIZED*)
   128 fun worker_next name = (*requires SYNCHRONIZED*)
   129   if ! excessive > 0 then
   129   if ! excessive > 0 then
   130     (dec excessive;
   130     (dec excessive;
   131      change_active false;
   131      change_active false;
   132      change workers (remove Thread.equal (Thread.self ()));
   132      change workers (remove Thread.equal (Thread.self ()));
   133      NONE)
   133      NONE)
   134   else
   134   else
   135     (case change_result queue (TaskQueue.dequeue (Thread.self ())) of
   135     (case change_result queue TaskQueue.dequeue of
   136       NONE => (change_active false; wait name; change_active true; worker_next name)
   136       NONE => (worker_wait name; worker_next name)
   137     | some => some);
   137     | some => some);
   138 
   138 
   139 fun worker_loop name =
   139 fun worker_loop name =
   140   (case SYNCHRONIZED (fn () => worker_next name) of
   140   (case SYNCHRONIZED (fn () => worker_next name) of
   141     NONE => ()
   141     NONE => ()
   156     val _ = excessive := l - m;
   156     val _ = excessive := l - m;
   157   in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end);
   157   in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end);
   158 
   158 
   159 fun scheduler_loop canceled =
   159 fun scheduler_loop canceled =
   160   let
   160   let
   161     val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled);
   161     val canceled' = SYNCHRONIZED (fn () =>
       
   162       filter_out (change_result queue o TaskQueue.cancel) canceled);
   162     val _ = scheduler_fork ();
   163     val _ = scheduler_fork ();
   163   in
   164   in
   164     (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
   165     (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
   165       SOME Shutdown => ()   (* FIXME proper worker shutdown *)
   166       SOME Shutdown => ()   (* FIXME proper worker shutdown *)
   166     | SOME (Cancel group) => scheduler_loop (group :: canceled')
   167     | SOME (Cancel group) => scheduler_loop (group :: canceled')
   193       change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
   194       change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
   194   in Future {task = task, group = group, result = result} end;
   195   in Future {task = task, group = group, result = result} end;
   195 
   196 
   196 fun fork e = future false [] e;
   197 fun fork e = future false [] e;
   197 
   198 
   198 fun join (Future {result, ...}) =
   199 fun cancel x = (check_scheduler (); cancel_request (group_of x));
       
   200 
       
   201 
       
   202 (* join *)
       
   203 
       
   204 fun join_all xs =
   199   let
   205   let
   200     val _ = check_scheduler ();
   206     val _ = check_scheduler ();
   201 
   207 
   202     fun passive_loop () =
   208     fun unfinished () =
   203       (case ! result of
   209       xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
   204         NONE => (wait "join"; passive_loop ())
   210 
   205       | SOME res => res);
   211     (*alien thread -- refrain from contending for resources*)
   206   in Exn.release (SYNCHRONIZED passive_loop) end;
   212     fun passive_join () = (*requires SYNCHRONIZED*)
   207 
   213       (case unfinished () of [] => ()
   208 end;
   214       | _ => (wait "join"; passive_join ()));
       
   215 
       
   216     (*proper worker thread -- actively work towards results*)
       
   217     fun active_join () = (*requires SYNCHRONIZED*)
       
   218       (case unfinished () of [] => ()
       
   219       | tasks =>
       
   220           (case change_result queue (TaskQueue.dequeue_towards tasks) of
       
   221             NONE => (worker_wait "join"; active_join ())
       
   222           | SOME work => (execute "join" work; active_join ())));
       
   223 
       
   224     val _ =
       
   225       (case thread_data () of
       
   226         NONE => SYNCHRONIZED passive_join
       
   227       | SOME (task, _) => SYNCHRONIZED (fn () =>
       
   228          (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
       
   229 
       
   230     val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
       
   231   in
       
   232     (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
       
   233       NONE => map Exn.release res
       
   234     | SOME e => raise e)
       
   235   end;
       
   236 
       
   237 fun join x = singleton join_all x;
       
   238 
       
   239 end;