more direct Future.bulk, which potentially reduces overhead for Par_List;
tuned signature;
--- a/src/Pure/Concurrent/future.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Concurrent/future.ML Mon Jan 31 21:54:49 2011 +0100
@@ -44,9 +44,7 @@
val group_of: 'a future -> group
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val fork_group: group -> (unit -> 'a) -> 'a future
- val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
- val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
+ val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
val fork_pri: int -> (unit -> 'a) -> 'a future
val fork: (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
@@ -401,27 +399,32 @@
(* fork *)
-fun fork_future opt_group deps pri e =
+fun bulk {group, deps, pri} es =
let
- val group =
- (case opt_group of
+ val grp =
+ (case group of
NONE => worker_subgroup ()
- | SOME group => group);
- val (result, job) = future_job group e;
- val task = SYNCHRONIZED "enqueue" (fn () =>
+ | SOME grp => grp);
+ fun enqueue e (minimal, queue) =
let
- val (task, minimal) =
- Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
+ val (result, job) = future_job grp e;
+ val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
+ val future = Future {promised = false, task = task, group = grp, result = result};
+ in (future, (minimal orelse minimal', queue')) end;
+ in
+ SYNCHRONIZED "enqueue" (fn () =>
+ let
+ val (futures, minimal) =
+ Unsynchronized.change_result queue (fn q =>
+ let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
+ in ((futures, minimal), q') end);
val _ = if minimal then signal work_available else ();
val _ = scheduler_check ();
- in task end);
- in Future {promised = false, task = task, group = group, result = result} end;
+ in futures end)
+ end;
-fun fork_group group e = fork_future (SOME group) [] 0 e;
-fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
-fun fork_deps deps e = fork_deps_pri deps 0 e;
-fun fork_pri pri e = fork_deps_pri [] pri e;
-fun fork e = fork_deps [] e;
+fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
+fun fork e = fork_pri 0 e;
(* join *)
@@ -495,7 +498,9 @@
| NONE => false));
in
if extended then Future {promised = false, task = task, group = group, result = result}
- else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+ else
+ singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
+ (fn () => f (join x))
end;
--- a/src/Pure/Concurrent/par_list.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Concurrent/par_list.ML Mon Jan 31 21:54:49 2011 +0100
@@ -29,11 +29,11 @@
fun managed_results f xs =
if Multithreading.enabled () andalso not (Multithreading.self_critical ()) then
let
- val shared_group = Task_Queue.new_group (Future.worker_group ());
- val results =
- Future.join_results (map (fn x => Future.fork_group shared_group (fn () => f x)) xs)
- handle exn =>
- (if Exn.is_interrupt exn then Future.cancel_group shared_group else (); reraise exn);
+ val group = Task_Queue.new_group (Future.worker_group ());
+ val futures =
+ Future.bulk {group = SOME group, deps = [], pri = 0} (map (fn x => fn () => f x) xs);
+ val results = Future.join_results futures
+ handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
in results end
else map (Exn.capture f) xs;
--- a/src/Pure/PIDE/document.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/PIDE/document.ML Mon Jan 31 21:54:49 2011 +0100
@@ -208,7 +208,7 @@
fun async_state tr st =
ignore
- (Future.fork_group (Task_Queue.new_group NONE)
+ (singleton (Future.bulk {group = SOME (Task_Queue.new_group NONE), deps = [], pri = 0})
(fn () =>
Toplevel.setmp_thread_position tr
(fn () => Toplevel.print_state false st) ()));
@@ -337,14 +337,14 @@
val _ = cancel state;
val execution' = (* FIXME proper node deps *)
- [Future.fork_pri 1 (fn () =>
+ Future.bulk {group = NONE, deps = [], pri = 1} [fn () =>
let
val _ = await_cancellation state;
val _ =
node_names_of version |> List.app (fn name =>
fold_entries NONE (fn (_, exec) => fn () => force_exec exec)
(get_node version name) ());
- in () end)];
+ in () end];
val _ = await_cancellation state; (* FIXME async!? *)
--- a/src/Pure/Thy/thy_info.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Thy/thy_info.ML Mon Jan 31 21:54:49 2011 +0100
@@ -184,11 +184,13 @@
val deps = map (`get) (Graph.imm_preds task_graph name);
fun failed (future, parent) = if can Future.join future then NONE else SOME parent;
- val future = Future.fork_deps (map #1 deps) (fn () =>
- (case map_filter failed deps of
- [] => body (map (#1 o Future.join o get) parents)
- | bad => error (loader_msg
- ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
+ val future =
+ singleton (Future.bulk {group = NONE, deps = map (Future.task_of o #1) deps, pri = 0})
+ (fn () =>
+ (case map_filter failed deps of
+ [] => body (map (#1 o Future.join o get) parents)
+ | bad => error (loader_msg
+ ("failed to load " ^ quote name ^ " (unresolved " ^ commas_quote bad ^ ")") [])));
in Symtab.update (name, future) tab end
| Finished thy => Symtab.update (name, Future.value (thy, I)) tab));
val _ =
--- a/src/Pure/proofterm.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/proofterm.ML Mon Jan 31 21:54:49 2011 +0100
@@ -1389,8 +1389,11 @@
if not (Multithreading.enabled ()) then Future.value (postproc (Future.join body))
else Future.map postproc body
| fulfill_proof_future thy promises postproc body =
- Future.fork_deps (body :: map snd promises) (fn () =>
- postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
+ singleton
+ (Future.bulk {group = NONE,
+ deps = Future.task_of body :: map (Future.task_of o snd) promises, pri = 0})
+ (fn () =>
+ postproc (fulfill_norm_proof thy (map (apsnd Future.join) promises) (Future.join body)));
(***** abstraction over sort constraints *****)