more uniform Future.map: always internalize failure;
authorwenzelm
Sun Mar 03 17:34:42 2013 +0100 (2013-03-03)
changeset 51325bcd6b1aa4db5
parent 51324 c17f5de0a915
child 51326 a75040aaf369
more uniform Future.map: always internalize failure;
added Future.flat as fast-path operation;
src/Pure/Concurrent/future.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Sun Mar 03 14:29:30 2013 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Sun Mar 03 17:34:42 2013 +0100
     1.3 @@ -70,6 +70,7 @@
     1.4    val value: 'a -> 'a future
     1.5    val cond_forks: params -> (unit -> 'a) list -> 'a future list
     1.6    val map: ('a -> 'b) -> 'a future -> 'b future
     1.7 +  val flat: 'a list future list -> 'a list future
     1.8    val promise_group: group -> (unit -> unit) -> 'a future
     1.9    val promise: (unit -> unit) -> 'a future
    1.10    val fulfill_result: 'a future -> 'a Exn.result -> unit
    1.11 @@ -570,7 +571,7 @@
    1.12  fun join x = Exn.release (join_result x);
    1.13  
    1.14  
    1.15 -(* fast-path versions -- bypassing task queue *)
    1.16 +(* fast-path operations -- bypass task queue if possible *)
    1.17  
    1.18  fun value_result (res: 'a Exn.result) =
    1.19    let
    1.20 @@ -587,7 +588,7 @@
    1.21    else map (fn e => value_result (Exn.interruptible_capture e ())) es;
    1.22  
    1.23  fun map_future f x =
    1.24 -  if is_finished x then value (f (join x))
    1.25 +  if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
    1.26    else
    1.27      let
    1.28        val task = task_of x;
    1.29 @@ -607,6 +608,20 @@
    1.30            (fn () => f (join x))
    1.31      end;
    1.32  
    1.33 +fun flat_future futures =
    1.34 +  (case filter_out is_finished futures of
    1.35 +    [] => value_result (Exn.interruptible_capture (flat o joins) futures)
    1.36 +  | [x] => map_future (fn _ => flat (joins (futures))) x
    1.37 +  | xs =>
    1.38 +      let
    1.39 +        val tasks = map task_of xs;
    1.40 +        val pri = foldl1 Int.min (map Task_Queue.pri_of_task tasks);
    1.41 +      in
    1.42 +        (singleton o cond_forks)
    1.43 +          {name = "flat_future", group = NONE, deps = tasks, pri = pri, interrupts = true}
    1.44 +          (fn () => flat (joins (futures)))
    1.45 +      end);
    1.46 +
    1.47  
    1.48  (* promised futures -- fulfilled by external means *)
    1.49  
    1.50 @@ -690,6 +705,7 @@
    1.51  
    1.52  (*final declarations of this structure!*)
    1.53  val map = map_future;
    1.54 +val flat = flat_future;
    1.55  
    1.56  end;
    1.57