# HG changeset patch # User wenzelm # Date 1519032953 -3600 # Node ID 67e5deb9e2905c25ed8bae28fabc70a897759739 # Parent ef20d4961f866fc34e7f7128f12f3f6211db347b clarified modules; diff -r ef20d4961f86 -r 67e5deb9e290 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Feb 19 10:05:37 2018 +0100 +++ b/src/Pure/Concurrent/future.ML Mon Feb 19 10:35:53 2018 +0100 @@ -31,6 +31,8 @@ val join_result: 'a future -> 'a Exn.result val joins: 'a future list -> 'a list val join: 'a future -> 'a + val forked_results: {name: string, deps: Task_Queue.task list} -> + (unit -> 'a) list -> 'a Exn.result list val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b val value_result: 'a Exn.result -> 'a future val value: 'a -> 'a future @@ -539,6 +541,24 @@ fun join x = Exn.release (join_result x); +(* forked results: nested parallel evaluation *) + +fun forked_results {name, deps} es = + Thread_Attributes.uninterruptible (fn restore_attributes => fn () => + let + val (group, pri) = + (case worker_task () of + SOME task => + (new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task) + | NONE => (new_group NONE, 0)); + val futures = + forks {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true} es; + in + restore_attributes join_results futures + handle exn => (if Exn.is_interrupt exn then cancel_group group else (); Exn.reraise exn) + end) (); + + (* task context for running thread *) fun task_context name group f x = diff -r ef20d4961f86 -r 67e5deb9e290 src/Pure/Concurrent/par_list.ML --- a/src/Pure/Concurrent/par_list.ML Mon Feb 19 10:05:37 2018 +0100 +++ b/src/Pure/Concurrent/par_list.ML Mon Feb 19 10:35:53 2018 +0100 @@ -16,7 +16,6 @@ signature PAR_LIST = sig - val managed_results: string -> ('a -> 'b) -> 'a list -> 'b Exn.result list val map_name: string -> ('a -> 'b) -> 'a list -> 'b list val map_independent: ('a -> 'b) -> 'a list -> 'b list val map: ('a -> 'b) -> 'a list -> 'b list @@ -29,27 +28,12 @@ structure Par_List: PAR_LIST = struct -fun managed_results name f xs = +fun forked_results name f xs = if null xs orelse null (tl xs) orelse not (Multithreading.enabled ()) then map (Exn.capture f) xs - else - Thread_Attributes.uninterruptible (fn restore_attributes => fn () => - let - val (group, pri) = - (case Future.worker_task () of - SOME task => - (Future.new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task) - | NONE => (Future.new_group NONE, 0)); - val futures = - Future.forks {name = name, group = SOME group, deps = [], pri = pri, interrupts = true} - (map (fn x => fn () => f x) xs); - val results = - restore_attributes Future.join_results futures - handle exn => - (if Exn.is_interrupt exn then Future.cancel_group group else (); Exn.reraise exn); - in results end) (); + else Future.forked_results {name = name, deps = []} (map (fn x => fn () => f x) xs); -fun map_name name f xs = Par_Exn.release_first (managed_results name f xs); +fun map_name name f xs = Par_Exn.release_first (forked_results name f xs); fun map f = map_name "Par_List.map" f; fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all; @@ -57,7 +41,7 @@ let exception FOUND of 'b; val results = - managed_results "Par_List.get_some" + forked_results "Par_List.get_some" (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs; in (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of