tuned map: reduced overhead due to bulk jobs;
tuned join_results: reduced overhead for finished futures;
tuned;
--- a/src/Pure/Concurrent/future.ML Tue Jan 06 13:43:17 2009 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Jan 06 13:46:48 2009 +0100
@@ -137,15 +137,16 @@
change workers (AList.update Thread.equal (Thread.self (), active));
-(* execute *)
+(* execute jobs *)
fun do_cancel group = (*requires SYNCHRONIZED*)
change canceled (insert Task_Queue.eq_group group);
-fun execute name (task, group, run) =
+fun execute name (task, group, jobs) =
let
val _ = trace_active ();
- val ok = setmp_thread_data (name, task) run ();
+ val ok = setmp_thread_data (name, task) (fn () =>
+ fold (fn job => fn ok => job ok) jobs (Task_Queue.is_valid group)) ();
val _ = SYNCHRONIZED "execute" (fn () =>
(change queue (Task_Queue.finish task);
if ok then ()
@@ -225,16 +226,15 @@
else ());
-(* future values: fork independent computation *)
+
+(** futures **)
-fun future opt_group deps pri (e: unit -> 'a) =
+(* future job: fill result *)
+
+fun future_job group (e: unit -> 'a) =
let
- val _ = scheduler_check "future check";
-
- val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
-
val result = ref (NONE: 'a Exn.result option);
- val run = Multithreading.with_attributes (Thread.getAttributes ())
+ val job = Multithreading.with_attributes (Thread.getAttributes ())
(fn _ => fn ok =>
let
val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
@@ -245,63 +245,88 @@
| Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
| _ => false);
in res_ok end);
+ in (result, job) end;
+
+(* fork *)
+
+fun fork_future opt_group deps pri e =
+ let
+ val _ = scheduler_check "future check";
+
+ val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
+ val (result, job) = future_job group e;
val task = SYNCHRONIZED "future" (fn () =>
- change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
+ change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork e = future NONE [] 0 e;
-fun fork_group group e = future (SOME group) [] 0 e;
-fun fork_deps deps e = future NONE (map task_of deps) 0 e;
-fun fork_pri pri e = future NONE [] pri e;
+fun fork e = fork_future NONE [] 0 e;
+fun fork_group group e = fork_future (SOME group) [] 0 e;
+fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = fork_future NONE [] pri e;
-(* join: retrieve results *)
+(* join *)
+
+fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
-fun join_results [] = []
- | join_results xs = uninterruptible (fn _ => fn () =>
- let
- val _ = scheduler_check "join check";
- val _ = Multithreading.self_critical () andalso
- exists (not o is_finished) xs andalso
- error "Cannot join future values within critical section";
+fun join_results xs =
+ if forall is_finished xs then map get_result xs
+ else uninterruptible (fn _ => fn () =>
+ let
+ val _ = scheduler_check "join check";
+ val _ = Multithreading.self_critical () andalso
+ error "Cannot join future values within critical section";
- fun join_loop _ [] = ()
- | join_loop name tasks =
- (case SYNCHRONIZED name (fn () =>
- change_result queue (Task_Queue.dequeue_towards tasks)) of
- NONE => ()
- | SOME (work, tasks') => (execute name work; join_loop name tasks'));
- val _ =
- (case thread_data () of
- NONE =>
- (*alien thread -- refrain from contending for resources*)
- while exists (not o is_finished) xs
- do SYNCHRONIZED "join_thread" (fn () => wait ())
- | SOME (name, task) =>
- (*proper task -- actively work towards results*)
- let
- val unfinished = xs |> map_filter
- (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
- val _ = SYNCHRONIZED "join" (fn () =>
- (change queue (Task_Queue.depend unfinished task); notify_all ()));
- val _ = join_loop ("join_loop: " ^ name) unfinished;
- val _ =
- while exists (not o is_finished) xs
- do SYNCHRONIZED "join_task" (fn () => worker_wait ());
- in () end);
+ fun join_loop _ [] = ()
+ | join_loop name deps =
+ (case SYNCHRONIZED name (fn () =>
+ change_result queue (Task_Queue.dequeue_towards deps)) of
+ NONE => ()
+ | SOME (work, deps') => (execute name work; join_loop name deps'));
+ val _ =
+ (case thread_data () of
+ NONE =>
+ (*alien thread -- refrain from contending for resources*)
+ while not (forall is_finished xs)
+ do SYNCHRONIZED "join_thread" (fn () => wait ())
+ | SOME (name, task) =>
+ (*proper task -- actively work towards results*)
+ let
+ val pending = filter_out is_finished xs;
+ val deps = map task_of pending;
+ val _ = SYNCHRONIZED "join" (fn () =>
+ (change queue (Task_Queue.depend deps task); notify_all ()));
+ val _ = join_loop ("join_loop: " ^ name) deps;
+ val _ =
+ while not (forall is_finished pending)
+ do SYNCHRONIZED "join_task" (fn () => worker_wait ());
+ in () end);
- in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
+ in map get_result xs end) ();
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);
-fun map f x =
- let val task = task_of x
- in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
+
+(* map *)
+
+fun map_future f (x as Future {task, group, ...}) =
+ let
+ val _ = scheduler_check "map_future check";
+
+ val (result', job) = future_job group (fn () => f (join x));
+ val extended = SYNCHRONIZED "map_future" (fn () =>
+ (case Task_Queue.extend task job (! queue) of
+ SOME queue' => (queue := queue'; true)
+ | NONE => false));
+ in
+ if extended then Future {task = task, group = group, result = result'}
+ else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
+ end;
-(* misc operations *)
+(* cancel *)
(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
@@ -313,7 +338,9 @@
SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
-(*global join and shutdown*)
+
+(** global join and shutdown **)
+
fun shutdown () =
if Multithreading.available then
(scheduler_check "shutdown check";
@@ -327,6 +354,10 @@
OS.Process.sleep (Time.fromMilliseconds 300))))
else ();
+
+(*final declarations of this structure!*)
+val map = map_future;
+
end;
type 'a future = 'a Future.future;