--- a/src/Pure/Concurrent/task_queue.ML Mon Jan 05 18:13:26 2009 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 06 13:43:17 2009 +0100
@@ -13,16 +13,18 @@
type group
val eq_group: group * group -> bool
val new_group: unit -> group
+ val is_valid: group -> bool
val invalidate_group: group -> unit
val str_of_group: group -> string
type queue
val empty: queue
val is_empty: queue -> bool
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue
+ val extend: task -> (bool -> bool) -> queue -> queue option
val depend: task list -> task -> queue -> queue
- val dequeue: queue -> (task * group * (unit -> bool)) option * queue
+ val dequeue: queue -> (task * group * (bool -> bool) list) option * queue
val dequeue_towards: task list -> queue ->
- (((task * group * (unit -> bool)) * task list) option * queue)
+ (((task * group * (bool -> bool) list) * task list) option * queue)
val interrupt: queue -> task -> unit
val interrupt_external: queue -> string -> unit
val cancel: queue -> group -> bool
@@ -52,6 +54,7 @@
fun new_group () = Group (serial (), ref true);
+fun is_valid (Group (_, ref ok)) = ok;
fun invalidate_group (Group (_, ok)) = ok := false;
fun str_of_group (Group (i, ref ok)) =
@@ -61,14 +64,14 @@
(* jobs *)
datatype job =
- Job of bool -> bool |
+ Job of (bool -> bool) list |
Running of Thread.thread;
type jobs = (group * job) Task_Graph.T;
fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
-fun map_job task f (jobs: jobs) = Task_Graph.map_node task (apsnd f) jobs;
+fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
fun add_job task dep (jobs: jobs) =
Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
@@ -96,9 +99,14 @@
val task = new_task pri;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
- |> Task_Graph.new_node (task, (group, Job job)) |> fold (add_job task) deps;
+ |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
in (task, make_queue groups' jobs') end;
+fun extend task job (Queue {groups, jobs}) =
+ (case try (get_job jobs) task of
+ SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
+ | _ => NONE);
+
fun depend deps task (Queue {groups, jobs}) =
make_queue groups (fold (add_job_acyclic task) deps jobs);
@@ -109,14 +117,13 @@
fun dequeue_result NONE queue = (NONE, queue)
| dequeue_result (SOME (result as (task, _, _))) (Queue {groups, jobs}) =
- (SOME result, make_queue groups (map_job task (K (Running (Thread.self ()))) jobs));
+ (SOME result, make_queue groups (set_job task (Running (Thread.self ())) jobs));
in
fun dequeue (queue as Queue {jobs, ...}) =
let
- fun ready (task, ((group as Group (_, ref ok), Job job), ([], _))) =
- SOME (task, group, (fn () => job ok))
+ fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
| ready _ = NONE;
in dequeue_result (Task_Graph.get_first ready jobs) queue end;
@@ -126,8 +133,8 @@
fun ready task =
(case Task_Graph.get_node jobs task of
- (group as Group (_, ref ok), Job job) =>
- if null (Task_Graph.imm_preds jobs task) then SOME (task, group, (fn () => job ok))
+ (group, Job list) =>
+ if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
else NONE
| _ => NONE);
in