# HG changeset patch # User wenzelm # Date 1248182722 -7200 # Node ID 5382c93108db01d7659f77c82580041f0498c3e0 # Parent c1e280ab4746fefa6125170474a0ef310ca25107 propagate exceptions within future groups; Future.map: inherit group; diff -r c1e280ab4746 -r 5382c93108db src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 21 13:46:18 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 21 15:25:22 2009 +0200 @@ -151,7 +151,20 @@ count_active (! workers) > Multithreading.max_threads_value (); -(* execute jobs *) +(* execute future jobs *) + +fun future_job group (e: unit -> 'a) = + let + val result = ref (NONE: 'a Exn.result option); + val job = Multithreading.with_attributes Multithreading.restricted_interrupts + (fn _ => fn ok => + let + val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; + val _ = result := SOME res; + val _ = (case res of Exn.Exn exn => Task_Queue.cancel_group group exn | _ => ()); + val res_ok = is_some (Exn.get_result res); + in res_ok end); + in (result, job) end; fun do_cancel group = (*requires SYNCHRONIZED*) change canceled (insert Task_Queue.eq_group group); @@ -159,7 +172,7 @@ fun execute name (task, group, jobs) = let val _ = trace_active (); - val valid = Task_Queue.is_valid group; + val valid = null (Task_Queue.group_exns group); val ok = setmp_thread_data (name, task, group) (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) (); val _ = SYNCHRONIZED "execute" (fn () => @@ -260,25 +273,6 @@ (** futures **) -(* future job: fill result *) - -fun future_job group (e: unit -> 'a) = - let - val result = ref (NONE: 'a Exn.result option); - val job = Multithreading.with_attributes Multithreading.restricted_interrupts - (fn _ => fn ok => - let - val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt; - val _ = result := SOME res; - val res_ok = - (case res of - Exn.Result _ => true - | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) - | _ => false); - in res_ok end); - in (result, job) end; - - (* fork *) fun fork_future opt_group deps pri e = @@ -302,7 +296,11 @@ local -fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x); +fun get_result x = + (case peek x of + SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x))) + | SOME res => res + | NONE => Exn.Exn (SYS_ERROR "unfinished future")); fun join_next deps = (*requires SYNCHRONIZED*) if overloaded () then (worker_wait (); join_next deps) @@ -356,7 +354,7 @@ | NONE => false)); in if extended then Future {task = task, group = group, result = result} - else fork_future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) + else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; diff -r c1e280ab4746 -r 5382c93108db src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Tue Jul 21 13:46:18 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 21 15:25:22 2009 +0200 @@ -14,8 +14,7 @@ val group_id: group -> int val eq_group: group * group -> bool val new_group: unit -> group - val is_valid: group -> bool - val invalidate_group: group -> unit + val group_exns: group -> exn list val str_of_group: group -> string type queue val empty: queue @@ -28,6 +27,7 @@ (((task * group * (bool -> bool) list) * task list) option * queue) val interrupt: queue -> task -> unit val interrupt_external: queue -> string -> unit + val cancel_group: group -> exn -> unit val cancel: queue -> group -> bool val cancel_all: queue -> group list val finish: task -> queue -> queue @@ -50,18 +50,17 @@ (* groups *) -datatype group = Group of serial * bool ref; +datatype group = Group of serial * exn list ref; fun group_id (Group (gid, _)) = gid; fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2; -fun new_group () = Group (serial (), ref true); +fun new_group () = Group (serial (), ref []); -fun is_valid (Group (_, ref ok)) = ok; -fun invalidate_group (Group (_, ok)) = ok := false; +fun group_exns (Group (_, ref exns)) = exns; -fun str_of_group (Group (i, ref ok)) = - if ok then string_of_int i else enclose "(" ")" (string_of_int i); +fun str_of_group (Group (i, ref exns)) = + if null exns then string_of_int i else enclose "(" ")" (string_of_int i); (* jobs *) @@ -195,9 +194,14 @@ (* termination *) +fun cancel_group (Group (_, r)) exn = CRITICAL (fn () => + (case exn of + Exn.Interrupt => if null (! r) then r := [exn] else () + | _ => change r (cons exn))); + fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) = let - val _ = invalidate_group group; + val _ = cancel_group group Exn.Interrupt; val tasks = Inttab.lookup_list groups gid; val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks []; val _ = List.app SimpleThread.interrupt running; @@ -206,7 +210,7 @@ fun cancel_all (Queue {jobs, ...}) = let fun cancel_job (group, job) (groups, running) = - (invalidate_group group; + (cancel_group group Exn.Interrupt; (case job of Running t => (insert eq_group group groups, insert Thread.equal t running) | _ => (groups, running))); val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);