--- a/src/Pure/Concurrent/future.ML Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Oct 01 16:27:13 2009 +0200
@@ -30,6 +30,7 @@
type task = Task_Queue.task
type group = Task_Queue.group
val is_worker: unit -> bool
+ val worker_task: unit -> Task_Queue.task option
val worker_group: unit -> Task_Queue.group option
type 'a future
val task_of: 'a future -> task
@@ -71,6 +72,7 @@
end;
val is_worker = is_some o thread_data;
+val worker_task = Option.map #2 o thread_data;
val worker_group = Option.map #3 o thread_data;
@@ -347,7 +349,8 @@
| SOME res => res);
fun join_wait x =
- Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
+ Synchronized.guarded_access (result_of x)
+ (fn NONE => NONE | some => SOME ((), some));
fun join_next deps = (*requires SYNCHRONIZED*)
if null deps then NONE
@@ -357,10 +360,14 @@
| (NONE, deps') => (worker_wait work_finished; join_next deps')
| (SOME work, deps') => SOME (work, deps'));
-fun join_work deps =
- (case SYNCHRONIZED "join" (fn () => join_next deps) of
- NONE => ()
- | SOME (work, deps') => (execute "join" work; join_work deps'));
+fun execute_work NONE = ()
+ | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
+and join_work deps =
+ execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
+
+fun join_depend task deps =
+ execute_work (SYNCHRONIZED "join" (fn () =>
+ (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
in
@@ -368,11 +375,11 @@
if forall is_finished xs then map get_result xs
else if Multithreading.self_critical () then
error "Cannot join future values within critical section"
- else uninterruptible (fn _ => fn () =>
- (if is_worker ()
- then join_work (map task_of xs)
- else List.app join_wait xs;
- map get_result xs)) ();
+ else
+ (case worker_task () of
+ SOME task => join_depend task (map task_of xs)
+ | NONE => List.app join_wait xs;
+ map get_result xs);
end;