# HG changeset patch # User wenzelm # Date 1262798056 -3600 # Node ID 16bf3e9786a35434370c9bbbe74761682a1a50a9 # Parent 02936e77a07c5c8a68d7384b7264ec791444185a eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead); diff -r 02936e77a07c -r 16bf3e9786a3 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Jan 06 15:07:56 2010 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Jan 06 18:14:16 2010 +0100 @@ -179,7 +179,7 @@ in (result, job) end; fun cancel_now group = (*requires SYNCHRONIZED*) - Unsynchronized.change_result queue (Task_Queue.cancel group); + Task_Queue.cancel (! queue) group; fun cancel_later group = (*requires SYNCHRONIZED*) (Unsynchronized.change canceled (insert Task_Queue.eq_group group); @@ -351,7 +351,7 @@ in continue end handle Exn.Interrupt => (Multithreading.tracing 1 (fn () => "Interrupt"); - List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all); + List.app cancel_later (Task_Queue.cancel_all (! queue)); broadcast_work (); true); fun scheduler_loop () = diff -r 02936e77a07c -r 16bf3e9786a3 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100 +++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 18:14:16 2010 +0100 @@ -22,8 +22,8 @@ val empty: queue val all_passive: queue -> bool val status: queue -> {ready: int, pending: int, running: int, passive: int} - val cancel: group -> queue -> bool * queue - val cancel_all: queue -> group list * queue + val cancel: queue -> group -> bool + val cancel_all: queue -> group list val enqueue_passive: group -> queue -> task * queue val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option @@ -118,16 +118,13 @@ (* queue of grouped jobs *) -datatype result = Unknown | Result of task | No_Result; - datatype queue = Queue of {groups: task list Inttab.table, (*groups with presently active members*) - jobs: jobs, (*job dependency graph*) - cache: result}; (*last dequeue result*) + jobs: jobs}; (*job dependency graph*) -fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache}; +fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; -val empty = make_queue Inttab.empty Task_Graph.empty No_Result; +val empty = make_queue Inttab.empty Task_Graph.empty; fun all_passive (Queue {jobs, ...}) = Task_Graph.get_first NONE @@ -150,16 +147,16 @@ (* cancel -- peers and sub-groups *) -fun cancel group (Queue {groups, jobs, ...}) = +fun cancel (Queue {groups, jobs}) group = let val _ = cancel_group group Exn.Interrupt; val tasks = Inttab.lookup_list groups (group_id group); val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; val _ = List.app SimpleThread.interrupt running; - in (null running, make_queue groups jobs Unknown) end; + in null running end; -fun cancel_all (Queue {groups, jobs, ...}) = +fun cancel_all (Queue {groups, jobs}) = let fun cancel_job (group, job) (groups, running) = (cancel_group group Exn.Interrupt; @@ -168,20 +165,20 @@ | _ => (groups, running))); val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); val _ = List.app SimpleThread.interrupt running; - in (running_groups, make_queue groups jobs Unknown) end; + in running_groups end; (* enqueue *) -fun enqueue_passive group (Queue {groups, jobs, cache}) = +fun enqueue_passive group (Queue {groups, jobs}) = let val task = new_task NONE; val groups' = groups |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive)); - in (task, make_queue groups' jobs' cache) end; + in (task, make_queue groups' jobs') end; -fun enqueue group deps pri job (Queue {groups, jobs, cache}) = +fun enqueue group deps pri job (Queue {groups, jobs}) = let val task = new_task (SOME pri); val groups' = groups @@ -191,49 +188,36 @@ |> fold (add_job task) deps |> fold (fold (add_job task) o get_deps jobs) deps; val minimal = null (get_deps jobs' task); - val cache' = - (case cache of - Result last => - if task_ord (last, task) = LESS - then cache else Unknown - | _ => Unknown); - in ((task, minimal), make_queue groups' jobs' cache') end; + in ((task, minimal), make_queue groups' jobs') end; -fun extend task job (Queue {groups, jobs, cache}) = +fun extend task job (Queue {groups, jobs}) = (case try (get_job jobs) task of - SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache) + SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs)) | _ => NONE); (* dequeue *) -fun dequeue thread (queue as Queue {groups, jobs, cache}) = +fun dequeue thread (queue as Queue {groups, jobs}) = let fun ready (task, ((group, Job list), (deps, _))) = if is_ready deps group then SOME (task, group, rev list) else NONE | ready _ = NONE; - fun deq boundary = - (case Task_Graph.get_first boundary ready jobs of - NONE => (NONE, make_queue groups jobs No_Result) - | SOME (result as (task, _, _)) => - let - val jobs' = set_job task (Running thread) jobs; - val cache' = Result task; - in (SOME result, make_queue groups jobs' cache') end); in - (case cache of - Unknown => deq NONE - | Result last => deq (SOME last) - | No_Result => (NONE, queue)) + (case Task_Graph.get_first NONE ready jobs of + NONE => (NONE, queue) + | SOME (result as (task, _, _)) => + let val jobs' = set_job task (Running thread) jobs + in (SOME result, make_queue groups jobs') end) end; (* dequeue_towards -- adhoc dependencies *) -fun depend task deps (Queue {groups, jobs, ...}) = - make_queue groups (fold (add_dep task) deps jobs) Unknown; +fun depend task deps (Queue {groups, jobs}) = + make_queue groups (fold (add_dep task) deps jobs); -fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) = +fun dequeue_towards thread deps (queue as Queue {groups, jobs}) = let fun ready task = (case Task_Graph.get_node jobs task of @@ -244,10 +228,8 @@ | _ => NONE); val tasks = filter (can (Task_Graph.get_node jobs)) deps; fun result (res as (task, _, _)) = - let - val jobs' = set_job task (Running thread) jobs; - val cache' = Unknown; - in ((SOME res, tasks), make_queue groups jobs' cache') end; + let val jobs' = set_job task (Running thread) jobs + in ((SOME res, tasks), make_queue groups jobs') end; in (case get_first ready tasks of SOME res => result res @@ -260,14 +242,13 @@ (* finish *) -fun finish task (Queue {groups, jobs, cache}) = +fun finish task (Queue {groups, jobs}) = let val group = get_group jobs task; val groups' = groups |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group); val jobs' = Task_Graph.del_node task jobs; val maximal = null (Task_Graph.imm_succs jobs task); - val cache' = if maximal then cache else Unknown; - in (maximal, make_queue groups' jobs' cache') end; + in (maximal, make_queue groups' jobs') end; end;