# HG changeset patch # User wenzelm # Date 1279640142 -7200 # Node ID 047c96f414554023ea4645e178d997320f466cd7 # Parent 26906cacbaae5f51778ecd6dc192808c00eb60ef back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c); cancel passive tasks more actively via Exn.Interrupt, by treating them like ragular jobs here; attempts to re-assign canceled futures/promises raise Exn.Interrupt; tuned; diff -r 26906cacbaae -r 047c96f41455 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 20 16:42:48 2010 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 20 17:35:42 2010 +0200 @@ -106,9 +106,13 @@ fun assign_result group result res = let - val _ = Single_Assignment.assign result res; + val _ = Single_Assignment.assign result res + handle exn as Fail _ => + (case Single_Assignment.peek result of + SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt + | _ => reraise exn); val ok = - (case res of + (case the (Single_Assignment.peek result) of Exn.Exn exn => (Task_Queue.cancel_group group exn; false) | Exn.Result _ => true); in ok end; @@ -481,8 +485,9 @@ fun promise_group group : 'a future = let val result = Single_Assignment.var "promise" : 'a result; - val task = SYNCHRONIZED "enqueue" (fn () => - Unsynchronized.change_result queue (Task_Queue.enqueue_passive group)); + fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true; + val task = SYNCHRONIZED "enqueue_passive" (fn () => + Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); in Future {promised = true, task = task, group = group, result = result} end; fun promise () = promise_group (new_group ()); @@ -509,8 +514,10 @@ else interruptible f x; (*cancel: present and future group members will be interrupted eventually*) -fun cancel_group group = - SYNCHRONIZED "cancel" (fn () => if cancel_now group then () else cancel_later group); +fun cancel_group group = SYNCHRONIZED "cancel" (fn () => + (if cancel_now group then () else cancel_later group; + signal work_available; scheduler_check ())); + fun cancel x = cancel_group (group_of x); diff -r 26906cacbaae -r 047c96f41455 src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Jul 20 16:42:48 2010 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 20 17:35:42 2010 +0200 @@ -24,7 +24,7 @@ val status: queue -> {ready: int, pending: int, running: int, passive: int} val cancel: queue -> group -> bool val cancel_all: queue -> group list - val enqueue_passive: group -> queue -> task * queue + val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue @@ -88,8 +88,6 @@ not (null (Synchronized.value status)) orelse (case parent of NONE => false | SOME group => is_canceled group); -fun is_ready deps group = null deps orelse is_canceled group; - fun group_status (Group {parent, status, ...}) = Synchronized.value status @ (case parent of NONE => [] | SOME group => group_status group); @@ -105,7 +103,7 @@ datatype job = Job of (bool -> bool) list | Running of Thread.thread | - Passive; + Passive of unit -> bool; type jobs = (group * job) Task_Graph.T; @@ -135,9 +133,21 @@ val empty = make_queue Inttab.empty Task_Graph.empty; + +(* job status *) + +fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list) + | ready_job task ((group, Passive abort), ([], _)) = + if is_canceled group then SOME (task, group, [fn _ => abort ()]) + else NONE + | ready_job _ _ = NONE; + +fun active_job (_, Job _) = SOME () + | active_job (_, Running _) = SOME () + | active_job (group, Passive _) = if is_canceled group then SOME () else NONE; + fun all_passive (Queue {jobs, ...}) = - Task_Graph.get_first - ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none; + is_none (Task_Graph.get_first (active_job o #1 o #2) jobs); (* queue status *) @@ -145,11 +155,11 @@ fun status (Queue {jobs, ...}) = let val (x, y, z, w) = - Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) => + Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) => (case job of - Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w) + Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w) | Running _ => (x, y, z + 1, w) - | Passive => (x, y, z, w + 1))) + | Passive _ => (x, y, z, w + 1))) jobs (0, 0, 0, 0); in {ready = x, pending = y, running = z, passive = w} end; @@ -165,7 +175,7 @@ val _ = List.app Simple_Thread.interrupt running; in null running end; -fun cancel_all (Queue {groups, jobs}) = +fun cancel_all (Queue {jobs, ...}) = let fun cancel_job (group, job) (groups, running) = (cancel_group group Exn.Interrupt; @@ -179,12 +189,12 @@ (* enqueue *) -fun enqueue_passive group (Queue {groups, jobs}) = +fun enqueue_passive group abort (Queue {groups, jobs}) = let val task = new_task NONE; val groups' = groups |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group); - val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive)); + val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort)); in (task, make_queue groups' jobs') end; fun enqueue group deps pri job (Queue {groups, jobs}) = @@ -208,17 +218,11 @@ (* dequeue *) fun dequeue thread (queue as Queue {groups, jobs}) = - let - fun ready (task, ((group, Job list), (deps, _))) = - if is_ready deps group then SOME (task, group, rev list) else NONE - | ready _ = NONE; - in - (case Task_Graph.get_first ready jobs of - NONE => (NONE, queue) - | SOME (result as (task, _, _)) => - let val jobs' = set_job task (Running thread) jobs - in (SOME result, make_queue groups jobs') end) - end; + (case Task_Graph.get_first (uncurry ready_job) jobs of + NONE => (NONE, queue) + | SOME (result as (task, _, _)) => + let val jobs' = set_job task (Running thread) jobs + in (SOME result, make_queue groups jobs') end); (* dequeue_towards -- adhoc dependencies *) @@ -228,13 +232,7 @@ fun dequeue_towards thread deps (queue as Queue {groups, jobs}) = let - fun ready task = - (case Task_Graph.get_node jobs task of - (group, Job list) => - if is_ready (get_deps jobs task) group - then SOME (task, group, rev list) - else NONE - | _ => NONE); + fun ready task = ready_job task (Task_Graph.get_entry jobs task); val tasks = filter (can (Task_Graph.get_node jobs)) deps; fun result (res as (task, _, _)) = let val jobs' = set_job task (Running thread) jobs