# HG changeset patch # User wenzelm # Date 1254407233 -7200 # Node ID 81897d30b97feb62c5440990e9c36dce71e24e3b # Parent dac196e23093addc0388a0f221d3bad006ae7c37 added Task_Queue.depend (again) -- light-weight version for transitive graph; Future.join_results: record explicit dependency, detect direct task-task join cycles; Future.join_results: no change of interruptibility, allows to interrupt wait; added Future.worker_task; ThyInfo.schedule_futures: uninterruptible outer join; diff -r dac196e23093 -r 81897d30b97f src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Oct 01 16:09:47 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Thu Oct 01 16:27:13 2009 +0200 @@ -30,6 +30,7 @@ type task = Task_Queue.task type group = Task_Queue.group val is_worker: unit -> bool + val worker_task: unit -> Task_Queue.task option val worker_group: unit -> Task_Queue.group option type 'a future val task_of: 'a future -> task @@ -71,6 +72,7 @@ end; val is_worker = is_some o thread_data; +val worker_task = Option.map #2 o thread_data; val worker_group = Option.map #3 o thread_data; @@ -347,7 +349,8 @@ | SOME res => res); fun join_wait x = - Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some)); + Synchronized.guarded_access (result_of x) + (fn NONE => NONE | some => SOME ((), some)); fun join_next deps = (*requires SYNCHRONIZED*) if null deps then NONE @@ -357,10 +360,14 @@ | (NONE, deps') => (worker_wait work_finished; join_next deps') | (SOME work, deps') => SOME (work, deps')); -fun join_work deps = - (case SYNCHRONIZED "join" (fn () => join_next deps) of - NONE => () - | SOME (work, deps') => (execute "join" work; join_work deps')); +fun execute_work NONE = () + | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps') +and join_work deps = + execute_work (SYNCHRONIZED "join" (fn () => join_next deps)); + +fun join_depend task deps = + execute_work (SYNCHRONIZED "join" (fn () => + (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps))); in @@ -368,11 +375,11 @@ if forall is_finished xs then map get_result xs else if Multithreading.self_critical () then error "Cannot join future values within critical section" - else uninterruptible (fn _ => fn () => - (if is_worker () - then join_work (map task_of xs) - else List.app join_wait xs; - map get_result xs)) (); + else + (case worker_task () of + SOME task => join_depend task (map task_of xs) + | NONE => List.app join_wait xs; + map get_result xs); end; diff -r dac196e23093 -r 81897d30b97f src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Thu Oct 01 16:09:47 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Thu Oct 01 16:27:13 2009 +0200 @@ -27,6 +27,7 @@ val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue + val depend: task -> task list -> queue -> queue val dequeue_towards: Thread.thread -> task list -> queue -> (((task * group * (bool -> bool) list) option * task list) * queue) val finish: task -> queue -> bool * queue @@ -101,6 +102,11 @@ fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; +fun add_dep task dep (jobs: jobs) = + if Task_Graph.is_edge jobs (task, dep) then + raise Fail "Cyclic dependency of future tasks" + else add_job task dep jobs; + fun get_deps (jobs: jobs) task = Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => []; @@ -205,6 +211,9 @@ (* dequeue_towards -- adhoc dependencies *) +fun depend task deps (Queue {groups, jobs, ...}) = + make_queue groups (fold (add_dep task) deps jobs) Unknown; + fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) = let fun ready task = diff -r dac196e23093 -r 81897d30b97f src/Pure/Thy/thy_info.ML --- a/src/Pure/Thy/thy_info.ML Thu Oct 01 16:09:47 2009 +0200 +++ b/src/Pure/Thy/thy_info.ML Thu Oct 01 16:27:13 2009 +0200 @@ -364,7 +364,7 @@ local -fun schedule_futures task_graph = +fun schedule_futures task_graph = uninterruptible (fn _ => fn () => let val tasks = Graph.topological_order task_graph |> map_filter (fn name => (case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE)); @@ -397,7 +397,7 @@ val _ = after_load (); in [] end handle exn => (kill_thy name; [exn])); - in ignore (Exn.release_all (map Exn.Exn (rev exns))) end; + in ignore (Exn.release_all (map Exn.Exn (rev exns))) end) (); fun schedule_seq tasks = Graph.topological_order tasks