--- a/src/Pure/Concurrent/future.ML Thu Sep 11 13:43:42 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Thu Sep 11 18:07:58 2008 +0200
@@ -37,6 +37,7 @@
val fork: (unit -> 'a) -> 'a T
val join_results: 'a T list -> 'a Exn.result list
val join: 'a T -> 'a
+ val focus: task list -> unit
val cancel: 'a T -> unit
val interrupt_task: string -> unit
end;
@@ -266,12 +267,16 @@
fun join x = Exn.release (singleton join_results x);
-(* termination *)
+(* misc operations *)
+
+(*focus: collection of high-priority task*)
+fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
+ change queue (TaskQueue.focus tasks));
(*cancel: present and future group members will be interrupted eventually*)
fun cancel x = (scheduler_check (); cancel_request (group_of x));
-(*interrupt: adhoc signal, permissive, may get ignored*)
+(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
(fn () => TaskQueue.interrupt_external (! queue) id);
--- a/src/Pure/Concurrent/task_queue.ML Thu Sep 11 13:43:42 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Thu Sep 11 18:07:58 2008 +0200
@@ -16,6 +16,7 @@
val empty: queue
val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
val depend: task list -> task -> queue -> queue
+ val focus: task list -> queue -> queue
val dequeue: queue -> (task * group * (unit -> bool)) option * queue
val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
val interrupt: queue -> task -> unit
@@ -47,60 +48,81 @@
type jobs = (group * job) IntGraph.T;
-fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+
fun add_job (Task id) (Task dep) (jobs: jobs) =
IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
+fun check_job (jobs: jobs) (task as Task id) =
+ if can (IntGraph.get_node jobs) id then SOME task else NONE;
+
(* queue of grouped jobs *)
datatype queue = Queue of
{groups: task list Inttab.table, (*groups with presently active members*)
- jobs: jobs}; (*job dependency graph*)
+ jobs: jobs, (*job dependency graph*)
+ focus: task list}; (*particular collection of high-priority tasks*)
-fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-val empty = make_queue Inttab.empty IntGraph.empty;
+fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+val empty = make_queue Inttab.empty IntGraph.empty [];
(* enqueue *)
-fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs, focus}) =
let
val id = serial ();
val task = Task id;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
|> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
- in (task, make_queue groups' jobs') end;
+ in (task, make_queue groups' jobs' focus) end;
-fun depend deps task (Queue {groups, jobs}) =
- make_queue groups (fold (add_job task) deps jobs);
+fun depend deps task (Queue {groups, jobs, focus}) =
+ make_queue groups (fold (add_job task) deps jobs) focus;
+
+fun focus tasks (Queue {groups, jobs, ...}) =
+ make_queue groups jobs (map_filter (check_job jobs) tasks);
(* dequeue *)
-fun dequeue_if P (queue as Queue {groups, jobs}) =
+local
+
+fun dequeue_result NONE queue = (NONE, queue)
+ | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
+ (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
+
+fun dequeue_global (queue as Queue {jobs, ...}) =
let
fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
- if P id then SOME (Task id, group, (fn () => job ok)) else NONE
+ SOME (Task id, group, (fn () => job ok))
| ready _ = NONE;
- in
- (case IntGraph.get_first ready jobs of
- NONE => (NONE, queue)
- | SOME result =>
- let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
- in (SOME result, make_queue groups jobs') end)
- end;
+ in dequeue_result (IntGraph.get_first ready jobs) queue end;
-val dequeue = dequeue_if (K true);
+fun dequeue_local focus (queue as Queue {jobs, ...}) =
+ let
+ fun ready id =
+ (case IntGraph.get_node jobs id of
+ (group as Group (_, ref ok), Job job) =>
+ if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
+ else NONE
+ | _ => NONE);
+ val ids = map (fn Task id => id) focus;
+ in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+
+in
+
+fun dequeue (queue as Queue {focus, ...}) =
+ (case dequeue_local focus queue of (NONE, _) => dequeue_global queue | res => res);
fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
- let val ids = tasks
- |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
- in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
+ dequeue_local (map_filter (check_job jobs) tasks) queue;
+
+end;
(* sporadic interrupts *)
@@ -114,9 +136,9 @@
(case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
-(* termination *)
+(* misc operations *)
-fun cancel (Queue {groups, jobs}) (Group (gid, ok)) =
+fun cancel (Queue {groups, jobs, ...}) (Group (gid, ok)) =
let
val _ = ok := false; (*invalidate any future group members*)
val tasks = Inttab.lookup_list groups gid;
@@ -124,11 +146,12 @@
val _ = List.app interrupt_thread running;
in null running end;
-fun finish (task as Task id) (Queue {groups, jobs}) =
+fun finish (task as Task id) (Queue {groups, jobs, focus}) =
let
val Group (gid, _) = get_group jobs task;
val groups' = Inttab.remove_list (op =) (gid, task) groups;
val jobs' = IntGraph.del_nodes [id] jobs;
- in make_queue groups' jobs' end;
+ val focus' = remove (op =) task focus;
+ in make_queue groups' jobs' focus' end;
end;