diff -r a96d43a54650 -r afdbec23b92b src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 18:22:13 2011 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 20:32:50 2011 +0100 @@ -21,6 +21,9 @@ val pri_of_task: task -> int val str_of_task: task -> string val timing_of_task: task -> Time.time * Time.time * string list + val running: task -> (unit -> 'a) -> 'a + val joining: task -> (unit -> 'a) -> 'a + val waiting: task -> task list -> (unit -> 'a) -> 'a type queue val empty: queue val all_passive: queue -> bool @@ -34,14 +37,8 @@ val extend: task -> (bool -> bool) -> queue -> queue option val dequeue_passive: Thread.thread -> task -> queue -> bool * queue val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue - type deps - val init_deps: task list -> deps - val finished_deps: deps -> bool - val dequeue_deps: Thread.thread -> deps -> queue -> - (((task * (bool -> bool) list) option * deps) * queue) - val running: task -> (unit -> 'a) -> 'a - val joining: task -> (unit -> 'a) -> 'a - val waiting: task -> deps -> (unit -> 'a) -> 'a + val dequeue_deps: Thread.thread -> task list -> queue -> + (((task * (bool -> bool) list) option * task list) * queue) end; structure Task_Queue: TASK_QUEUE = @@ -140,6 +137,19 @@ structure Task_Graph = Graph(type key = task val ord = task_ord); +(* timing *) + +fun running task = + update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task; + +fun joining task = + update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task; + +fun waiting task deps = + update_timing (fn t => fn (a, b, ds) => + (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task; + + (** queue of jobs and groups **) @@ -165,7 +175,7 @@ (* queue *) datatype queue = Queue of - {groups: task list Inttab.table, (*groups with presently known members*) + {groups: task list Inttab.table, (*presently known group members*) jobs: jobs}; (*job dependency graph*) fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; @@ -291,15 +301,7 @@ (* dequeue wrt. dynamic dependencies *) -abstype deps = Deps of task list -with - -fun init_deps tasks = Deps tasks; -fun finished_deps (Deps tasks) = null tasks; - -fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks; - -fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) = +fun dequeue_deps thread deps (queue as Queue {groups, jobs}) = let fun ready [] rest = (NONE, rev rest) | ready (task :: tasks) rest = @@ -322,28 +324,14 @@ fun result (res as (task, _)) deps' = let val jobs' = set_job task (Running thread) jobs - in ((SOME res, Deps deps'), make_queue groups jobs') end; + in ((SOME res, deps'), make_queue groups jobs') end; in (case ready deps [] of (SOME res, deps') => result res deps' | (NONE, deps') => (case ready_dep [] deps' of SOME res => result res deps' - | NONE => ((NONE, Deps deps'), queue))) + | NONE => ((NONE, deps'), queue))) end; end; - - -(* timing *) - -fun running task = - update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task; - -fun joining task = - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task; - -fun waiting task deps = - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task; - -end;