src/Pure/Concurrent/par_list.ML
changeset 67658 67e5deb9e290
parent 62923 3a122e1e352a
child 67659 11b390e971f6
equal deleted inserted replaced
67657:ef20d4961f86 67658:67e5deb9e290
    14     for operators that have side-effects or raise exceptions!
    14     for operators that have side-effects or raise exceptions!
    15 *)
    15 *)
    16 
    16 
    17 signature PAR_LIST =
    17 signature PAR_LIST =
    18 sig
    18 sig
    19   val managed_results: string -> ('a -> 'b) -> 'a list -> 'b Exn.result list
       
    20   val map_name: string -> ('a -> 'b) -> 'a list -> 'b list
    19   val map_name: string -> ('a -> 'b) -> 'a list -> 'b list
    21   val map_independent: ('a -> 'b) -> 'a list -> 'b list
    20   val map_independent: ('a -> 'b) -> 'a list -> 'b list
    22   val map: ('a -> 'b) -> 'a list -> 'b list
    21   val map: ('a -> 'b) -> 'a list -> 'b list
    23   val get_some: ('a -> 'b option) -> 'a list -> 'b option
    22   val get_some: ('a -> 'b option) -> 'a list -> 'b option
    24   val find_some: ('a -> bool) -> 'a list -> 'a option
    23   val find_some: ('a -> bool) -> 'a list -> 'a option
    27 end;
    26 end;
    28 
    27 
    29 structure Par_List: PAR_LIST =
    28 structure Par_List: PAR_LIST =
    30 struct
    29 struct
    31 
    30 
    32 fun managed_results name f xs =
    31 fun forked_results name f xs =
    33   if null xs orelse null (tl xs) orelse not (Multithreading.enabled ())
    32   if null xs orelse null (tl xs) orelse not (Multithreading.enabled ())
    34   then map (Exn.capture f) xs
    33   then map (Exn.capture f) xs
    35   else
    34   else Future.forked_results {name = name, deps = []} (map (fn x => fn () => f x) xs);
    36     Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
       
    37       let
       
    38         val (group, pri) =
       
    39           (case Future.worker_task () of
       
    40             SOME task =>
       
    41               (Future.new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task)
       
    42           | NONE => (Future.new_group NONE, 0));
       
    43         val futures =
       
    44           Future.forks {name = name, group = SOME group, deps = [], pri = pri, interrupts = true}
       
    45             (map (fn x => fn () => f x) xs);
       
    46         val results =
       
    47           restore_attributes Future.join_results futures
       
    48             handle exn =>
       
    49               (if Exn.is_interrupt exn then Future.cancel_group group else (); Exn.reraise exn);
       
    50       in results end) ();
       
    51 
    35 
    52 fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
    36 fun map_name name f xs = Par_Exn.release_first (forked_results name f xs);
    53 fun map f = map_name "Par_List.map" f;
    37 fun map f = map_name "Par_List.map" f;
    54 fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all;
    38 fun map_independent f = map (Exn.interruptible_capture f) #> Par_Exn.release_all;
    55 
    39 
    56 fun get_some f xs =
    40 fun get_some f xs =
    57   let
    41   let
    58     exception FOUND of 'b;
    42     exception FOUND of 'b;
    59     val results =
    43     val results =
    60       managed_results "Par_List.get_some"
    44       forked_results "Par_List.get_some"
    61         (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
    45         (fn x => (case f x of NONE => () | SOME y => raise FOUND y)) xs;
    62   in
    46   in
    63     (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of
    47     (case get_first (fn Exn.Exn (FOUND res) => SOME res | _ => NONE) results of
    64       NONE => (Par_Exn.release_first results; NONE)
    48       NONE => (Par_Exn.release_first results; NONE)
    65     | some => some)
    49     | some => some)