cancel: invalidate group implicitly, via bool ref;
authorwenzelm
Wed, 10 Sep 2008 19:44:28 +0200
changeset 28190 0a2434cf38c9
parent 28189 fbad2eb5be9e
child 28191 9e5f556409c6
cancel: invalidate group implicitly, via bool ref; job: moved ok flag into group; added interrupt, interrupt_external for tasks;
src/Pure/Concurrent/task_queue.ML
--- a/src/Pure/Concurrent/task_queue.ML	Wed Sep 10 11:36:37 2008 +0200
+++ b/src/Pure/Concurrent/task_queue.ML	Wed Sep 10 19:44:28 2008 +0200
@@ -18,8 +18,10 @@
   val depend: task list -> task -> queue -> queue
   val dequeue: queue -> (task * group * (unit -> bool)) option * queue
   val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
-  val cancel: group -> queue -> bool * queue
+  val interrupt: queue -> task -> unit
+  val interrupt_external: queue -> string -> unit
   val finish: task -> queue -> queue
+  val cancel: queue -> group -> bool
 end;
 
 structure TaskQueue: TASK_QUEUE =
@@ -28,18 +30,19 @@
 (* identifiers *)
 
 datatype task = Task of serial;
+fun str_of_task (Task i) = string_of_int i;
 
-datatype group = Group of serial;
-fun new_group () = Group (serial ());
+datatype group = Group of serial * bool ref;
+fun new_group () = Group (serial (), ref true);
 
-fun str_of_task (Task i) = string_of_int i;
-fun str_of_group (Group i) = string_of_int i;
+fun str_of_group (Group (i, ref ok)) =
+  if ok then string_of_int i else enclose "(" ")" (string_of_int i);
 
 
 (* jobs *)
 
 datatype job =
-  Job of bool * (bool -> bool) |
+  Job of bool -> bool |
   Running of Thread.thread;
 
 type jobs = (group * job) IntGraph.T;
@@ -64,13 +67,13 @@
 
 (* enqueue *)
 
-fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
+fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
   let
     val id = serial ();
     val task = Task id;
     val groups' = Inttab.cons_list (gid, task) groups;
     val jobs' = jobs
-      |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
+      |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps;
   in (task, make_queue groups' jobs') end;
 
 fun depend deps task (Queue {groups, jobs}) =
@@ -81,7 +84,7 @@
 
 fun dequeue_if P (queue as Queue {groups, jobs}) =
   let
-    fun ready (id, ((group, Job (ok, job)), ([], _))) =
+    fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) =
           if P id then SOME (Task id, group, (fn () => job ok)) else NONE
       | ready _ = NONE;
   in
@@ -100,22 +103,30 @@
   in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
 
 
+(* sporadic interrupts *)
+
+fun interrupt_thread thread = Thread.interrupt thread handle Thread _ => ();
+
+fun interrupt (Queue {jobs, ...}) task =
+  (case try (get_job jobs) task of SOME (Running thread) => interrupt_thread thread | _ => ());
+
+fun interrupt_external queue str =
+  (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ());
+
+
 (* termination *)
 
-fun cancel (group as Group gid) (Queue {groups, jobs}) =
+fun cancel (Queue {groups, jobs}) (group as Group (gid, ok)) =
   let
+    val _ = ok := false;  (*invalidate any future group members*)
     val tasks = Inttab.lookup_list groups gid;
     val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks [];
-    val jobs' = fold (fn task =>
-        (case get_job jobs task of
-          Job (true, job) => map_job task (K (Job (false, job)))
-        | _ => I)) tasks jobs;
-    val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
-  in (null running, make_queue groups jobs') end;
+    val _ = List.app interrupt_thread running;
+  in null running end;
 
 fun finish (task as Task id) (Queue {groups, jobs}) =
   let
-    val Group gid = get_group jobs task;
+    val Group (gid, _) = get_group jobs task;
     val groups' = Inttab.remove_list (op =) (gid, task) groups;
     val jobs' = IntGraph.del_nodes [id] jobs;
   in make_queue groups' jobs' end;