# HG changeset patch # User wenzelm # Date 1296675170 -3600 # Node ID afdbec23b92bbc99fe99580b0d05251ae78e864a # Parent a96d43a546503081cb9af74f0c0cbe42dcef2c7a eliminated slightly odd abstract type Task_Queue.deps; tuned signature; tuned; diff -r a96d43a54650 -r afdbec23b92b src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Feb 02 18:22:13 2011 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Feb 02 20:32:50 2011 +0100 @@ -440,12 +440,12 @@ else res); fun join_next deps = (*requires SYNCHRONIZED*) - if Task_Queue.finished_deps deps then NONE + if null deps then NONE else (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of - (NONE, deps') => - if Task_Queue.finished_deps deps' then NONE - else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') + (NONE, []) => NONE + | (NONE, deps') => + (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') | (SOME work, deps') => SOME (work, deps')); fun execute_work NONE = () @@ -461,8 +461,7 @@ if forall is_finished xs then () else if Multithreading.self_critical () then error "Cannot join future values within critical section" - else if is_some (worker_task ()) then - join_work (Task_Queue.init_deps (map task_of xs)) + else if is_some (worker_task ()) then join_work (map task_of xs) else List.app (ignore o Single_Assignment.await o result_of) xs; in map get_result xs end; @@ -533,8 +532,8 @@ (Task_Queue.dequeue_passive (Thread.self ()) task)); in if still_passive then execute (task, [job]) else () end); val _ = - worker_waiting (Task_Queue.init_deps [task]) - (fn () => Single_Assignment.await result); + if is_some (Single_Assignment.peek result) then () + else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); in () end; fun fulfill x res = fulfill_result x (Exn.Result res); diff -r a96d43a54650 -r afdbec23b92b src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 18:22:13 2011 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 20:32:50 2011 +0100 @@ -21,6 +21,9 @@ val pri_of_task: task -> int val str_of_task: task -> string val timing_of_task: task -> Time.time * Time.time * string list + val running: task -> (unit -> 'a) -> 'a + val joining: task -> (unit -> 'a) -> 'a + val waiting: task -> task list -> (unit -> 'a) -> 'a type queue val empty: queue val all_passive: queue -> bool @@ -34,14 +37,8 @@ val extend: task -> (bool -> bool) -> queue -> queue option val dequeue_passive: Thread.thread -> task -> queue -> bool * queue val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue - type deps - val init_deps: task list -> deps - val finished_deps: deps -> bool - val dequeue_deps: Thread.thread -> deps -> queue -> - (((task * (bool -> bool) list) option * deps) * queue) - val running: task -> (unit -> 'a) -> 'a - val joining: task -> (unit -> 'a) -> 'a - val waiting: task -> deps -> (unit -> 'a) -> 'a + val dequeue_deps: Thread.thread -> task list -> queue -> + (((task * (bool -> bool) list) option * task list) * queue) end; structure Task_Queue: TASK_QUEUE = @@ -140,6 +137,19 @@ structure Task_Graph = Graph(type key = task val ord = task_ord); +(* timing *) + +fun running task = + update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task; + +fun joining task = + update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task; + +fun waiting task deps = + update_timing (fn t => fn (a, b, ds) => + (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task; + + (** queue of jobs and groups **) @@ -165,7 +175,7 @@ (* queue *) datatype queue = Queue of - {groups: task list Inttab.table, (*groups with presently known members*) + {groups: task list Inttab.table, (*presently known group members*) jobs: jobs}; (*job dependency graph*) fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; @@ -291,15 +301,7 @@ (* dequeue wrt. dynamic dependencies *) -abstype deps = Deps of task list -with - -fun init_deps tasks = Deps tasks; -fun finished_deps (Deps tasks) = null tasks; - -fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks; - -fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) = +fun dequeue_deps thread deps (queue as Queue {groups, jobs}) = let fun ready [] rest = (NONE, rev rest) | ready (task :: tasks) rest = @@ -322,28 +324,14 @@ fun result (res as (task, _)) deps' = let val jobs' = set_job task (Running thread) jobs - in ((SOME res, Deps deps'), make_queue groups jobs') end; + in ((SOME res, deps'), make_queue groups jobs') end; in (case ready deps [] of (SOME res, deps') => result res deps' | (NONE, deps') => (case ready_dep [] deps' of SOME res => result res deps' - | NONE => ((NONE, Deps deps'), queue))) + | NONE => ((NONE, deps'), queue))) end; end; - - -(* timing *) - -fun running task = - update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task; - -fun joining task = - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task; - -fun waiting task deps = - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task; - -end;