# HG changeset patch # User wenzelm # Date 1296650289 -3600 # Node ID b5d7b15166bf82be530a449c39c5476091917735 # Parent a4c822915eaa5afbdfe1056b4c0d687b16273624 Future.join_results: discontinued post-hoc recording of dynamic dependencies; abstract Task_Queue.deps; tuned signature; tuned; diff -r a4c822915eaa -r b5d7b15166bf src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Feb 01 22:24:28 2011 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Feb 02 13:38:09 2011 +0100 @@ -449,12 +449,12 @@ else res); fun join_next deps = (*requires SYNCHRONIZED*) - if null deps then NONE + if Task_Queue.finished_deps deps then NONE else - (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of - (NONE, []) => NONE - | (NONE, deps') => - (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') + (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of + (NONE, deps') => + if Task_Queue.finished_deps deps' then NONE + else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') | (SOME work, deps') => SOME (work, deps')); fun execute_work NONE = () @@ -462,10 +462,6 @@ and join_work deps = execute_work (SYNCHRONIZED "join" (fn () => join_next deps)); -fun join_depend task deps = - execute_work (SYNCHRONIZED "join" (fn () => - (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps))); - in fun join_results xs = @@ -474,10 +470,9 @@ if forall is_finished xs then () else if Multithreading.self_critical () then error "Cannot join future values within critical section" - else - (case worker_task () of - SOME task => join_depend task (map task_of xs) - | NONE => List.app (ignore o Single_Assignment.await o result_of) xs); + else if is_some (thread_data ()) then + join_work (Task_Queue.init_deps (map task_of xs)) + else List.app (ignore o Single_Assignment.await o result_of) xs; in map get_result xs end; end; @@ -544,7 +539,9 @@ Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); in if still_passive then execute (task, group, [job]) else () end); - val _ = worker_waiting [task] (fn () => Single_Assignment.await result); + val _ = + worker_waiting (Task_Queue.init_deps [task]) + (fn () => Single_Assignment.await result); in () end; fun fulfill x res = fulfill_result x (Exn.Result res); diff -r a4c822915eaa -r b5d7b15166bf src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Feb 01 22:24:28 2011 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 13:38:09 2011 +0100 @@ -8,12 +8,10 @@ sig type task val dummy_task: task + val name_of_task: task -> string val pri_of_task: task -> int val str_of_task: task -> string val timing_of_task: task -> Time.time * Time.time * string list - val running: task -> (unit -> 'a) -> 'a - val joining: task -> (unit -> 'a) -> 'a - val waiting: task -> task list -> (unit -> 'a) -> 'a type group val new_group: group option -> group val group_id: group -> int @@ -28,16 +26,21 @@ val status: queue -> {ready: int, pending: int, running: int, passive: int} val cancel: queue -> group -> bool val cancel_all: queue -> group list + val finish: task -> queue -> bool * queue val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue_passive: Thread.thread -> task -> queue -> bool * queue val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue - val depend: task -> task list -> queue -> queue - val dequeue_towards: Thread.thread -> task list -> queue -> - (((task * group * (bool -> bool) list) option * task list) * queue) - val finish: task -> queue -> bool * queue + type deps + val init_deps: task list -> deps + val finished_deps: deps -> bool + val dequeue_deps: Thread.thread -> deps -> queue -> + (((task * group * (bool -> bool) list) option * deps) * queue) + val running: task -> (unit -> 'a) -> 'a + val joining: task -> (unit -> 'a) -> 'a + val waiting: task -> deps -> (unit -> 'a) -> 'a end; structure Task_Queue: TASK_QUEUE = @@ -46,24 +49,15 @@ val new_id = Synchronized.counter (); -(* timing *) +(** grouped tasks **) -type timing = Time.time * Time.time * string list; +(* tasks *) + +type timing = Time.time * Time.time * string list; (*run, wait, wait dependencies*) fun new_timing () = Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing); -fun gen_timing account timing e = - let - val start = Time.now (); - val result = Exn.capture e (); - val t = Time.- (Time.now (), start); - val _ = Synchronized.change timing (account t); - in Exn.release result end; - - -(* tasks *) - abstype task = Task of {name: string, id: int, @@ -74,21 +68,20 @@ val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()}; fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()}; +fun name_of_task (Task {name, ...}) = name; fun pri_of_task (Task {pri, ...}) = the_default 0 pri; fun str_of_task (Task {name, id, ...}) = if name = "" then string_of_int id else string_of_int id ^ " (" ^ name ^ ")"; fun timing_of_task (Task {timing, ...}) = Synchronized.value timing; -fun running (Task {timing, ...}) = - gen_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) timing; - -fun joining (Task {timing, ...}) = - gen_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) timing; - -fun waiting (Task {timing, ...}) deps = - timing |> gen_timing (fn t => fn (a, b, ds) => - (Time.- (a, t), Time.+ (b, t), fold (fn Task {name, ...} => insert (op =) name) deps ds)); +fun update_timing update (Task {timing, ...}) e = + let + val start = Time.now (); + val result = Exn.capture e (); + val t = Time.- (Time.now (), start); + val _ = Synchronized.change timing (update t); + in Exn.release result end; fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) = prod_ord (rev_order o option_ord int_ord) int_ord ((pri1, id1), (pri2, id2)); @@ -141,6 +134,8 @@ end; +(** queue of jobs and groups **) + (* jobs *) datatype job = @@ -157,16 +152,11 @@ fun add_job task dep (jobs: jobs) = Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs; -fun add_dep task dep (jobs: jobs) = - if Task_Graph.is_edge jobs (task, dep) then - raise Fail "Cyclic dependency of future tasks" - else add_job task dep jobs; - fun get_deps (jobs: jobs) task = Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => []; -(* queue of grouped jobs *) +(* queue *) datatype queue = Queue of {groups: task list Inttab.table, (*groups with presently active members*) @@ -207,6 +197,9 @@ in {ready = x, pending = y, running = z, passive = w} end; + +(** task queue operations **) + (* cancel -- peers and sub-groups *) fun cancel (Queue {groups, jobs}) group = @@ -230,6 +223,18 @@ in running_groups end; +(* finish *) + +fun finish task (Queue {groups, jobs}) = + let + val group = get_group jobs task; + val groups' = groups + |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group); + val jobs' = Task_Graph.del_node task jobs; + val maximal = null (Task_Graph.imm_succs jobs task); + in (maximal, make_queue groups' jobs') end; + + (* enqueue *) fun enqueue_passive group abort (Queue {groups, jobs}) = @@ -275,37 +280,44 @@ | NONE => (NONE, queue)); -(* dequeue_towards -- adhoc dependencies *) +(* dequeue wrt. dynamic dependencies *) + +abstype deps = Deps of task list +with -fun depend task deps (Queue {groups, jobs}) = - make_queue groups (fold (add_dep task) deps jobs); +fun init_deps tasks = Deps tasks; +fun finished_deps (Deps tasks) = null tasks; -fun dequeue_towards thread deps (queue as Queue {groups, jobs}) = +fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks; + +fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) = let fun ready task = ready_job task (Task_Graph.get_entry jobs task); val tasks = filter (can (Task_Graph.get_node jobs)) deps; fun result (res as (task, _, _)) = let val jobs' = set_job task (Running thread) jobs - in ((SOME res, tasks), make_queue groups jobs') end; + in ((SOME res, Deps tasks), make_queue groups jobs') end; in (case get_first ready tasks of SOME res => result res | NONE => (case get_first (get_first ready o get_deps jobs) tasks of SOME res => result res - | NONE => ((NONE, tasks), queue))) + | NONE => ((NONE, Deps tasks), queue))) end; +end; + -(* finish *) +(* timing *) + +fun running task = + update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task; -fun finish task (Queue {groups, jobs}) = - let - val group = get_group jobs task; - val groups' = groups - |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group); - val jobs' = Task_Graph.del_node task jobs; - val maximal = null (Task_Graph.imm_succs jobs task); - in (maximal, make_queue groups' jobs') end; +fun joining task = + update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task; + +fun waiting task deps = + update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task; end;