added focus, which indicates a particular collection of high-priority tasks;
authorwenzelm
Thu, 11 Sep 2008 18:07:58 +0200
changeset 28202 23cb9a974630
parent 28201 7ae5cdb7b122
child 28203 88f18387f1c9
added focus, which indicates a particular collection of high-priority tasks; tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/future.ML	Thu Sep 11 13:43:42 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Thu Sep 11 18:07:58 2008 +0200
@@ -37,6 +37,7 @@
   val fork: (unit -> 'a) -> 'a T
   val join_results: 'a T list -> 'a Exn.result list
   val join: 'a T -> 'a
+  val focus: task list -> unit
   val cancel: 'a T -> unit
   val interrupt_task: string -> unit
 end;
@@ -266,12 +267,16 @@
 fun join x = Exn.release (singleton join_results x);
 
 
-(* termination *)
+(* misc operations *)
+
+(*focus: collection of high-priority task*)
+fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
+  change queue (TaskQueue.focus tasks));
 
 (*cancel: present and future group members will be interrupted eventually*)
 fun cancel x = (scheduler_check (); cancel_request (group_of x));
 
-(*interrupt: adhoc signal, permissive, may get ignored*)
+(*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
   (fn () => TaskQueue.interrupt_external (! queue) id);
 
--- a/src/Pure/Concurrent/task_queue.ML	Thu Sep 11 13:43:42 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Thu Sep 11 18:07:58 2008 +0200
@@ -16,6 +16,7 @@
   val empty: queue
   val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
   val depend: task list -> task -> queue -> queue
+  val focus: task list -> queue -> queue
   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
   val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
   val interrupt: queue -> task -> unit
@@ -47,60 +48,81 @@
 
 type jobs = (group * job) IntGraph.T;
 
-fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+
 fun add_job (Task id) (Task dep) (jobs: jobs) =
   IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
 
+fun check_job (jobs: jobs) (task as Task id) =
+  if can (IntGraph.get_node jobs) id then SOME task else NONE;
+
 
 (* queue of grouped jobs *)
 
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
-  jobs: jobs};                      (*job dependency graph*)
+  jobs: jobs,                       (*job dependency graph*)
+  focus: task list};                (*particular collection of high-priority tasks*)
 
-fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
-val empty = make_queue Inttab.empty IntGraph.empty;
+fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+val empty = make_queue Inttab.empty IntGraph.empty [];
 
 
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs, focus}) =
   let
     val id = serial ();
     val task = Task id;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
       |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
-  in (task, make_queue groups' jobs') end;
+  in (task, make_queue groups' jobs' focus) end;
 
-fun depend deps task (Queue {groups, jobs}) =
-  make_queue groups (fold (add_job task) deps jobs);
+fun depend deps task (Queue {groups, jobs, focus}) =
+  make_queue groups (fold (add_job task) deps jobs) focus;
+
+fun focus tasks (Queue {groups, jobs, ...}) =
+  make_queue groups jobs (map_filter (check_job jobs) tasks);
 
 
 (* dequeue *)
 
-fun dequeue_if P (queue as Queue {groups, jobs}) =
+local
+
+fun dequeue_result NONE queue = (NONE, queue)
+  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
+      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
+
+fun dequeue_global (queue as Queue {jobs, ...}) =
   let
     fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
-          if P id then SOME (Task id, group, (fn () => job ok)) else NONE
+          SOME (Task id, group, (fn () => job ok))
       | ready _ = NONE;
-  in
-    (case IntGraph.get_first ready jobs of
-      NONE => (NONE, queue)
-    | SOME result =>
-        let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
-        in (SOME result, make_queue groups jobs') end)
-  end;
+  in dequeue_result (IntGraph.get_first ready jobs) queue end;
 
-val dequeue = dequeue_if (K true);
+fun dequeue_local focus (queue as Queue {jobs, ...}) =
+  let
+    fun ready id =
+      (case IntGraph.get_node jobs id of
+        (group as Group (_, ref ok), Job job) =>
+          if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
+          else NONE
+      | _ => NONE);
+    val ids = map (fn Task id => id) focus;
+  in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+
+in
+
+fun dequeue (queue as Queue {focus, ...}) =
+  (case dequeue_local focus queue of (NONE, _) => dequeue_global queue | res => res);
 
 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
-  let val ids = tasks
-    |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
-  in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
+  dequeue_local (map_filter (check_job jobs) tasks) queue;
+
+end;
 
 
 (* sporadic interrupts *)
@@ -114,9 +136,9 @@
   (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
 
 
-(* termination *)
+(* misc operations *)
 
-fun cancel (Queue {groups, jobs}) (Group (gid, ok)) =
+fun cancel (Queue {groups, jobs, ...}) (Group (gid, ok)) =
   let
     val _ = ok := false;  (*invalidate any future group members*)
     val tasks = Inttab.lookup_list groups gid;
@@ -124,11 +146,12 @@
     val _ = List.app interrupt_thread running;
   in null running end;
 
-fun finish (task as Task id) (Queue {groups, jobs}) =
+fun finish (task as Task id) (Queue {groups, jobs, focus}) =
   let
     val Group (gid, _) = get_group jobs task;
     val groups' = Inttab.remove_list (op =) (gid, task) groups;
     val jobs' = IntGraph.del_nodes [id] jobs;
-  in make_queue groups' jobs' end;
+    val focus' = remove (op =) task focus;
+  in make_queue groups' jobs' focus' end;
 
 end;