added Task_Queue.depend (again) -- light-weight version for transitive graph;
Future.join_results: record explicit dependency, detect direct task-task join cycles;
Future.join_results: no change of interruptibility, allows to interrupt wait;
added Future.worker_task;
ThyInfo.schedule_futures: uninterruptible outer join;
--- a/src/Pure/Concurrent/future.ML Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Oct 01 16:27:13 2009 +0200
@@ -30,6 +30,7 @@
type task = Task_Queue.task
type group = Task_Queue.group
val is_worker: unit -> bool
+ val worker_task: unit -> Task_Queue.task option
val worker_group: unit -> Task_Queue.group option
type 'a future
val task_of: 'a future -> task
@@ -71,6 +72,7 @@
end;
val is_worker = is_some o thread_data;
+val worker_task = Option.map #2 o thread_data;
val worker_group = Option.map #3 o thread_data;
@@ -347,7 +349,8 @@
| SOME res => res);
fun join_wait x =
- Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
+ Synchronized.guarded_access (result_of x)
+ (fn NONE => NONE | some => SOME ((), some));
fun join_next deps = (*requires SYNCHRONIZED*)
if null deps then NONE
@@ -357,10 +360,14 @@
| (NONE, deps') => (worker_wait work_finished; join_next deps')
| (SOME work, deps') => SOME (work, deps'));
-fun join_work deps =
- (case SYNCHRONIZED "join" (fn () => join_next deps) of
- NONE => ()
- | SOME (work, deps') => (execute "join" work; join_work deps'));
+fun execute_work NONE = ()
+ | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
+and join_work deps =
+ execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
+
+fun join_depend task deps =
+ execute_work (SYNCHRONIZED "join" (fn () =>
+ (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
in
@@ -368,11 +375,11 @@
if forall is_finished xs then map get_result xs
else if Multithreading.self_critical () then
error "Cannot join future values within critical section"
- else uninterruptible (fn _ => fn () =>
- (if is_worker ()
- then join_work (map task_of xs)
- else List.app join_wait xs;
- map get_result xs)) ();
+ else
+ (case worker_task () of
+ SOME task => join_depend task (map task_of xs)
+ | NONE => List.app join_wait xs;
+ map get_result xs);
end;
--- a/src/Pure/Concurrent/task_queue.ML Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Thu Oct 01 16:27:13 2009 +0200
@@ -27,6 +27,7 @@
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
+ val depend: task -> task list -> queue -> queue
val dequeue_towards: Thread.thread -> task list -> queue ->
(((task * group * (bool -> bool) list) option * task list) * queue)
val finish: task -> queue -> bool * queue
@@ -101,6 +102,11 @@
fun add_job task dep (jobs: jobs) =
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
+fun add_dep task dep (jobs: jobs) =
+ if Task_Graph.is_edge jobs (task, dep) then
+ raise Fail "Cyclic dependency of future tasks"
+ else add_job task dep jobs;
+
fun get_deps (jobs: jobs) task =
Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
@@ -205,6 +211,9 @@
(* dequeue_towards -- adhoc dependencies *)
+fun depend task deps (Queue {groups, jobs, ...}) =
+ make_queue groups (fold (add_dep task) deps jobs) Unknown;
+
fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
let
fun ready task =
--- a/src/Pure/Thy/thy_info.ML Thu Oct 01 16:09:47 2009 +0200
+++ b/src/Pure/Thy/thy_info.ML Thu Oct 01 16:27:13 2009 +0200
@@ -364,7 +364,7 @@
local
-fun schedule_futures task_graph =
+fun schedule_futures task_graph = uninterruptible (fn _ => fn () =>
let
val tasks = Graph.topological_order task_graph |> map_filter (fn name =>
(case Graph.get_node task_graph name of Task body => SOME (name, body) | _ => NONE));
@@ -397,7 +397,7 @@
val _ = after_load ();
in [] end handle exn => (kill_thy name; [exn]));
- in ignore (Exn.release_all (map Exn.Exn (rev exns))) end;
+ in ignore (Exn.release_all (map Exn.Exn (rev exns))) end) ();
fun schedule_seq tasks =
Graph.topological_order tasks