fork/map: no inheritance of group (structure is nested, not parallel);
removed group thread_data;
refined Future.fork interfaces, no longer export Future.future;
--- a/src/Pure/Concurrent/future.ML Thu Dec 04 23:02:56 2008 +0100
+++ b/src/Pure/Concurrent/future.ML Thu Dec 04 23:46:20 2008 +0100
@@ -30,14 +30,15 @@
val enabled: unit -> bool
type task = TaskQueue.task
type group = TaskQueue.group
- val thread_data: unit -> (string * task * group) option
+ val thread_data: unit -> (string * task) option
type 'a future
val task_of: 'a future -> task
val group_of: 'a future -> group
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val future: group option -> task list -> bool -> (unit -> 'a) -> 'a future
val fork: (unit -> 'a) -> 'a future
+ val fork_group: group -> (unit -> 'a) -> 'a future
+ val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
val fork_background: (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
@@ -64,7 +65,7 @@
type task = TaskQueue.task;
type group = TaskQueue.group;
-local val tag = Universal.tag () : (string * task * group) option Universal.tag in
+local val tag = Universal.tag () : (string * task) option Universal.tag in
fun thread_data () = the_default NONE (Thread.getLocal tag);
fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
end;
@@ -141,7 +142,7 @@
fun execute name (task, group, run) =
let
val _ = trace_active ();
- val ok = setmp_thread_data (name, task, group) run ();
+ val ok = setmp_thread_data (name, task) run ();
val _ = SYNCHRONIZED "execute" (fn () =>
(change queue (TaskQueue.finish task);
if ok then ()
@@ -247,10 +248,10 @@
change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork_common pri = future (Option.map #3 (thread_data ())) [] pri;
-
-fun fork e = fork_common true e;
-fun fork_background e = fork_common false e;
+fun fork e = future NONE [] true e;
+fun fork_group group e = future (SOME group) [] true e;
+fun fork_deps deps e = future NONE (map task_of deps) true e;
+fun fork_background e = future NONE [] false e;
(* join: retrieve results *)
@@ -275,7 +276,7 @@
(*alien thread -- refrain from contending for resources*)
while exists (not o is_finished) xs
do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
- | SOME (name, task, _) =>
+ | SOME (name, task) =>
(*proper task -- actively work towards results*)
let
val unfinished = xs |> map_filter
@@ -293,12 +294,11 @@
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);
+fun map f x = fork_deps [x] (fn () => f (join x));
+
(* misc operations *)
-(*map: dependent fork/join*)
-fun map f x = future (SOME (group_of x)) [task_of x] true (fn () => f (join x));
-
(*focus: collection of high-priority task*)
fun focus tasks = SYNCHRONIZED "focus" (fn () =>
change queue (TaskQueue.focus tasks));