added worker_group;
authorwenzelm
Tue, 21 Jul 2009 20:37:31 +0200
changeset 32102 81d03a29980c
parent 32101 e25107ff4f56
child 32103 ebdcff2b9810
added worker_group; fork: default to subgroup of worker_group; removed obsolete fork_local; join/get_result: cumulative flattened exceptions; map: subgroup of worker_group;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Jul 21 20:37:31 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Jul 21 20:37:31 2009 +0200
@@ -30,6 +30,7 @@
   type task = Task_Queue.task
   type group = Task_Queue.group
   val is_worker: unit -> bool
+  val worker_group: unit -> Task_Queue.group option
   type 'a future
   val task_of: 'a future -> task
   val group_of: 'a future -> group
@@ -40,7 +41,6 @@
   val fork_group: group -> (unit -> 'a) -> 'a future
   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
   val fork_pri: int -> (unit -> 'a) -> 'a future
-  val fork_local: int -> (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
   val join: 'a future -> 'a
@@ -76,6 +76,7 @@
 end;
 
 val is_worker = is_some o thread_data;
+val worker_group = Option.map #3 o thread_data;
 
 
 (* datatype future *)
@@ -93,7 +94,7 @@
 
 fun value x = Future
  {task = Task_Queue.new_task 0,
-  group = Task_Queue.new_group (),
+  group = Task_Queue.new_group NONE,
   result = ref (SOME (Exn.Result x))};
 
 
@@ -172,7 +173,7 @@
 fun execute name (task, group, jobs) =
   let
     val _ = trace_active ();
-    val valid = null (Task_Queue.group_exns group);
+    val valid = not (Task_Queue.is_canceled group);
     val ok = setmp_thread_data (name, task, group) (fn () =>
       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
     val _ = SYNCHRONIZED "execute" (fn () =>
@@ -279,7 +280,10 @@
   let
     val _ = scheduler_check "future check";
 
-    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+    val group =
+      (case opt_group of
+        SOME group => group
+      | NONE => Task_Queue.new_group (worker_group ()));
     val (result, job) = future_job group e;
     val task = SYNCHRONIZED "future" (fn () =>
       change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
@@ -289,7 +293,6 @@
 fun fork_group group e = fork_future (SOME group) [] 0 e;
 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
 fun fork_pri pri e = fork_future NONE [] pri e;
-fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e;
 
 
 (* join *)
@@ -298,9 +301,10 @@
 
 fun get_result x =
   (case peek x of
-    SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x)))
-  | SOME res => res
-  | NONE => Exn.Exn (SYS_ERROR "unfinished future"));
+    NONE => Exn.Exn (SYS_ERROR "unfinished future")
+  | SOME (Exn.Exn Exn.Interrupt) =>
+      Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
+  | SOME res => res);
 
 fun join_next deps = (*requires SYNCHRONIZED*)
   if overloaded () then (worker_wait (); join_next deps)
@@ -345,7 +349,7 @@
     val _ = scheduler_check "map_future check";
 
     val task = task_of x;
-    val group = Task_Queue.new_group ();
+    val group = Task_Queue.new_group (SOME (group_of x));
     val (result, job) = future_job group (fn () => f (join x));
 
     val extended = SYNCHRONIZED "map_future" (fn () =>