--- a/src/Pure/Concurrent/future.ML Mon Jan 31 17:19:23 2011 +0100
+++ b/src/Pure/Concurrent/future.ML Mon Jan 31 21:54:49 2011 +0100
@@ -44,9 +44,7 @@
val group_of: 'a future -> group
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val fork_group: group -> (unit -> 'a) -> 'a future
- val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
- val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
+ val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list
val fork_pri: int -> (unit -> 'a) -> 'a future
val fork: (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
@@ -401,27 +399,32 @@
(* fork *)
-fun fork_future opt_group deps pri e =
+fun bulk {group, deps, pri} es =
let
- val group =
- (case opt_group of
+ val grp =
+ (case group of
NONE => worker_subgroup ()
- | SOME group => group);
- val (result, job) = future_job group e;
- val task = SYNCHRONIZED "enqueue" (fn () =>
+ | SOME grp => grp);
+ fun enqueue e (minimal, queue) =
let
- val (task, minimal) =
- Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
+ val (result, job) = future_job grp e;
+ val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue;
+ val future = Future {promised = false, task = task, group = grp, result = result};
+ in (future, (minimal orelse minimal', queue')) end;
+ in
+ SYNCHRONIZED "enqueue" (fn () =>
+ let
+ val (futures, minimal) =
+ Unsynchronized.change_result queue (fn q =>
+ let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
+ in ((futures, minimal), q') end);
val _ = if minimal then signal work_available else ();
val _ = scheduler_check ();
- in task end);
- in Future {promised = false, task = task, group = group, result = result} end;
+ in futures end)
+ end;
-fun fork_group group e = fork_future (SOME group) [] 0 e;
-fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
-fun fork_deps deps e = fork_deps_pri deps 0 e;
-fun fork_pri pri e = fork_deps_pri [] pri e;
-fun fork e = fork_deps [] e;
+fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e;
+fun fork e = fork_pri 0 e;
(* join *)
@@ -495,7 +498,9 @@
| NONE => false));
in
if extended then Future {promised = false, task = task, group = group, result = result}
- else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+ else
+ singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task})
+ (fn () => f (join x))
end;