added is_valid;
authorwenzelm
Tue, 06 Jan 2009 13:43:17 +0100
changeset 29365 5c5bc17d9135
parent 29355 642cac18e155
child 29366 1ffc8cbf39ec
added is_valid; added extend: support bulk jobs; tuned;
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/task_queue.ML	Mon Jan 05 18:13:26 2009 +0100
+++ b/src/Pure/Concurrent/task_queue.ML	Tue Jan 06 13:43:17 2009 +0100
@@ -13,16 +13,18 @@
   type group
   val eq_group: group * group -> bool
   val new_group: unit -> group
+  val is_valid: group -> bool
   val invalidate_group: group -> unit
   val str_of_group: group -> string
   type queue
   val empty: queue
   val is_empty: queue -> bool
   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
+  val extend: task -> (bool -> bool) -> queue -> queue option
   val depend: task list -> task -> queue -> queue
-  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+  val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
   val dequeue_towards: task list -> queue ->
-    (((task * group * (unit -> bool)) * task list) option * queue)
+    (((task * group * (bool -> bool) list) * task list) option * queue)
   val interrupt: queue -> task -> unit
   val interrupt_external: queue -> string -> unit
   val cancel: queue -> group -> bool
@@ -52,6 +54,7 @@
 
 fun new_group () = Group (serial (), ref true);
 
+fun is_valid (Group (_, ref ok)) = ok;
 fun invalidate_group (Group (_, ok)) = ok := false;
 
 fun str_of_group (Group (i, ref ok)) =
@@ -61,14 +64,14 @@
 (* jobs *)
 
 datatype job =
-  Job of bool -> bool |
+  Job of (bool -> bool) list |
   Running of Thread.thread;
 
 type jobs = (group * job) Task_Graph.T;
 
 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
-fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
+fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
 
 fun add_job task dep (jobs: jobs) =
   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
@@ -96,9 +99,14 @@
     val task = new_task pri;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+      |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   in (task, make_queue groups' jobs') end;
 
+fun extend task job (Queue {groups, jobs}) =
+  (case try (get_job jobs) task of
+    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+  | _ => NONE);
+
 fun depend deps task (Queue {groups, jobs}) =
   make_queue groups (fold (add_job_acyclic task) deps jobs);
 
@@ -109,14 +117,13 @@
 
 fun dequeue_result NONE queue = (NONE, queue)
   | dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
-      (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
+      (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
 
 in
 
 fun dequeue (queue as Queue {jobs, ...}) =
   let
-    fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
-          SOME (task, group, (fn () => job ok))
+    fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
       | ready _ = NONE;
   in dequeue_result (Task_Graph.get_first ready jobs) queue end;
 
@@ -126,8 +133,8 @@
 
     fun ready task =
       (case Task_Graph.get_node jobs task of
-        (group as Group (_, ref ok), Job job) =>
-          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+        (group, Job list) =>
+          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
           else NONE
       | _ => NONE);
   in