--- a/src/Pure/Concurrent/task_queue.ML Wed Sep 10 11:36:37 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML Wed Sep 10 19:44:28 2008 +0200
@@ -18,8 +18,10 @@
val depend: task list -> task -> queue -> queue
val dequeue: queue -> (task * group * (unit -> bool)) option * queue
val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
- val cancel: group -> queue -> bool * queue
+ val interrupt: queue -> task -> unit
+ val interrupt_external: queue -> string -> unit
val finish: task -> queue -> queue
+ val cancel: queue -> group -> bool
end;
structure TaskQueue: TASK_QUEUE =
@@ -28,18 +30,19 @@
(* identifiers *)
datatype task = Task of serial;
+fun str_of_task (Task i) = string_of_int i;
-datatype group = Group of serial;
-fun new_group () = Group (serial ());
+datatype group = Group of serial * bool ref;
+fun new_group () = Group (serial (), ref true);
-fun str_of_task (Task i) = string_of_int i;
-fun str_of_group (Group i) = string_of_int i;
+fun str_of_group (Group (i, ref ok)) =
+ if ok then string_of_int i else enclose "(" ")" (string_of_int i);
(* jobs *)
datatype job =
- Job of bool * (bool -> bool) |
+ Job of bool -> bool |
Running of Thread.thread;
type jobs = (group * job) IntGraph.T;
@@ -64,13 +67,13 @@
(* enqueue *)
-fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
let
val id = serial ();
val task = Task id;
val groups' = Inttab.cons_list (gid, task) groups;
val jobs' = jobs
- |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
+ |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
in (task, make_queue groups' jobs') end;
fun depend deps task (Queue {groups, jobs}) =
@@ -81,7 +84,7 @@
fun dequeue_if P (queue as Queue {groups, jobs}) =
let
- fun ready (id, ((group, Job (ok, job)), ([], _))) =
+ fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
if P id then SOME (Task id, group, (fn () => job ok)) else NONE
| ready _ = NONE;
in
@@ -100,22 +103,30 @@
in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
+(* sporadic interrupts *)
+
+fun interrupt_thread thread = Thread.interrupt thread handle Thread _ => ();
+
+fun interrupt (Queue {jobs, ...}) task =
+ (case try (get_job jobs) task of SOME (Running thread) => interrupt_thread thread | _ => ());
+
+fun interrupt_external queue str =
+ (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+
+
(* termination *)
-fun cancel (group as Group gid) (Queue {groups, jobs}) =
+fun cancel (Queue {groups, jobs}) (group as Group (gid, ok)) =
let
+ val _ = ok := false; (*invalidate any future group members*)
val tasks = Inttab.lookup_list groups gid;
val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
- val jobs' = fold (fn task =>
- (case get_job jobs task of
- Job (true, job) => map_job task (K (Job (false, job)))
- | _ => I)) tasks jobs;
- val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
- in (null running, make_queue groups jobs') end;
+ val _ = List.app interrupt_thread running;
+ in null running end;
fun finish (task as Task id) (Queue {groups, jobs}) =
let
- val Group gid = get_group jobs task;
+ val Group (gid, _) = get_group jobs task;
val groups' = Inttab.remove_list (op =) (gid, task) groups;
val jobs' = IntGraph.del_nodes [id] jobs;
in make_queue groups' jobs' end;