# HG changeset patch # User wenzelm # Date 1221068668 -7200 # Node ID 0a2434cf38c9b0a2ff835d0e84402547a04898ec # Parent fbad2eb5be9eb76c8b8c85ef5c3f600ee5ac6ff9 cancel: invalidate group implicitly, via bool ref; job: moved ok flag into group; added interrupt, interrupt_external for tasks; diff -r fbad2eb5be9e -r 0a2434cf38c9 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Wed Sep 10 11:36:37 2008 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Wed Sep 10 19:44:28 2008 +0200 @@ -18,8 +18,10 @@ val depend: task list -> task -> queue -> queue val dequeue: queue -> (task * group * (unit -> bool)) option * queue val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue - val cancel: group -> queue -> bool * queue + val interrupt: queue -> task -> unit + val interrupt_external: queue -> string -> unit val finish: task -> queue -> queue + val cancel: queue -> group -> bool end; structure TaskQueue: TASK_QUEUE = @@ -28,18 +30,19 @@ (* identifiers *) datatype task = Task of serial; +fun str_of_task (Task i) = string_of_int i; -datatype group = Group of serial; -fun new_group () = Group (serial ()); +datatype group = Group of serial * bool ref; +fun new_group () = Group (serial (), ref true); -fun str_of_task (Task i) = string_of_int i; -fun str_of_group (Group i) = string_of_int i; +fun str_of_group (Group (i, ref ok)) = + if ok then string_of_int i else enclose "(" ")" (string_of_int i); (* jobs *) datatype job = - Job of bool * (bool -> bool) | + Job of bool -> bool | Running of Thread.thread; type jobs = (group * job) IntGraph.T; @@ -64,13 +67,13 @@ (* enqueue *) -fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) = +fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) = let val id = serial (); val task = Task id; val groups' = Inttab.cons_list (gid, task) groups; val jobs' = jobs - |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps; + |> IntGraph.new_node (id, (group, Job job)) |> fold (add_job task) deps; in (task, make_queue groups' jobs') end; fun depend deps task (Queue {groups, jobs}) = @@ -81,7 +84,7 @@ fun dequeue_if P (queue as Queue {groups, jobs}) = let - fun ready (id, ((group, Job (ok, job)), ([], _))) = + fun ready (id, ((group as Group (_, ref ok), Job job), ([], _))) = if P id then SOME (Task id, group, (fn () => job ok)) else NONE | ready _ = NONE; in @@ -100,22 +103,30 @@ in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end; +(* sporadic interrupts *) + +fun interrupt_thread thread = Thread.interrupt thread handle Thread _ => (); + +fun interrupt (Queue {jobs, ...}) task = + (case try (get_job jobs) task of SOME (Running thread) => interrupt_thread thread | _ => ()); + +fun interrupt_external queue str = + (case Int.fromString str of SOME id => interrupt queue (Task id) | NONE => ()); + + (* termination *) -fun cancel (group as Group gid) (Queue {groups, jobs}) = +fun cancel (Queue {groups, jobs}) (group as Group (gid, ok)) = let + val _ = ok := false; (*invalidate any future group members*) val tasks = Inttab.lookup_list groups gid; val running = fold (get_job jobs #> (fn Running thread => cons thread | _ => I)) tasks []; - val jobs' = fold (fn task => - (case get_job jobs task of - Job (true, job) => map_job task (K (Job (false, job))) - | _ => I)) tasks jobs; - val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running; - in (null running, make_queue groups jobs') end; + val _ = List.app interrupt_thread running; + in null running end; fun finish (task as Task id) (Queue {groups, jobs}) = let - val Group gid = get_group jobs task; + val Group (gid, _) = get_group jobs task; val groups' = Inttab.remove_list (op =) (gid, task) groups; val jobs' = IntGraph.del_nodes [id] jobs; in make_queue groups' jobs' end;