recovered a version of dequeue_towards (cf. bb7b5a5942c7);
join_results: work only towards explicit dependencies -- otherwise could produce dynamic cycle (not recorded in queue);
--- a/src/Pure/Concurrent/future.ML Sat Jul 18 22:53:02 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Sun Jul 19 14:14:25 2009 +0200
@@ -285,21 +285,10 @@
fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);
-fun join_wait x =
- if SYNCHRONIZED "join_wait" (fn () => is_finished x orelse (wait (); false))
- then () else join_wait x;
-
-fun join_next x = (*requires SYNCHRONIZED*)
- if is_finished x then NONE
- else
- (case change_result queue Task_Queue.dequeue of
- NONE => (worker_wait (); join_next x)
- | some => some);
-
-fun join_loop x =
- (case SYNCHRONIZED "join" (fn () => join_next x) of
+fun join_deps deps =
+ (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of
NONE => ()
- | SOME work => (execute "join" work; join_loop x));
+ | SOME (work, deps') => (execute "join" work; join_deps deps'));
in
@@ -310,10 +299,16 @@
val _ = scheduler_check "join check";
val _ = Multithreading.self_critical () andalso
error "Cannot join future values within critical section";
- val _ =
- if is_some (thread_data ())
- then List.app join_loop xs (*proper task -- continue work*)
- else List.app join_wait xs; (*alien thread -- refrain from contending for resources*)
+
+ val is_worker = is_some (thread_data ());
+ fun join_wait x =
+ if SYNCHRONIZED "join_wait" (fn () =>
+ is_finished x orelse (if is_worker then worker_wait () else wait (); false))
+ then () else join_wait x;
+
+ val _ = if is_worker then join_deps (map task_of xs) else ();
+ val _ = List.app join_wait xs;
+
in map get_result xs end) ();
end;
--- a/src/Pure/Concurrent/task_queue.ML Sat Jul 18 22:53:02 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Sun Jul 19 14:14:25 2009 +0200
@@ -24,6 +24,8 @@
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
+ val dequeue_towards: task list -> queue ->
+ (((task * group * (bool -> bool) list) * task list) option * queue)
val interrupt: queue -> task -> unit
val interrupt_external: queue -> string -> unit
val cancel: queue -> group -> bool
@@ -150,6 +152,28 @@
end;
+(* dequeue_towards -- adhoc dependencies *)
+
+fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) =
+ let
+ fun ready task =
+ (case Task_Graph.get_node jobs task of
+ (group, Job list) =>
+ if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
+ else NONE
+ | _ => NONE);
+ val tasks = filter (can (Task_Graph.get_node jobs)) deps;
+ in
+ (case get_first ready (Task_Graph.all_preds jobs tasks) of
+ NONE => (NONE, queue)
+ | SOME (result as (task, _, _)) =>
+ let
+ val jobs' = set_job task (Running (Thread.self ())) jobs;
+ val cache' = Unknown;
+ in (SOME (result, tasks), make_queue groups jobs' cache') end)
+ end;
+
+
(* sporadic interrupts *)
fun interrupt (Queue {jobs, ...}) task =