src/Pure/Concurrent/future.ML
changeset 28186 6a8417f36837
parent 28177 8c0335bc9336
child 28191 9e5f556409c6
     1.1 --- a/src/Pure/Concurrent/future.ML	Tue Sep 09 23:30:00 2008 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Tue Sep 09 23:30:05 2008 +0200
     1.3 @@ -13,9 +13,10 @@
     1.4    val task_of: 'a T -> task
     1.5    val group_of: 'a T -> group
     1.6    val shutdown_request: unit -> unit
     1.7 -  val cancel: 'a T -> unit
     1.8    val future: bool -> task list -> (unit -> 'a) -> 'a T
     1.9    val fork: (unit -> 'a) -> 'a T
    1.10 +  val cancel: 'a T -> unit
    1.11 +  val join_all: 'a T list -> 'a list
    1.12    val join: 'a T -> 'a
    1.13  end;
    1.14  
    1.15 @@ -69,7 +70,6 @@
    1.16  
    1.17  fun shutdown_request () = Mailbox.send requests Shutdown;
    1.18  fun cancel_request group = Mailbox.send requests (Cancel group);
    1.19 -fun cancel x = cancel_request (group_of x);
    1.20  
    1.21  
    1.22  (* synchronization *)
    1.23 @@ -101,11 +101,6 @@
    1.24  
    1.25  (* execute *)
    1.26  
    1.27 -fun cancel_group group = (*requires SYNCHRONIZED*)
    1.28 -  (case change_result queue (TaskQueue.cancel group) of
    1.29 -    [] => true
    1.30 -  | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false));
    1.31 -
    1.32  fun execute name (task, group, run) =
    1.33    let
    1.34      val _ = set_thread_data (SOME (task, group));
    1.35 @@ -115,7 +110,9 @@
    1.36      val _ = set_thread_data NONE;
    1.37      val _ = SYNCHRONIZED (fn () =>
    1.38       (change queue (TaskQueue.finish task);
    1.39 -      if ok then () else if cancel_group group then () else cancel_request group;
    1.40 +      if ok then ()
    1.41 +      else if change_result queue (TaskQueue.cancel group) then ()
    1.42 +      else cancel_request group;
    1.43        notify_all ()));
    1.44    in () end;
    1.45  
    1.46 @@ -123,7 +120,10 @@
    1.47  (* worker threads *)
    1.48  
    1.49  fun change_active b = (*requires SYNCHRONIZED*)
    1.50 - (change active (fn n => if b then n + 1 else n - 1); trace_active ());
    1.51 +  (change active (fn n => if b then n + 1 else n - 1); trace_active ());
    1.52 +
    1.53 +fun worker_wait name = (*requires SYNCHRONIZED*)
    1.54 +  (change_active false; wait name; change_active true);
    1.55  
    1.56  fun worker_next name = (*requires SYNCHRONIZED*)
    1.57    if ! excessive > 0 then
    1.58 @@ -132,8 +132,8 @@
    1.59       change workers (remove Thread.equal (Thread.self ()));
    1.60       NONE)
    1.61    else
    1.62 -    (case change_result queue (TaskQueue.dequeue (Thread.self ())) of
    1.63 -      NONE => (change_active false; wait name; change_active true; worker_next name)
    1.64 +    (case change_result queue TaskQueue.dequeue of
    1.65 +      NONE => (worker_wait name; worker_next name)
    1.66      | some => some);
    1.67  
    1.68  fun worker_loop name =
    1.69 @@ -158,7 +158,8 @@
    1.70  
    1.71  fun scheduler_loop canceled =
    1.72    let
    1.73 -    val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled);
    1.74 +    val canceled' = SYNCHRONIZED (fn () =>
    1.75 +      filter_out (change_result queue o TaskQueue.cancel) canceled);
    1.76      val _ = scheduler_fork ();
    1.77    in
    1.78      (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
    1.79 @@ -195,14 +196,44 @@
    1.80  
    1.81  fun fork e = future false [] e;
    1.82  
    1.83 -fun join (Future {result, ...}) =
    1.84 +fun cancel x = (check_scheduler (); cancel_request (group_of x));
    1.85 +
    1.86 +
    1.87 +(* join *)
    1.88 +
    1.89 +fun join_all xs =
    1.90    let
    1.91      val _ = check_scheduler ();
    1.92  
    1.93 -    fun passive_loop () =
    1.94 -      (case ! result of
    1.95 -        NONE => (wait "join"; passive_loop ())
    1.96 -      | SOME res => res);
    1.97 -  in Exn.release (SYNCHRONIZED passive_loop) end;
    1.98 +    fun unfinished () =
    1.99 +      xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
   1.100 +
   1.101 +    (*alien thread -- refrain from contending for resources*)
   1.102 +    fun passive_join () = (*requires SYNCHRONIZED*)
   1.103 +      (case unfinished () of [] => ()
   1.104 +      | _ => (wait "join"; passive_join ()));
   1.105 +
   1.106 +    (*proper worker thread -- actively work towards results*)
   1.107 +    fun active_join () = (*requires SYNCHRONIZED*)
   1.108 +      (case unfinished () of [] => ()
   1.109 +      | tasks =>
   1.110 +          (case change_result queue (TaskQueue.dequeue_towards tasks) of
   1.111 +            NONE => (worker_wait "join"; active_join ())
   1.112 +          | SOME work => (execute "join" work; active_join ())));
   1.113 +
   1.114 +    val _ =
   1.115 +      (case thread_data () of
   1.116 +        NONE => SYNCHRONIZED passive_join
   1.117 +      | SOME (task, _) => SYNCHRONIZED (fn () =>
   1.118 +         (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
   1.119 +
   1.120 +    val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
   1.121 +  in
   1.122 +    (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
   1.123 +      NONE => map Exn.release res
   1.124 +    | SOME e => raise e)
   1.125 +  end;
   1.126 +
   1.127 +fun join x = singleton join_all x;
   1.128  
   1.129  end;