diff -r e25107ff4f56 -r 81d03a29980c src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Jul 21 20:37:31 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Tue Jul 21 20:37:31 2009 +0200 @@ -30,6 +30,7 @@ type task = Task_Queue.task type group = Task_Queue.group val is_worker: unit -> bool + val worker_group: unit -> Task_Queue.group option type 'a future val task_of: 'a future -> task val group_of: 'a future -> group @@ -40,7 +41,6 @@ val fork_group: group -> (unit -> 'a) -> 'a future val fork_deps: 'b future list -> (unit -> 'a) -> 'a future val fork_pri: int -> (unit -> 'a) -> 'a future - val fork_local: int -> (unit -> 'a) -> 'a future val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val join: 'a future -> 'a @@ -76,6 +76,7 @@ end; val is_worker = is_some o thread_data; +val worker_group = Option.map #3 o thread_data; (* datatype future *) @@ -93,7 +94,7 @@ fun value x = Future {task = Task_Queue.new_task 0, - group = Task_Queue.new_group (), + group = Task_Queue.new_group NONE, result = ref (SOME (Exn.Result x))}; @@ -172,7 +173,7 @@ fun execute name (task, group, jobs) = let val _ = trace_active (); - val valid = null (Task_Queue.group_exns group); + val valid = not (Task_Queue.is_canceled group); val ok = setmp_thread_data (name, task, group) (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) (); val _ = SYNCHRONIZED "execute" (fn () => @@ -279,7 +280,10 @@ let val _ = scheduler_check "future check"; - val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); + val group = + (case opt_group of + SOME group => group + | NONE => Task_Queue.new_group (worker_group ())); val (result, job) = future_job group e; val task = SYNCHRONIZED "future" (fn () => change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); @@ -289,7 +293,6 @@ fun fork_group group e = fork_future (SOME group) [] 0 e; fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e; fun fork_pri pri e = fork_future NONE [] pri e; -fun fork_local pri e = fork_future (Option.map #3 (thread_data ())) [] pri e; (* join *) @@ -298,9 +301,10 @@ fun get_result x = (case peek x of - SOME (Exn.Exn Exn.Interrupt) => Exn.Exn (Exn.EXCEPTIONS (Task_Queue.group_exns (group_of x))) - | SOME res => res - | NONE => Exn.Exn (SYS_ERROR "unfinished future")); + NONE => Exn.Exn (SYS_ERROR "unfinished future") + | SOME (Exn.Exn Exn.Interrupt) => + Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) + | SOME res => res); fun join_next deps = (*requires SYNCHRONIZED*) if overloaded () then (worker_wait (); join_next deps) @@ -345,7 +349,7 @@ val _ = scheduler_check "map_future check"; val task = task_of x; - val group = Task_Queue.new_group (); + val group = Task_Queue.new_group (SOME (group_of x)); val (result, job) = future_job group (fn () => f (join x)); val extended = SYNCHRONIZED "map_future" (fn () =>