more direct Future.bulk, which potentially reduces overhead for Par_List;
authorwenzelm
Mon Jan 31 21:54:49 2011 +0100 (2011-01-31 ago)
changeset 416722f70b1ddd09f
parent 41671 5ffa2cf4cced
child 41673 1c191a39549f
more direct Future.bulk, which potentially reduces overhead for Par_List;
tuned signature;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/PIDE/document.ML
src/Pure/Thy/thy_info.ML
src/Pure/proofterm.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Mon Jan 31 17:19:23 2011 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Mon Jan 31 21:54:49 2011 +0100
     1.3 @@ -44,9 +44,7 @@
     1.4    val group_of: 'a future -> group
     1.5    val peek: 'a future -> 'a Exn.result option
     1.6    val is_finished: 'a future -> bool
     1.7 -  val fork_group: group -> (unit -> 'a) -> 'a future
     1.8 -  val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
     1.9 -  val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    1.10 +  val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
    1.11    val fork_pri: int -> (unit -> 'a) -> 'a future
    1.12    val fork: (unit -> 'a) -> 'a future
    1.13    val join_results: 'a future list -> 'a Exn.result list
    1.14 @@ -401,27 +399,32 @@
    1.15  
    1.16  (* fork *)
    1.17  
    1.18 -fun fork_future opt_group deps pri e =
    1.19 +fun bulk {group, deps, pri} es =
    1.20    let
    1.21 -    val group =
    1.22 -      (case opt_group of
    1.23 +    val grp =
    1.24 +      (case group of
    1.25          NONE => worker_subgroup ()
    1.26 -      | SOME group => group);
    1.27 -    val (result, job) = future_job group e;
    1.28 -    val task = SYNCHRONIZED "enqueue" (fn () =>
    1.29 +      | SOME grp => grp);
    1.30 +    fun enqueue e (minimal, queue) =
    1.31        let
    1.32 -        val (task, minimal) =
    1.33 -          Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
    1.34 +        val (result, job) = future_job grp e;
    1.35 +        val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
    1.36 +        val future = Future {promised = false, task = task, group = grp, result = result};
    1.37 +      in (future, (minimal orelse minimal', queue')) end;
    1.38 +  in
    1.39 +    SYNCHRONIZED "enqueue" (fn () =>
    1.40 +      let
    1.41 +        val (futures, minimal) =
    1.42 +          Unsynchronized.change_result queue (fn q =>
    1.43 +            let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
    1.44 +            in ((futures, minimal), q') end);
    1.45          val _ = if minimal then signal work_available else ();
    1.46          val _ = scheduler_check ();
    1.47 -      in task end);
    1.48 -  in Future {promised = false, task = task, group = group, result = result} end;
    1.49 +      in futures end)
    1.50 +  end;
    1.51  
    1.52 -fun fork_group group e = fork_future (SOME group) [] 0 e;
    1.53 -fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
    1.54 -fun fork_deps deps e = fork_deps_pri deps 0 e;
    1.55 -fun fork_pri pri e = fork_deps_pri [] pri e;
    1.56 -fun fork e = fork_deps [] e;
    1.57 +fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
    1.58 +fun fork e = fork_pri 0 e;
    1.59  
    1.60  
    1.61  (* join *)
    1.62 @@ -495,7 +498,9 @@
    1.63        | NONE => false));
    1.64    in
    1.65      if extended then Future {promised = false, task = task, group = group, result = result}
    1.66 -    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
    1.67 +    else
    1.68 +      singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
    1.69 +        (fn () => f (join x))
    1.70    end;
    1.71  
    1.72  
     2.1 --- a/src/Pure/Concurrent/par_list.ML	Mon Jan 31 17:19:23 2011 +0100
     2.2 +++ b/src/Pure/Concurrent/par_list.ML	Mon Jan 31 21:54:49 2011 +0100
     2.3 @@ -29,11 +29,11 @@
     2.4  fun managed_results f xs =
     2.5    if Multithreading.enabled () andalso not (Multithreading.self_critical ()) then
     2.6      let
     2.7 -      val shared_group = Task_Queue.new_group (Future.worker_group ());
     2.8 -      val results =
     2.9 -        Future.join_results (map (fn x => Future.fork_group shared_group (fn () => f x)) xs)
    2.10 -          handle exn =>
    2.11 -            (if Exn.is_interrupt exn then Future.cancel_group shared_group else (); reraise exn);
    2.12 +      val group = Task_Queue.new_group (Future.worker_group ());
    2.13 +      val futures =
    2.14 +        Future.bulk {group = SOME group, deps = [], pri = 0} (map (fn x => fn () => f x) xs);
    2.15 +      val results = Future.join_results futures
    2.16 +        handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
    2.17      in results end
    2.18    else map (Exn.capture f) xs;
    2.19  
     3.1 --- a/src/Pure/PIDE/document.ML	Mon Jan 31 17:19:23 2011 +0100
     3.2 +++ b/src/Pure/PIDE/document.ML	Mon Jan 31 21:54:49 2011 +0100
     3.3 @@ -208,7 +208,7 @@
     3.4  
     3.5  fun async_state tr st =
     3.6    ignore
     3.7 -    (Future.fork_group (Task_Queue.new_group NONE)
     3.8 +    (singleton (Future.bulk {group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0})
     3.9        (fn () =>
    3.10          Toplevel.setmp_thread_position tr
    3.11            (fn () => Toplevel.print_state false st) ()));
    3.12 @@ -337,14 +337,14 @@
    3.13        val _ = cancel state;
    3.14  
    3.15        val execution' = (* FIXME proper node deps *)
    3.16 -        [Future.fork_pri 1 (fn () =>
    3.17 +        Future.bulk {group = NONE, deps = [], pri = 1} [fn () =>
    3.18            let
    3.19              val _ = await_cancellation state;
    3.20              val _ =
    3.21                node_names_of version |> List.app (fn name =>
    3.22                  fold_entries NONE (fn (_, exec) => fn () => force_exec exec)
    3.23                      (get_node version name) ());
    3.24 -          in () end)];
    3.25 +          in () end];
    3.26  
    3.27        val _ = await_cancellation state;  (* FIXME async!? *)
    3.28  
     4.1 --- a/src/Pure/Thy/thy_info.ML	Mon Jan 31 17:19:23 2011 +0100
     4.2 +++ b/src/Pure/Thy/thy_info.ML	Mon Jan 31 21:54:49 2011 +0100
     4.3 @@ -184,11 +184,13 @@
     4.4              val deps = map (`get) (Graph.imm_preds task_graph name);
     4.5              fun failed (future, parent) = if can Future.join future then NONE else SOME parent;
     4.6  
     4.7 -            val future = Future.fork_deps (map #1 deps) (fn () =>
     4.8 -              (case map_filter failed deps of
     4.9 -                [] => body (map (#1 o Future.join o get) parents)
    4.10 -              | bad => error (loader_msg
    4.11 -                  ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
    4.12 +            val future =
    4.13 +              singleton (Future.bulk {group = NONE, deps = map (Future.task_of o #1) deps, pri = 0})
    4.14 +              (fn () =>
    4.15 +                (case map_filter failed deps of
    4.16 +                  [] => body (map (#1 o Future.join o get) parents)
    4.17 +                | bad => error (loader_msg
    4.18 +                    ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
    4.19            in Symtab.update (name, future) tab end
    4.20        | Finished thy => Symtab.update (name, Future.value (thy, I)) tab));
    4.21      val _ =
     5.1 --- a/src/Pure/proofterm.ML	Mon Jan 31 17:19:23 2011 +0100
     5.2 +++ b/src/Pure/proofterm.ML	Mon Jan 31 21:54:49 2011 +0100
     5.3 @@ -1389,8 +1389,11 @@
     5.4        if not (Multithreading.enabled ()) then Future.value (postproc (Future.join body))
     5.5        else Future.map postproc body
     5.6    | fulfill_proof_future thy promises postproc body =
     5.7 -      Future.fork_deps (body :: map snd promises) (fn () =>
     5.8 -        postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
     5.9 +      singleton
    5.10 +        (Future.bulk {group = NONE,
    5.11 +            deps = Future.task_of body :: map (Future.task_of o snd) promises, pri = 0})
    5.12 +        (fn () =>
    5.13 +          postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
    5.14  
    5.15  
    5.16  (***** abstraction over sort constraints *****)