--- a/src/Pure/Concurrent/future.ML Sat Jun 13 19:19:14 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Sat Jun 13 19:40:37 2009 +0200
@@ -286,7 +286,7 @@
fun join_loop name pending =
(case SYNCHRONIZED name (fn () => join_next pending) of
NONE => ()
- | SOME work => (execute name work; join_loop name pending));
+ | SOME work => (execute name work; join_loop name (filter_out is_finished pending)));
in
@@ -298,13 +298,6 @@
val _ = Multithreading.self_critical () andalso
error "Cannot join future values within critical section";
- fun join_deps _ [] = ()
- | join_deps name deps =
- (case SYNCHRONIZED name (fn () =>
- change_result queue (Task_Queue.dequeue_towards deps)) of
- NONE => ()
- | SOME (work, deps') => (execute name work; join_deps name deps'));
-
val _ =
(case thread_data () of
NONE =>
@@ -312,14 +305,13 @@
while not (forall is_finished xs)
do SYNCHRONIZED "join_thread" (fn () => wait ())
| SOME (name, task) =>
- (*proper task -- actively work towards results*)
+ (*proper task -- continue work*)
let
val pending = filter_out is_finished xs;
val deps = map task_of pending;
val _ = SYNCHRONIZED "join" (fn () =>
(change queue (Task_Queue.depend deps task); notify_all ()));
- val _ = join_deps ("join_deps: " ^ name) deps;
- val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
+ val _ = join_loop ("join_loop: " ^ name) pending;
in () end);
in map get_result xs end) ();
--- a/src/Pure/Concurrent/task_queue.ML Sat Jun 13 19:19:14 2009 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Sat Jun 13 19:40:37 2009 +0200
@@ -23,8 +23,6 @@
val extend: task -> (bool -> bool) -> queue -> queue option
val depend: task list -> task -> queue -> queue
val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
- val dequeue_towards: task list -> queue ->
- (((task * group * (bool -> bool) list) * task list) option * queue)
val interrupt: queue -> task -> unit
val interrupt_external: queue -> string -> unit
val cancel: queue -> group -> bool
@@ -82,69 +80,59 @@
(* queue of grouped jobs *)
+datatype result = Unknown | Result of task | No_Result;
+
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
- jobs: jobs}; (*job dependency graph*)
+ jobs: jobs, (*job dependency graph*)
+ cache: result}; (*last dequeue result*)
-fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
+fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
-val empty = make_queue Inttab.empty Task_Graph.empty;
+val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
(* enqueue *)
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, ...}) =
let
val task = new_task pri;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
|> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
- in (task, make_queue groups' jobs') end;
+ in (task, make_queue groups' jobs' Unknown) end;
-fun extend task job (Queue {groups, jobs}) =
+fun extend task job (Queue {groups, jobs, cache}) =
(case try (get_job jobs) task of
- SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+ SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
| _ => NONE);
-fun depend deps task (Queue {groups, jobs}) =
- make_queue groups (fold (add_job_acyclic task) deps jobs);
+fun depend deps task (Queue {groups, jobs, ...}) =
+ make_queue groups (fold (add_job_acyclic task) deps jobs) Unknown;
(* dequeue *)
-local
-
-fun dequeue_result NONE queue = (NONE, queue)
- | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
- (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
-
-in
-
-fun dequeue (queue as Queue {jobs, ...}) =
+fun dequeue (queue as Queue {groups, jobs, cache}) =
let
fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
| ready _ = NONE;
- in dequeue_result (Task_Graph.get_first ready jobs) queue end;
-
-fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
- let
- val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
-
- fun ready task =
- (case Task_Graph.get_node jobs task of
- (group, Job list) =>
- if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
- else NONE
- | _ => NONE);
+ fun deq boundary =
+ (case Task_Graph.get_first boundary ready jobs of
+ NONE => (NONE, make_queue groups jobs No_Result)
+ | SOME (result as (task, _, _)) =>
+ let
+ val jobs' = set_job task (Running (Thread.self ())) jobs;
+ val cache' = Result task;
+ in (SOME result, make_queue groups jobs' cache') end);
in
- (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
- (NONE, queue') => (NONE, queue')
- | (SOME work, queue') => (SOME (work, tasks'), queue'))
+ (case cache of
+ Unknown => deq NONE
+ | Result last => deq (SOME last)
+ | No_Result => (NONE, queue))
end;
-end;
-
(* sporadic interrupts *)
@@ -154,7 +142,7 @@
fun interrupt_external (queue as Queue {jobs, ...}) str =
(case Int.fromString str of
SOME i =>
- (case Task_Graph.get_first
+ (case Task_Graph.get_first NONE
(fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
of SOME task => interrupt queue task | NONE => ())
| NONE => ());
@@ -180,11 +168,11 @@
val _ = List.app SimpleThread.interrupt running;
in groups end;
-fun finish task (Queue {groups, jobs}) =
+fun finish task (Queue {groups, jobs, ...}) =
let
val Group (gid, _) = get_group jobs task;
val groups' = Inttab.remove_list (op =) (gid, task) groups;
val jobs' = Task_Graph.del_node task jobs;
- in make_queue groups' jobs' end;
+ in make_queue groups' jobs' Unknown end;
end;