# HG changeset patch # User wenzelm # Date 1248005665 -7200 # Node ID 6a46898aa805df59e7dc5c1871f6b23de7607008 # Parent db50e76b0046374c050f005522514e19f0e134d1 recovered a version of dequeue_towards (cf. bb7b5a5942c7); join_results: work only towards explicit dependencies -- otherwise could produce dynamic cycle (not recorded in queue); diff -r db50e76b0046 -r 6a46898aa805 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Sat Jul 18 22:53:02 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Sun Jul 19 14:14:25 2009 +0200 @@ -285,21 +285,10 @@ fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); -fun join_wait x = - if SYNCHRONIZED "join_wait" (fn () => is_finished x orelse (wait (); false)) - then () else join_wait x; - -fun join_next x = (*requires SYNCHRONIZED*) - if is_finished x then NONE - else - (case change_result queue Task_Queue.dequeue of - NONE => (worker_wait (); join_next x) - | some => some); - -fun join_loop x = - (case SYNCHRONIZED "join" (fn () => join_next x) of +fun join_deps deps = + (case SYNCHRONIZED "join" (fn () => change_result queue (Task_Queue.dequeue_towards deps)) of NONE => () - | SOME work => (execute "join" work; join_loop x)); + | SOME (work, deps') => (execute "join" work; join_deps deps')); in @@ -310,10 +299,16 @@ val _ = scheduler_check "join check"; val _ = Multithreading.self_critical () andalso error "Cannot join future values within critical section"; - val _ = - if is_some (thread_data ()) - then List.app join_loop xs (*proper task -- continue work*) - else List.app join_wait xs; (*alien thread -- refrain from contending for resources*) + + val is_worker = is_some (thread_data ()); + fun join_wait x = + if SYNCHRONIZED "join_wait" (fn () => + is_finished x orelse (if is_worker then worker_wait () else wait (); false)) + then () else join_wait x; + + val _ = if is_worker then join_deps (map task_of xs) else (); + val _ = List.app join_wait xs; + in map get_result xs end) (); end; diff -r db50e76b0046 -r 6a46898aa805 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Sat Jul 18 22:53:02 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Sun Jul 19 14:14:25 2009 +0200 @@ -24,6 +24,8 @@ val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: queue -> (task * group * (bool -> bool) list) option * queue + val dequeue_towards: task list -> queue -> + (((task * group * (bool -> bool) list) * task list) option * queue) val interrupt: queue -> task -> unit val interrupt_external: queue -> string -> unit val cancel: queue -> group -> bool @@ -150,6 +152,28 @@ end; +(* dequeue_towards -- adhoc dependencies *) + +fun dequeue_towards deps (queue as Queue {groups, jobs, ...}) = + let + fun ready task = + (case Task_Graph.get_node jobs task of + (group, Job list) => + if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) + else NONE + | _ => NONE); + val tasks = filter (can (Task_Graph.get_node jobs)) deps; + in + (case get_first ready (Task_Graph.all_preds jobs tasks) of + NONE => (NONE, queue) + | SOME (result as (task, _, _)) => + let + val jobs' = set_job task (Running (Thread.self ())) jobs; + val cache' = Unknown; + in (SOME (result, tasks), make_queue groups jobs' cache') end) + end; + + (* sporadic interrupts *) fun interrupt (Queue {jobs, ...}) task =