tuned map: reduced overhead due to bulk jobs;
Tue, 06 Jan 2009 13:46:48 +0100
--- a/src/Pure/Concurrent/future.ML	Tue Jan 06 13:43:17 2009 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Jan 06 13:46:48 2009 +0100
@@ -137,15 +137,16 @@
   change workers (AList.update Thread.equal (Thread.self (), active));
-(* execute *)
+(* execute jobs *)
 fun do_cancel group = (*requires SYNCHRONIZED*)
   change canceled (insert Task_Queue.eq_group group);
-fun execute name (task, group, run) =
+fun execute name (task, group, jobs) =
     val _ = trace_active ();
-    val ok = setmp_thread_data (name, task) run ();
+    val ok = setmp_thread_data (name, task) (fn () =>
+      fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) ();
     val _ = SYNCHRONIZED "execute" (fn () =>
      (change queue (Task_Queue.finish task);
       if ok then ()
@@ -225,16 +226,15 @@
   else ());
-(* future values: fork independent computation *)
+(** futures **)
-fun future opt_group deps pri (e: unit -> 'a) =
+(* future job: fill result *)
+fun future_job group (e: unit -> 'a) =
-    val _ = scheduler_check "future check";
-    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
     val result = ref (NONE: 'a Exn.result option);
-    val run = Multithreading.with_attributes (Thread.getAttributes ())
+    val job = Multithreading.with_attributes (Thread.getAttributes ())
       (fn _ => fn ok =>
           val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
@@ -245,63 +245,88 @@
             | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
             | _ => false);
         in res_ok end);
+  in (result, job) end;
+(* fork *)
+fun fork_future opt_group deps pri e =
+  let
+    val _ = scheduler_check "future check";
+    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+    val (result, job) = future_job group e;
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
+      change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   in Future {task = task, group = group, result = result} end;
-fun fork e = future NONE [] 0 e;
-fun fork_group group e = future (SOME group) [] 0 e;
-fun fork_deps deps e = future NONE (map task_of deps) 0 e;
-fun fork_pri pri e = future NONE [] pri e;
+fun fork e = fork_future NONE [] 0 e;
+fun fork_group group e = fork_future (SOME group) [] 0 e;
+fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = fork_future NONE [] pri e;
-(* join: retrieve results *)
+(* join *)
+fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
-fun join_results [] = []
-  | join_results xs = uninterruptible (fn _ => fn () =>
-      let
-        val _ = scheduler_check "join check";
-        val _ = Multithreading.self_critical () andalso
-          exists (not o is_finished) xs andalso
-          error "Cannot join future values within critical section";
+fun join_results xs =
+  if forall is_finished xs then map get_result xs
+  else uninterruptible (fn _ => fn () =>
+    let
+      val _ = scheduler_check "join check";
+      val _ = Multithreading.self_critical () andalso
+        error "Cannot join future values within critical section";
-        fun join_loop _ [] = ()
-          | join_loop name tasks =
-              (case SYNCHRONIZED name (fn () =>
-                  change_result queue (Task_Queue.dequeue_towards tasks)) of
-                NONE => ()
-              | SOME (work, tasks') => (execute name work; join_loop name tasks'));
-        val _ =
-          (case thread_data () of
-            NONE =>
-              (*alien thread -- refrain from contending for resources*)
-              while exists (not o is_finished) xs
-              do SYNCHRONIZED "join_thread" (fn () => wait ())
-          | SOME (name, task) =>
-              (*proper task -- actively work towards results*)
-              let
-                val unfinished = xs |> map_filter
-                  (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
-                val _ = SYNCHRONIZED "join" (fn () =>
-                  (change queue (Task_Queue.depend unfinished task); notify_all ()));
-                val _ = join_loop ("join_loop: " ^ name) unfinished;
-                val _ =
-                  while exists (not o is_finished) xs
-                  do SYNCHRONIZED "join_task" (fn () => worker_wait ());
-              in () end);
+      fun join_loop _ [] = ()
+        | join_loop name deps =
+            (case SYNCHRONIZED name (fn () =>
+                change_result queue (Task_Queue.dequeue_towards deps)) of
+              NONE => ()
+            | SOME (work, deps') => (execute name work; join_loop name deps'));
+      val _ =
+        (case thread_data () of
+          NONE =>
+            (*alien thread -- refrain from contending for resources*)
+            while not (forall is_finished xs)
+            do SYNCHRONIZED "join_thread" (fn () => wait ())
+        | SOME (name, task) =>
+            (*proper task -- actively work towards results*)
+            let
+              val pending = filter_out is_finished xs;
+              val deps = map task_of pending;
+              val _ = SYNCHRONIZED "join" (fn () =>
+                (change queue (Task_Queue.depend deps task); notify_all ()));
+              val _ = join_loop ("join_loop: " ^ name) deps;
+              val _ =
+                while not (forall is_finished pending)
+                do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+            in () end);
-      in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
+    in map get_result xs end) ();
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);
-fun map f x =
-  let val task = task_of x
-  in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
+(* map *)
+fun map_future f (x as Future {task, group, ...}) =
+  let
+    val _ = scheduler_check "map_future check";
+    val (result', job) = future_job group (fn () => f (join x));
+    val extended = SYNCHRONIZED "map_future" (fn () =>
+      (case Task_Queue.extend task job (! queue) of
+        SOME queue' => (queue := queue'; true)
+      | NONE => false));
+  in
+    if extended then Future {task = task, group = group, result = result'}
+    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+  end;
-(* misc operations *)
+(* cancel *)
 (*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
@@ -313,7 +338,9 @@
   SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
-(*global join and shutdown*)
+(** global join and shutdown **)
 fun shutdown () =
   if Multithreading.available then
    (scheduler_check "shutdown check";
@@ -327,6 +354,10 @@
       OS.Process.sleep (Time.fromMilliseconds 300))))
   else ();
+(*final declarations of this structure!*)
+val map = map_future;
 type 'a future = 'a Future.future;