--- a/src/Pure/Concurrent/future.ML Tue Feb 01 22:24:28 2011 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Feb 02 13:38:09 2011 +0100
@@ -449,12 +449,12 @@
else res);
fun join_next deps = (*requires SYNCHRONIZED*)
- if null deps then NONE
+ if Task_Queue.finished_deps deps then NONE
else
- (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
- (NONE, []) => NONE
- | (NONE, deps') =>
- (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+ (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
+ (NONE, deps') =>
+ if Task_Queue.finished_deps deps' then NONE
+ else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
| (SOME work, deps') => SOME (work, deps'));
fun execute_work NONE = ()
@@ -462,10 +462,6 @@
and join_work deps =
execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
-fun join_depend task deps =
- execute_work (SYNCHRONIZED "join" (fn () =>
- (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
-
in
fun join_results xs =
@@ -474,10 +470,9 @@
if forall is_finished xs then ()
else if Multithreading.self_critical () then
error "Cannot join future values within critical section"
- else
- (case worker_task () of
- SOME task => join_depend task (map task_of xs)
- | NONE => List.app (ignore o Single_Assignment.await o result_of) xs);
+ else if is_some (thread_data ()) then
+ join_work (Task_Queue.init_deps (map task_of xs))
+ else List.app (ignore o Single_Assignment.await o result_of) xs;
in map get_result xs end;
end;
@@ -544,7 +539,9 @@
Unsynchronized.change_result queue
(Task_Queue.dequeue_passive (Thread.self ()) task));
in if still_passive then execute (task, group, [job]) else () end);
- val _ = worker_waiting [task] (fn () => Single_Assignment.await result);
+ val _ =
+ worker_waiting (Task_Queue.init_deps [task])
+ (fn () => Single_Assignment.await result);
in () end;
fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML Tue Feb 01 22:24:28 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 13:38:09 2011 +0100
@@ -8,12 +8,10 @@
sig
type task
val dummy_task: task
+ val name_of_task: task -> string
val pri_of_task: task -> int
val str_of_task: task -> string
val timing_of_task: task -> Time.time * Time.time * string list
- val running: task -> (unit -> 'a) -> 'a
- val joining: task -> (unit -> 'a) -> 'a
- val waiting: task -> task list -> (unit -> 'a) -> 'a
type group
val new_group: group option -> group
val group_id: group -> int
@@ -28,16 +26,21 @@
val status: queue -> {ready: int, pending: int, running: int, passive: int}
val cancel: queue -> group -> bool
val cancel_all: queue -> group list
+ val finish: task -> queue -> bool * queue
val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
val enqueue: string -> group -> task list -> int -> (bool -> bool) ->
queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
- val depend: task -> task list -> queue -> queue
- val dequeue_towards: Thread.thread -> task list -> queue ->
- (((task * group * (bool -> bool) list) option * task list) * queue)
- val finish: task -> queue -> bool * queue
+ type deps
+ val init_deps: task list -> deps
+ val finished_deps: deps -> bool
+ val dequeue_deps: Thread.thread -> deps -> queue ->
+ (((task * group * (bool -> bool) list) option * deps) * queue)
+ val running: task -> (unit -> 'a) -> 'a
+ val joining: task -> (unit -> 'a) -> 'a
+ val waiting: task -> deps -> (unit -> 'a) -> 'a
end;
structure Task_Queue: TASK_QUEUE =
@@ -46,24 +49,15 @@
val new_id = Synchronized.counter ();
-(* timing *)
+(** grouped tasks **)
-type timing = Time.time * Time.time * string list;
+(* tasks *)
+
+type timing = Time.time * Time.time * string list; (*run, wait, wait dependencies*)
fun new_timing () =
Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing);
-fun gen_timing account timing e =
- let
- val start = Time.now ();
- val result = Exn.capture e ();
- val t = Time.- (Time.now (), start);
- val _ = Synchronized.change timing (account t);
- in Exn.release result end;
-
-
-(* tasks *)
-
abstype task = Task of
{name: string,
id: int,
@@ -74,21 +68,20 @@
val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()};
fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()};
+fun name_of_task (Task {name, ...}) = name;
fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
fun str_of_task (Task {name, id, ...}) =
if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")";
fun timing_of_task (Task {timing, ...}) = Synchronized.value timing;
-fun running (Task {timing, ...}) =
- gen_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) timing;
-
-fun joining (Task {timing, ...}) =
- gen_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) timing;
-
-fun waiting (Task {timing, ...}) deps =
- timing |> gen_timing (fn t => fn (a, b, ds) =>
- (Time.- (a, t), Time.+ (b, t), fold (fn Task {name, ...} => insert (op =) name) deps ds));
+fun update_timing update (Task {timing, ...}) e =
+ let
+ val start = Time.now ();
+ val result = Exn.capture e ();
+ val t = Time.- (Time.now (), start);
+ val _ = Synchronized.change timing (update t);
+ in Exn.release result end;
fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2));
@@ -141,6 +134,8 @@
end;
+(** queue of jobs and groups **)
+
(* jobs *)
datatype job =
@@ -157,16 +152,11 @@
fun add_job task dep (jobs: jobs) =
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
-fun add_dep task dep (jobs: jobs) =
- if Task_Graph.is_edge jobs (task, dep) then
- raise Fail "Cyclic dependency of future tasks"
- else add_job task dep jobs;
-
fun get_deps (jobs: jobs) task =
Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
-(* queue of grouped jobs *)
+(* queue *)
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
@@ -207,6 +197,9 @@
in {ready = x, pending = y, running = z, passive = w} end;
+
+(** task queue operations **)
+
(* cancel -- peers and sub-groups *)
fun cancel (Queue {groups, jobs}) group =
@@ -230,6 +223,18 @@
in running_groups end;
+(* finish *)
+
+fun finish task (Queue {groups, jobs}) =
+ let
+ val group = get_group jobs task;
+ val groups' = groups
+ |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
+ val jobs' = Task_Graph.del_node task jobs;
+ val maximal = null (Task_Graph.imm_succs jobs task);
+ in (maximal, make_queue groups' jobs') end;
+
+
(* enqueue *)
fun enqueue_passive group abort (Queue {groups, jobs}) =
@@ -275,37 +280,44 @@
| NONE => (NONE, queue));
-(* dequeue_towards -- adhoc dependencies *)
+(* dequeue wrt. dynamic dependencies *)
+
+abstype deps = Deps of task list
+with
-fun depend task deps (Queue {groups, jobs}) =
- make_queue groups (fold (add_dep task) deps jobs);
+fun init_deps tasks = Deps tasks;
+fun finished_deps (Deps tasks) = null tasks;
-fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
+fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
+
+fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
let
fun ready task = ready_job task (Task_Graph.get_entry jobs task);
val tasks = filter (can (Task_Graph.get_node jobs)) deps;
fun result (res as (task, _, _)) =
let val jobs' = set_job task (Running thread) jobs
- in ((SOME res, tasks), make_queue groups jobs') end;
+ in ((SOME res, Deps tasks), make_queue groups jobs') end;
in
(case get_first ready tasks of
SOME res => result res
| NONE =>
(case get_first (get_first ready o get_deps jobs) tasks of
SOME res => result res
- | NONE => ((NONE, tasks), queue)))
+ | NONE => ((NONE, Deps tasks), queue)))
end;
+end;
+
-(* finish *)
+(* timing *)
+
+fun running task =
+ update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
-fun finish task (Queue {groups, jobs}) =
- let
- val group = get_group jobs task;
- val groups' = groups
- |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
- val jobs' = Task_Graph.del_node task jobs;
- val maximal = null (Task_Graph.imm_succs jobs task);
- in (maximal, make_queue groups' jobs') end;
+fun joining task =
+ update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
+
+fun waiting task deps =
+ update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
end;