renamed structure TaskQueue to Task_Queue;
authorwenzelm
Tue, 16 Dec 2008 16:25:19 +0100
changeset 29121 ad73b63ae2c5
parent 29120 8a904ff43f28
child 29122 b3bae49a013a
renamed structure TaskQueue to Task_Queue; tasks are ordered according to priority, which has been generalized from bool to int; removed unused focus; tuned dequeue: single pass due to proper priority order; tuned dequeue_towards;
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/task_queue.ML	Tue Dec 16 16:25:19 2008 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Dec 16 16:25:19 2008 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/Concurrent/task_queue.ML
-    ID:         $Id$
     Author:     Makarius
 
 Ordered queue of grouped tasks.
@@ -8,7 +7,8 @@
 signature TASK_QUEUE =
 sig
   eqtype task
-  val new_task: unit -> task
+  val new_task: int -> task
+  val pri_of_task: task -> int
   val str_of_task: task -> string
   eqtype group
   val new_group: unit -> group
@@ -17,9 +17,8 @@
   type queue
   val empty: queue
   val is_empty: queue -> bool
-  val enqueue: group -> task list -> bool -> (bool -> bool) -> queue -> task * queue
+  val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
   val depend: task list -> task -> queue -> queue
-  val focus: task list -> queue -> queue
   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
   val dequeue_towards: task list -> queue ->
     (((task * group * (unit -> bool)) * task list) option * queue)
@@ -29,20 +28,27 @@
   val cancel: queue -> group -> bool
 end;
 
-structure TaskQueue: TASK_QUEUE =
+structure Task_Queue: TASK_QUEUE =
 struct
 
-(* identifiers *)
+(* tasks *)
+
+datatype task = Task of int * serial;
+fun new_task pri = Task (pri, serial ());
 
-datatype task = Task of serial;
-fun new_task () = Task (serial ());
+fun pri_of_task (Task (pri, _)) = pri;
+fun str_of_task (Task (_, i)) = string_of_int i;
 
-fun str_of_task (Task i) = string_of_int i;
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+structure Task_Graph = GraphFun(type key = task val ord = task_ord);
 
 
+(* groups *)
+
 datatype group = Group of serial * bool ref;
 
 fun new_group () = Group (serial (), ref true);
+
 fun invalidate_group (Group (_, ok)) = ok := false;
 
 fun str_of_group (Group (i, ref ok)) =
@@ -52,53 +58,46 @@
 (* jobs *)
 
 datatype job =
-  Job of bool * (bool -> bool) |   (*priority, job: status -> status*)
+  Job of bool -> bool |
   Running of Thread.thread;
 
-type jobs = (group * job) IntGraph.T;
+type jobs = (group * job) Task_Graph.T;
 
-fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
-fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
-fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
+fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
+fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
+fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
 
-fun add_job (Task id) (Task dep) (jobs: jobs) =
-  IntGraph.add_edge (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
+fun add_job task dep (jobs: jobs) =
+  Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
-fun add_job_acyclic (Task id) (Task dep) (jobs: jobs) =
-  IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
-
-fun check_job (jobs: jobs) (task as Task id) =
-  if can (IntGraph.get_node jobs) id then SOME task else NONE;
+fun add_job_acyclic task dep (jobs: jobs) =
+  Task_Graph.add_edge_acyclic (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
 
 
 (* queue of grouped jobs *)
 
 datatype queue = Queue of
  {groups: task list Inttab.table,   (*groups with presently active members*)
-  jobs: jobs,                       (*job dependency graph*)
-  focus: task list};                (*particular collection of high-priority tasks*)
+  jobs: jobs};                      (*job dependency graph*)
 
-fun make_queue groups jobs focus = Queue {groups = groups, jobs = jobs, focus = focus};
+fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
 
-val empty = make_queue Inttab.empty IntGraph.empty [];
-fun is_empty (Queue {jobs, ...}) = IntGraph.is_empty jobs;
+val empty = make_queue Inttab.empty Task_Graph.empty;
+fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
 
 
 (* enqueue *)
 
-fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
+fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs}) =
   let
