--- a/src/Pure/Concurrent/future.ML Wed Feb 02 18:22:13 2011 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Feb 02 20:32:50 2011 +0100
@@ -440,12 +440,12 @@
else res);
fun join_next deps = (*requires SYNCHRONIZED*)
- if Task_Queue.finished_deps deps then NONE
+ if null deps then NONE
else
(case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
- (NONE, deps') =>
- if Task_Queue.finished_deps deps' then NONE
- else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
+ (NONE, []) => NONE
+ | (NONE, deps') =>
+ (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
| (SOME work, deps') => SOME (work, deps'));
fun execute_work NONE = ()
@@ -461,8 +461,7 @@
if forall is_finished xs then ()
else if Multithreading.self_critical () then
error "Cannot join future values within critical section"
- else if is_some (worker_task ()) then
- join_work (Task_Queue.init_deps (map task_of xs))
+ else if is_some (worker_task ()) then join_work (map task_of xs)
else List.app (ignore o Single_Assignment.await o result_of) xs;
in map get_result xs end;
@@ -533,8 +532,8 @@
(Task_Queue.dequeue_passive (Thread.self ()) task));
in if still_passive then execute (task, [job]) else () end);
val _ =
- worker_waiting (Task_Queue.init_deps [task])
- (fn () => Single_Assignment.await result);
+ if is_some (Single_Assignment.peek result) then ()
+ else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
in () end;
fun fulfill x res = fulfill_result x (Exn.Result res);
--- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 18:22:13 2011 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 20:32:50 2011 +0100
@@ -21,6 +21,9 @@
val pri_of_task: task -> int
val str_of_task: task -> string
val timing_of_task: task -> Time.time * Time.time * string list
+ val running: task -> (unit -> 'a) -> 'a
+ val joining: task -> (unit -> 'a) -> 'a
+ val waiting: task -> task list -> (unit -> 'a) -> 'a
type queue
val empty: queue
val all_passive: queue -> bool
@@ -34,14 +37,8 @@
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
- type deps
- val init_deps: task list -> deps
- val finished_deps: deps -> bool
- val dequeue_deps: Thread.thread -> deps -> queue ->
- (((task * (bool -> bool) list) option * deps) * queue)
- val running: task -> (unit -> 'a) -> 'a
- val joining: task -> (unit -> 'a) -> 'a
- val waiting: task -> deps -> (unit -> 'a) -> 'a
+ val dequeue_deps: Thread.thread -> task list -> queue ->
+ (((task * (bool -> bool) list) option * task list) * queue)
end;
structure Task_Queue: TASK_QUEUE =
@@ -140,6 +137,19 @@
structure Task_Graph = Graph(type key = task val ord = task_ord);
+(* timing *)
+
+fun running task =
+ update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
+
+fun joining task =
+ update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
+
+fun waiting task deps =
+ update_timing (fn t => fn (a, b, ds) =>
+ (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task;
+
+
(** queue of jobs and groups **)
@@ -165,7 +175,7 @@
(* queue *)
datatype queue = Queue of
- {groups: task list Inttab.table, (*groups with presently known members*)
+ {groups: task list Inttab.table, (*presently known group members*)
jobs: jobs}; (*job dependency graph*)
fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
@@ -291,15 +301,7 @@
(* dequeue wrt. dynamic dependencies *)
-abstype deps = Deps of task list
-with
-
-fun init_deps tasks = Deps tasks;
-fun finished_deps (Deps tasks) = null tasks;
-
-fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
-
-fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
+fun dequeue_deps thread deps (queue as Queue {groups, jobs}) =
let
fun ready [] rest = (NONE, rev rest)
| ready (task :: tasks) rest =
@@ -322,28 +324,14 @@
fun result (res as (task, _)) deps' =
let val jobs' = set_job task (Running thread) jobs
- in ((SOME res, Deps deps'), make_queue groups jobs') end;
+ in ((SOME res, deps'), make_queue groups jobs') end;
in
(case ready deps [] of
(SOME res, deps') => result res deps'
| (NONE, deps') =>
(case ready_dep [] deps' of
SOME res => result res deps'
- | NONE => ((NONE, Deps deps'), queue)))
+ | NONE => ((NONE, deps'), queue)))
end;
end;
-
-
-(* timing *)
-
-fun running task =
- update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
-
-fun joining task =
- update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
-
-fun waiting task deps =
- update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
-
-end;