14 for operators that have side-effects or raise exceptions! |
14 for operators that have side-effects or raise exceptions! |
15 *) |
15 *) |
16 |
16 |
17 signature PAR_LIST = |
17 signature PAR_LIST = |
18 sig |
18 sig |
19 val managed_results: string -> ('a -> 'b) -> 'a list -> 'b Exn.result list |
|
20 val map_name: string -> ('a -> 'b) -> 'a list -> 'b list |
19 val map_name: string -> ('a -> 'b) -> 'a list -> 'b list |
21 val map_independent: ('a -> 'b) -> 'a list -> 'b list |
20 val map_independent: ('a -> 'b) -> 'a list -> 'b list |
22 val map: ('a -> 'b) -> 'a list -> 'b list |
21 val map: ('a -> 'b) -> 'a list -> 'b list |
23 val get_some: ('a -> 'b option) -> 'a list -> 'b option |
22 val get_some: ('a -> 'b option) -> 'a list -> 'b option |
24 val find_some: ('a -> bool) -> 'a list -> 'a option |
23 val find_some: ('a -> bool) -> 'a list -> 'a option |
27 end; |
26 end; |
28 |
27 |
29 structure Par_List: PAR_LIST = |
28 structure Par_List: PAR_LIST = |
30 struct |
29 struct |
31 |
30 |
32 fun managed_results name f xs = |
31 fun forked_results name f xs = |
33 if null xs orelse null (tl xs) orelse not (Multithreading.enabled ()) |
32 if null xs orelse null (tl xs) orelse not (Multithreading.enabled ()) |
34 then map (Exn.capture f) xs |
33 then map (Exn.capture f) xs |
35 else |
34 else Future.forked_results {name = name, deps = []} (map (fn x => fn () => f x) xs); |
36 Thread_Attributes.uninterruptible (fn restore_attributes => fn () => |
|
37 let |
|
38 val (group, pri) = |
|
39 (case Future.worker_task () of |
|
40 SOME task => |
|
41 (Future.new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task) |
|
42 | NONE => (Future.new_group NONE, 0)); |
|
43 val futures = |
|
44 Future.forks {name = name, group = SOME group, deps = [], pri = pri, interrupts = true} |
|
45 (map (fn x => fn () => f x) xs); |
|
46 val results = |
|
47 restore_attributes Future.join_results futures |
|
48 handle exn => |
|
49 (if Exn.is_interrupt exn then Future.cancel_group group else (); Exn.reraise exn); |
|
50 in results end) (); |
|
51 |
35 |
52 fun map_name name f xs = Par_Exn.release_first (managed_results name f xs); |
36 fun map_name name f xs = Par_Exn.release_first (forked_results name f xs); |
53 fun map f = map_name "Par_List.map" f; |
37 fun map f = map_name "Par_List.map" f; |
54 fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all; |
38 fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all; |
55 |
39 |
56 fun get_some f xs = |
40 fun get_some f xs = |
57 let |
41 let |
58 exception FOUND of 'b; |
42 exception FOUND of 'b; |
59 val results = |
43 val results = |
60 managed_results "Par_List.get_some" |
44 forked_results "Par_List.get_some" |
61 (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs; |
45 (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs; |
62 in |
46 in |
63 (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of |
47 (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of |
64 NONE => (Par_Exn.release_first results; NONE) |
48 NONE => (Par_Exn.release_first results; NONE) |
65 | some => some) |
49 | some => some) |