--- a/src/Pure/Concurrent/future.ML Mon Feb 19 10:05:37 2018 +0100
+++ b/src/Pure/Concurrent/future.ML Mon Feb 19 10:35:53 2018 +0100
@@ -31,6 +31,8 @@
val join_result: 'a future -> 'a Exn.result
val joins: 'a future list -> 'a list
val join: 'a future -> 'a
+ val forked_results: {name: string, deps: Task_Queue.task list} ->
+ (unit -> 'a) list -> 'a Exn.result list
val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
val value_result: 'a Exn.result -> 'a future
val value: 'a -> 'a future
@@ -539,6 +541,24 @@
fun join x = Exn.release (join_result x);
+(* forked results: nested parallel evaluation *)
+
+fun forked_results {name, deps} es =
+ Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
+ let
+ val (group, pri) =
+ (case worker_task () of
+ SOME task =>
+ (new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task)
+ | NONE => (new_group NONE, 0));
+ val futures =
+ forks {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true} es;
+ in
+ restore_attributes join_results futures
+ handle exn => (if Exn.is_interrupt exn then cancel_group group else (); Exn.reraise exn)
+ end) ();
+
+
(* task context for running thread *)
fun task_context name group f x =
--- a/src/Pure/Concurrent/par_list.ML Mon Feb 19 10:05:37 2018 +0100
+++ b/src/Pure/Concurrent/par_list.ML Mon Feb 19 10:35:53 2018 +0100
@@ -16,7 +16,6 @@
signature PAR_LIST =
sig
- val managed_results: string -> ('a -> 'b) -> 'a list -> 'b Exn.result list
val map_name: string -> ('a -> 'b) -> 'a list -> 'b list
val map_independent: ('a -> 'b) -> 'a list -> 'b list
val map: ('a -> 'b) -> 'a list -> 'b list
@@ -29,27 +28,12 @@
structure Par_List: PAR_LIST =
struct
-fun managed_results name f xs =
+fun forked_results name f xs =
if null xs orelse null (tl xs) orelse not (Multithreading.enabled ())
then map (Exn.capture f) xs
- else
- Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
- let
- val (group, pri) =
- (case Future.worker_task () of
- SOME task =>
- (Future.new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task)
- | NONE => (Future.new_group NONE, 0));
- val futures =
- Future.forks {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
- (map (fn x => fn () => f x) xs);
- val results =
- restore_attributes Future.join_results futures
- handle exn =>
- (if Exn.is_interrupt exn then Future.cancel_group group else (); Exn.reraise exn);
- in results end) ();
+ else Future.forked_results {name = name, deps = []} (map (fn x => fn () => f x) xs);
-fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
+fun map_name name f xs = Par_Exn.release_first (forked_results name f xs);
fun map f = map_name "Par_List.map" f;
fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all;
@@ -57,7 +41,7 @@
let
exception FOUND of 'b;
val results =
- managed_results "Par_List.get_some"
+ forked_results "Par_List.get_some"
(fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
in
(case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of