# HG changeset patch # User wenzelm # Date 1221149278 -7200 # Node ID 23cb9a974630bb0257b11a664b9ee4a42cfd847f # Parent 7ae5cdb7b122d26cb04b01be7468e9e6b67bd2f1 added focus, which indicates a particular collection of high-priority tasks; tuned; diff -r 7ae5cdb7b122 -r 23cb9a974630 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Sep 11 13:43:42 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Thu Sep 11 18:07:58 2008 +0200 @@ -37,6 +37,7 @@ val fork: (unit -> 'a) -> 'a T val join_results: 'a T list -> 'a Exn.result list val join: 'a T -> 'a + val focus: task list -> unit val cancel: 'a T -> unit val interrupt_task: string -> unit end; @@ -266,12 +267,16 @@ fun join x = Exn.release (singleton join_results x); -(* termination *) +(* misc operations *) + +(*focus: collection of high-priority task*) +fun focus tasks = SYNCHRONIZED "interrupt" (fn () => + change queue (TaskQueue.focus tasks)); (*cancel: present and future group members will be interrupted eventually*) fun cancel x = (scheduler_check (); cancel_request (group_of x)); -(*interrupt: adhoc signal, permissive, may get ignored*) +(*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" (fn () => TaskQueue.interrupt_external (! queue) id); diff -r 7ae5cdb7b122 -r 23cb9a974630 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Thu Sep 11 13:43:42 2008 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Thu Sep 11 18:07:58 2008 +0200 @@ -16,6 +16,7 @@ val empty: queue val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue val depend: task list -> task -> queue -> queue + val focus: task list -> queue -> queue val dequeue: queue -> (task * group * (unit -> bool)) option * queue val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue val interrupt: queue -> task -> unit @@ -47,60 +48,81 @@ type jobs = (group * job) IntGraph.T; -fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id; fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id); fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id); fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs; + fun add_job (Task id) (Task dep) (jobs: jobs) = IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs; +fun check_job (jobs: jobs) (task as Task id) = + if can (IntGraph.get_node jobs) id then SOME task else NONE; + (* queue of grouped jobs *) datatype queue = Queue of {groups: task list Inttab.table, (*groups with presently active members*) - jobs: jobs}; (*job dependency graph*) + jobs: jobs, (*job dependency graph*) + focus: task list}; (*particular collection of high-priority tasks*) -fun make_queue groups jobs = Queue {groups = groups, jobs = jobs}; -val empty = make_queue Inttab.empty IntGraph.empty; +fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus}; +val empty = make_queue Inttab.empty IntGraph.empty []; (* enqueue *) -fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) = +fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs, focus}) = let val id = serial (); val task = Task id; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps; - in (task, make_queue groups' jobs') end; + in (task, make_queue groups' jobs' focus) end; -fun depend deps task (Queue {groups, jobs}) = - make_queue groups (fold (add_job task) deps jobs); +fun depend deps task (Queue {groups, jobs, focus}) = + make_queue groups (fold (add_job task) deps jobs) focus; + +fun focus tasks (Queue {groups, jobs, ...}) = + make_queue groups jobs (map_filter (check_job jobs) tasks); (* dequeue *) -fun dequeue_if P (queue as Queue {groups, jobs}) = +local + +fun dequeue_result NONE queue = (NONE, queue) + | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) = + (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus); + +fun dequeue_global (queue as Queue {jobs, ...}) = let fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) = - if P id then SOME (Task id, group, (fn () => job ok)) else NONE + SOME (Task id, group, (fn () => job ok)) | ready _ = NONE; - in - (case IntGraph.get_first ready jobs of - NONE => (NONE, queue) - | SOME result => - let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs - in (SOME result, make_queue groups jobs') end) - end; + in dequeue_result (IntGraph.get_first ready jobs) queue end; -val dequeue = dequeue_if (K true); +fun dequeue_local focus (queue as Queue {jobs, ...}) = + let + fun ready id = + (case IntGraph.get_node jobs id of + (group as Group (_, ref ok), Job job) => + if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok)) + else NONE + | _ => NONE); + val ids = map (fn Task id => id) focus; + in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end; + +in + +fun dequeue (queue as Queue {focus, ...}) = + (case dequeue_local focus queue of (NONE, _) => dequeue_global queue | res => res); fun dequeue_towards tasks (queue as Queue {jobs, ...}) = - let val ids = tasks - |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE) - in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end; + dequeue_local (map_filter (check_job jobs) tasks) queue; + +end; (* sporadic interrupts *) @@ -114,9 +136,9 @@ (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ()); -(* termination *) +(* misc operations *) -fun cancel (Queue {groups, jobs}) (Group (gid, ok)) = +fun cancel (Queue {groups, jobs, ...}) (Group (gid, ok)) = let val _ = ok := false; (*invalidate any future group members*) val tasks = Inttab.lookup_list groups gid; @@ -124,11 +146,12 @@ val _ = List.app interrupt_thread running; in null running end; -fun finish (task as Task id) (Queue {groups, jobs}) = +fun finish (task as Task id) (Queue {groups, jobs, focus}) = let val Group (gid, _) = get_group jobs task; val groups' = Inttab.remove_list (op =) (gid, task) groups; val jobs' = IntGraph.del_nodes [id] jobs; - in make_queue groups' jobs' end; + val focus' = remove (op =) task focus; + in make_queue groups' jobs' focus' end; end;