more robust propagation of errors through bulk jobs;
authorwenzelm
Wed, 07 Jan 2009 17:26:03 +0100
changeset 29384 a3c7e9ae9b71
parent 29383 223f18cfbb32
child 29385 c96b91bab2e6
more robust propagation of errors through bulk jobs;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Jan 07 16:22:10 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Wed Jan 07 17:26:03 2009 +0100
@@ -145,8 +145,9 @@
 fun execute name (task, group, jobs) =
   let
     val _ = trace_active ();
+    val valid = Task_Queue.is_valid group;
     val ok = setmp_thread_data (name, task) (fn () =>
-      fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) ();
+      fold (fn job => fn ok => job valid andalso ok) jobs true) ();
     val _ = SYNCHRONIZED "execute" (fn () =>
      (change queue (Task_Queue.finish task);
       if ok then ()
@@ -311,18 +312,21 @@
 
 (* map *)
 
-fun map_future f (x as Future {task, group, ...}) =
+fun map_future f x =
   let
     val _ = scheduler_check "map_future check";
 
-    val (result', job) = future_job group (fn () => f (join x));
+    val task = task_of x;
+    val group = Task_Queue.new_group ();
+    val (result, job) = future_job group (fn () => f (join x));
+
     val extended = SYNCHRONIZED "map_future" (fn () =>
       (case Task_Queue.extend task job (! queue) of
         SOME queue' => (queue := queue'; true)
       | NONE => false));
   in
-    if extended then Future {task = task, group = group, result = result'}
-    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+    if extended then Future {task = task, group = group, result = result}
+    else fork_future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   end;