-    val task as Task id = new_task ();
+    val task = new_task pri;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> IntGraph.new_node (id, (group, Job (pri, job))) |> fold (add_job task) deps;
-  in (task, make_queue groups' jobs' focus) end;
+      |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+  in (task, make_queue groups' jobs') end;
 
-fun depend deps task (Queue {groups, jobs, focus}) =
-  make_queue groups (fold (add_job_acyclic task) deps jobs) focus;
-
-fun focus tasks (Queue {groups, jobs, ...}) =
-  make_queue groups jobs (map_filter (check_job jobs) tasks);
+fun depend deps task (Queue {groups, jobs}) =
+  make_queue groups (fold (add_job_acyclic task) deps jobs);
 
 
 (* dequeue *)
@@ -106,38 +105,30 @@
 local
 
 fun dequeue_result NONE queue = (NONE, queue)
-  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs, focus}) =
-      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs) focus);
-
-fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
-  let
-    fun ready (id, ((group as Group (_, ref ok), Job (pri, job)), ([], _))) =
-          if pri = req_pri then SOME (Task id, group, (fn () => job ok)) else NONE
-      | ready _ = NONE;
-  in dequeue_result (IntGraph.get_first ready jobs) queue end;
-
-fun dequeue_local focus (queue as Queue {jobs, ...}) =
-  let
-    fun ready id =
-      (case IntGraph.get_node jobs id of
-        (group as Group (_, ref ok), Job (_, job)) =>
-          if null (IntGraph.imm_preds jobs id) then SOME (Task id, group, (fn () => job ok))
-          else NONE
-      | _ => NONE);
-    val ids = map (fn Task id => id) focus;
-  in dequeue_result (get_first ready (IntGraph.all_preds jobs ids)) queue end;
+  | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
+      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
 
 in
 
-fun dequeue (queue as Queue {focus, ...}) =
-  (case dequeue_local focus queue of
-    (NONE, _) =>
-      (case dequeue_global true queue of (NONE, _) => dequeue_global false queue | res => res)
-  | res => res);
+fun dequeue (queue as Queue {jobs, ...}) =
+  let
+    fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
+          SOME (task, group, (fn () => job ok))
+      | ready _ = NONE;
+  in dequeue_result (Task_Graph.get_first ready jobs) queue end;
 
 fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
-  let val tasks' = map_filter (check_job jobs) tasks in
-    (case dequeue_local tasks' queue of
+  let
+    val tasks' = filter (can (Task_Graph.get_node jobs)) tasks;
+
+    fun ready task =
+      (case Task_Graph.get_node jobs task of
+        (group as Group (_, ref ok), Job job) =>
+          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+          else NONE
+      | _ => NONE);
+  in
+    (case dequeue_result (get_first ready (Task_Graph.all_preds jobs tasks')) queue of
       (NONE, queue') => (NONE, queue')
     | (SOME work, queue') => (SOME (work, tasks'), queue'))
   end;
@@ -150,8 +141,13 @@
 fun interrupt (Queue {jobs, ...}) task =
   (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
 
-fun interrupt_external queue str =
-  (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+fun interrupt_external (queue as Queue {jobs, ...}) str =
+  (case Int.fromString str of
+    SOME i =>
+      (case Task_Graph.get_first
+          (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs
+        of SOME task => interrupt queue task | NONE => ())
+  | NONE => ());
 
 
 (* misc operations *)
@@ -164,12 +160,11 @@
     val _ = List.app SimpleThread.interrupt running;
   in null running end;
 
-fun finish (task as Task id) (Queue {groups, jobs, focus}) =
+fun finish task (Queue {groups, jobs}) =
   let
     val Group (gid, _) = get_group jobs task;
     val groups' = Inttab.remove_list (op =) (gid, task) groups;
-    val jobs' = IntGraph.del_node id jobs;
-    val focus' = remove (op =) task focus;
-  in make_queue groups' jobs' focus' end;
+    val jobs' = Task_Graph.del_node task jobs;
+  in make_queue groups' jobs' end;
 
 end;