# HG changeset patch # User wenzelm # Date 1248201451 -7200 # Node ID e25107ff4f566e4d3a75937d0e06da57bed82df3 # Parent 8ac6b1102f167cb51e9cd33728860e9da152d846 support for nested groups -- cancellation is propagated to peers and subgroups; added is_canceled; tuned; diff -r 8ac6b1102f16 -r e25107ff4f56 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:24:02 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:37:31 2009 +0200 @@ -13,8 +13,8 @@ type group val group_id: group -> int val eq_group: group * group -> bool - val new_group: unit -> group - val group_exns: group -> exn list + val new_group: group option -> group + val group_status: group -> exn list val str_of_group: group -> string type queue val empty: queue @@ -27,6 +27,7 @@ (((task * group * (bool -> bool) list) * task list) option * queue) val interrupt: queue -> task -> unit val interrupt_external: queue -> string -> unit + val is_canceled: group -> bool val cancel_group: group -> exn -> unit val cancel: queue -> group -> bool val cancel_all: queue -> group list @@ -48,19 +49,37 @@ structure Task_Graph = Graph(type key = task val ord = task_ord); -(* groups *) +(* nested groups *) + +datatype group = Group of + {parent: group option, + id: serial, + status: exn list ref}; -datatype group = Group of serial * exn list ref; +fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status}; -fun group_id (Group (gid, _)) = gid; -fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2; +fun new_group parent = make_group (parent, serial (), ref []); + +fun group_id (Group {id, ...}) = id; +fun eq_group (group1, group2) = group_id group1 = group_id group2; -fun new_group () = Group (serial (), ref []); +fun group_ancestry (Group {parent, id, ...}) = + id :: (case parent of NONE => [] | SOME group => group_ancestry group); + + +fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () => + (case exn of + Exn.Interrupt => if null (! status) then status := [exn] else () + | _ => change status (cons exn))); -fun group_exns (Group (_, ref exns)) = exns; +fun group_status (Group {parent, status, ...}) = (*non-critical*) + ! status @ (case parent of NONE => [] | SOME group => group_status group); -fun str_of_group (Group (i, ref exns)) = - if null exns then string_of_int i else enclose "(" ")" (string_of_int i); +fun is_canceled (Group {parent, status, ...}) = (*non-critical*) + not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group); + +fun str_of_group group = + (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); (* jobs *) @@ -94,7 +113,7 @@ fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs; -(* status *) +(* queue status *) fun status (Queue {jobs, ...}) = let @@ -107,14 +126,38 @@ in {ready = x, pending = y, running = z} end; +(* cancel -- peers and sub-groups *) + +fun cancel (Queue {groups, jobs, ...}) group = + let + val _ = cancel_group group Exn.Interrupt; + val tasks = Inttab.lookup_list groups (group_id group); + val running = + fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; + val _ = List.app SimpleThread.interrupt running; + in null running end; + +fun cancel_all (Queue {jobs, ...}) = + let + fun cancel_job (group, job) (groups, running) = + (cancel_group group Exn.Interrupt; + (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) + | _ => (groups, running))); + val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); + val _ = List.app SimpleThread.interrupt running; + in groups end; + + (* enqueue *) -fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) = +fun enqueue group deps pri job (Queue {groups, jobs, cache}) = let val task = new_task pri; - val groups' = Inttab.cons_list (gid, task) groups; + val groups' = groups + |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); val jobs' = jobs - |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps; + |> Task_Graph.new_node (task, (group, Job [job])) + |> fold (add_job task) deps; val cache' = (case cache of Result last => @@ -158,7 +201,8 @@ fun ready task = (case Task_Graph.get_node jobs task of (group, Job list) => - if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list) + if null (Task_Graph.imm_preds jobs task) + then SOME (task, group, rev list) else NONE | _ => NONE); @@ -181,7 +225,9 @@ (* sporadic interrupts *) fun interrupt (Queue {jobs, ...}) task = - (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ()); + (case try (get_job jobs) task of + SOME (Running thread) => SimpleThread.interrupt thread + | _ => ()); fun interrupt_external (queue as Queue {jobs, ...}) str = (case Int.fromString str of @@ -194,33 +240,11 @@ (* termination *) -fun cancel_group (Group (_, r)) exn = CRITICAL (fn () => - (case exn of - Exn.Interrupt => if null (! r) then r := [exn] else () - | _ => change r (cons exn))); - -fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) = - let - val _ = cancel_group group Exn.Interrupt; - val tasks = Inttab.lookup_list groups gid; - val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; - val _ = List.app SimpleThread.interrupt running; - in null running end; - -fun cancel_all (Queue {jobs, ...}) = - let - fun cancel_job (group, job) (groups, running) = - (cancel_group group Exn.Interrupt; - (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) - | _ => (groups, running))); - val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []); - val _ = List.app SimpleThread.interrupt running; - in groups end; - fun finish task (Queue {groups, jobs, cache}) = let - val Group (gid, _) = get_group jobs task; - val groups' = Inttab.remove_list (op =) (gid, task) groups; + val group = get_group jobs task; + val groups' = groups + |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group); val jobs' = Task_Graph.del_node task jobs; val cache' = if null (Task_Graph.imm_succs jobs task) then cache