fork/map: no inheritance of group (structure is nested, not parallel);
authorwenzelm
Thu, 04 Dec 2008 23:46:20 +0100 (2008-12-04)
changeset 28979 3ce619d8d432
parent 28978 f3e37d78b4f7
child 28980 9d7ea903e877
fork/map: no inheritance of group (structure is nested, not parallel); removed group thread_data; refined Future.fork interfaces, no longer export Future.future;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Thu Dec 04 23:02:56 2008 +0100
+++ b/src/Pure/Concurrent/future.ML	Thu Dec 04 23:46:20 2008 +0100
@@ -30,14 +30,15 @@
   val enabled: unit -> bool
   type task = TaskQueue.task
   type group = TaskQueue.group
-  val thread_data: unit -> (string * task * group) option
+  val thread_data: unit -> (string * task) option
   type 'a future
   val task_of: 'a future -> task
   val group_of: 'a future -> group
   val peek: 'a future -> 'a Exn.result option
   val is_finished: 'a future -> bool
-  val future: group option -> task list -> bool -> (unit -> 'a) -> 'a future
   val fork: (unit -> 'a) -> 'a future
+  val fork_group: group -> (unit -> 'a) -> 'a future
+  val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
   val fork_background: (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
@@ -64,7 +65,7 @@
 type task = TaskQueue.task;
 type group = TaskQueue.group;
 
-local val tag = Universal.tag () : (string * task * group) option Universal.tag in
+local val tag = Universal.tag () : (string * task) option Universal.tag in
   fun thread_data () = the_default NONE (Thread.getLocal tag);
   fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
 end;
@@ -141,7 +142,7 @@
 fun execute name (task, group, run) =
   let
     val _ = trace_active ();
-    val ok = setmp_thread_data (name, task, group) run ();
+    val ok = setmp_thread_data (name, task) run ();
     val _ = SYNCHRONIZED "execute" (fn () =>
      (change queue (TaskQueue.finish task);
       if ok then ()
@@ -247,10 +248,10 @@
       change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork_common pri = future (Option.map #3 (thread_data ())) [] pri;
-
-fun fork e = fork_common true e;
-fun fork_background e = fork_common false e;
+fun fork e = future NONE [] true e;
+fun fork_group group e = future (SOME group) [] true e;
+fun fork_deps deps e = future NONE (map task_of deps) true e;
+fun fork_background e = future NONE [] false e;
 
 
 (* join: retrieve results *)
@@ -275,7 +276,7 @@
               (*alien thread -- refrain from contending for resources*)
               while exists (not o is_finished) xs
               do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
-          | SOME (name, task, _) =>
+          | SOME (name, task) =>
               (*proper task -- actively work towards results*)
               let
                 val unfinished = xs |> map_filter
@@ -293,12 +294,11 @@
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);
 
+fun map f x = fork_deps [x] (fn () => f (join x));
+
 
 (* misc operations *)
 
-(*map: dependent fork/join*)
-fun map f x = future (SOME (group_of x)) [task_of x] true (fn () => f (join x));
-
 (*focus: collection of high-priority task*)
 fun focus tasks = SYNCHRONIZED "focus" (fn () =>
   change queue (TaskQueue.focus tasks));