more uniform Future.map: always internalize failure;
added Future.flat as fast-path operation;
--- a/src/Pure/Concurrent/future.ML Sun Mar 03 14:29:30 2013 +0100
+++ b/src/Pure/Concurrent/future.ML Sun Mar 03 17:34:42 2013 +0100
@@ -70,6 +70,7 @@
val value: 'a -> 'a future
val cond_forks: params -> (unit -> 'a) list -> 'a future list
val map: ('a -> 'b) -> 'a future -> 'b future
+ val flat: 'a list future list -> 'a list future
val promise_group: group -> (unit -> unit) -> 'a future
val promise: (unit -> unit) -> 'a future
val fulfill_result: 'a future -> 'a Exn.result -> unit
@@ -570,7 +571,7 @@
fun join x = Exn.release (join_result x);
-(* fast-path versions -- bypassing task queue *)
+(* fast-path operations -- bypass task queue if possible *)
fun value_result (res: 'a Exn.result) =
let
@@ -587,7 +588,7 @@
else map (fn e => value_result (Exn.interruptible_capture e ())) es;
fun map_future f x =
- if is_finished x then value (f (join x))
+ if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
else
let
val task = task_of x;
@@ -607,6 +608,20 @@
(fn () => f (join x))
end;
+fun flat_future futures =
+ (case filter_out is_finished futures of
+ [] => value_result (Exn.interruptible_capture (flat o joins) futures)
+ | [x] => map_future (fn _ => flat (joins (futures))) x
+ | xs =>
+ let
+ val tasks = map task_of xs;
+ val pri = foldl1 Int.min (map Task_Queue.pri_of_task tasks);
+ in
+ (singleton o cond_forks)
+ {name = "flat_future", group = NONE, deps = tasks, pri = pri, interrupts = true}
+ (fn () => flat (joins (futures)))
+ end);
+
(* promised futures -- fulfilled by external means *)
@@ -690,6 +705,7 @@
(*final declarations of this structure!*)
val map = map_future;
+val flat = flat_future;
end;