added worker_group;
fork: default to subgroup of worker_group;
removed obsolete fork_local;
join/get_result: cumulative flattened exceptions;
map: subgroup of worker_group;
--- a/src/Pure/Concurrent/future.ML Tue Jul 21 20:37:31 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Tue Jul 21 20:37:31 2009 +0200
@@ -30,6 +30,7 @@
type task = Task_Queue.task
type group = Task_Queue.group
val is_worker: unit -> bool
+ val worker_group: unit -> Task_Queue.group option
type 'a future
val task_of: 'a future -> task
val group_of: 'a future -> group
@@ -40,7 +41,6 @@
val fork_group: group -> (unit -> 'a) -> 'a future
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
val fork_pri: int -> (unit -> 'a) -> 'a future
- val fork_local: int -> (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
val join: 'a future -> 'a
@@ -76,6 +76,7 @@
end;
val is_worker = is_some o thread_data;
+val worker_group = Option.map #3 o thread_data;
(* datatype future *)
@@ -93,7 +94,7 @@
fun value x = Future
{task = Task_Queue.new_task 0,
- group = Task_Queue.new_group (),
+ group = Task_Queue.new_group NONE,
result = ref (SOME (Exn.Result x))};
@@ -172,7 +173,7 @@
fun execute name (task, group, jobs) =
let
val _ = trace_active ();
- val valid = null (Task_Queue.group_exns group);
+ val valid = not (Task_Queue.is_canceled group);
val ok = setmp_thread_data (name, task, group) (fn () =>
fold (fn job => fn ok => job valid andalso ok) jobs true) ();
val _ = SYNCHRONIZED "execute" (fn () =>
@@ -279,7 +280,10 @@
let
val _ = scheduler_check "future check";
- val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+ val group =
+ (case opt_group of
+ SOME group => group
+ | NONE => Task_Queue.new_group (worker_group ()));
val (result, job) = future_job group e;
val task = SYNCHRONIZED "future" (fn () =>
change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
@@ -289,7 +293,6 @@
fun fork_group group e = fork_future (SOME group) [] 0 e;
fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
fun fork_pri pri e = fork_future NONE [] pri e;
-fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e;
(* join *)
@@ -298,9 +301,10 @@
fun get_result x =
(case peek x of
- SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x)))
- | SOME res => res
- | NONE => Exn.Exn (SYS_ERROR "unfinished future"));
+ NONE => Exn.Exn (SYS_ERROR "unfinished future")
+ | SOME (Exn.Exn Exn.Interrupt) =>
+ Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
+ | SOME res => res);
fun join_next deps = (*requires SYNCHRONIZED*)
if overloaded () then (worker_wait (); join_next deps)
@@ -345,7 +349,7 @@
val _ = scheduler_check "map_future check";
val task = task_of x;
- val group = Task_Queue.new_group ();
+ val group = Task_Queue.new_group (SOME (group_of x));
val (result, job) = future_job group (fn () => f (join x));
val extended = SYNCHRONIZED "map_future" (fn () =>