diff -r 44a2e0db281f -r 73dde8006820 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Feb 02 13:44:40 2011 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Feb 02 15:04:09 2011 +0100 @@ -32,19 +32,16 @@ signature FUTURE = sig - type task = Task_Queue.task - type group = Task_Queue.group - val is_worker: unit -> bool val worker_task: unit -> Task_Queue.task option val worker_group: unit -> Task_Queue.group option val worker_subgroup: unit -> Task_Queue.group type 'a future - val task_of: 'a future -> task - val group_of: 'a future -> group + val task_of: 'a future -> Task_Queue.task val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool - val forks: {name: string, group: group option, deps: task list, pri: int} -> - (unit -> 'a) list -> 'a future list + val forks: + {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} -> + (unit -> 'a) list -> 'a future list val fork_pri: int -> (unit -> 'a) -> 'a future val fork: (unit -> 'a) -> 'a future val join_results: 'a future list -> 'a Exn.result list @@ -52,12 +49,12 @@ val join: 'a future -> 'a val value: 'a -> 'a future val map: ('a -> 'b) -> 'a future -> 'b future - val promise_group: group -> 'a future + val promise_group: Task_Queue.group -> 'a future val promise: unit -> 'a future val fulfill_result: 'a future -> 'a Exn.result -> unit val fulfill: 'a future -> 'a -> unit val interruptible_task: ('a -> 'b) -> 'a -> 'b - val cancel_group: group -> unit + val cancel_group: Task_Queue.group -> unit val cancel: 'a future -> unit val shutdown: unit -> unit val status: (unit -> 'a) -> 'a @@ -70,20 +67,15 @@ (* identifiers *) -type task = Task_Queue.task; -type group = Task_Queue.group; - local - val tag = Universal.tag () : (task * group) option Universal.tag; + val tag = Universal.tag () : Task_Queue.task option Universal.tag; in - fun thread_data () = the_default NONE (Thread.getLocal tag); - fun setmp_thread_data data f x = - Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; + fun worker_task () = the_default NONE (Thread.getLocal tag); + fun setmp_worker_task data f x = + Library.setmp_thread_data tag (worker_task ()) (SOME data) f x; end; -val is_worker = is_some o thread_data; -val worker_task = Option.map #1 o thread_data; -val worker_group = Option.map #2 o thread_data; +val worker_group = Option.map Task_Queue.group_of_task o worker_task; fun worker_subgroup () = Task_Queue.new_group (worker_group ()); fun worker_joining e = @@ -103,12 +95,10 @@ datatype 'a future = Future of {promised: bool, - task: task, - group: group, + task: Task_Queue.task, result: 'a result}; fun task_of (Future {task, ...}) = task; -fun group_of (Future {group, ...}) = group; fun result_of (Future {result, ...}) = result; fun peek x = Single_Assignment.peek (result_of x); @@ -204,12 +194,13 @@ (Unsynchronized.change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); -fun execute (task, group, jobs) = +fun execute (task, jobs) = let + val group = Task_Queue.group_of_task task; val valid = not (Task_Queue.is_canceled group); val ok = Task_Queue.running task (fn () => - setmp_thread_data (task, group) (fn () => + setmp_worker_task task (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) ()); val _ = Multithreading.tracing 1 (fn () => let @@ -416,7 +407,7 @@ let val (result, job) = future_job grp e; val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue; - val future = Future {promised = false, task = task, group = grp, result = result}; + val future = Future {promised = false, task = task, result = result}; in (future, (minimal orelse minimal', queue')) end; in SYNCHRONIZED "enqueue" (fn () => @@ -443,7 +434,7 @@ NONE => Exn.Exn (Fail "Unfinished future") | SOME res => if Exn.is_interrupt_exn res then - (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of + (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of [] => res | exns => Exn.Exn (Exn.EXCEPTIONS exns)) else res); @@ -470,7 +461,7 @@ if forall is_finished xs then () else if Multithreading.self_critical () then error "Cannot join future values within critical section" - else if is_some (thread_data ()) then + else if is_some (worker_task ()) then join_work (Task_Queue.init_deps (map task_of xs)) else List.app (ignore o Single_Assignment.await o result_of) xs; in map get_result xs end; @@ -485,15 +476,16 @@ fun value (x: 'a) = let - val group = Task_Queue.new_group NONE; + val task = Task_Queue.dummy_task (); + val group = Task_Queue.group_of_task task; val result = Single_Assignment.var "value" : 'a result; val _ = assign_result group result (Exn.Result x); - in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end; + in Future {promised = false, task = task, result = result} end; fun map_future f x = let val task = task_of x; - val group = Task_Queue.new_group (SOME (group_of x)); + val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task)); val (result, job) = future_job group (fn () => f (join x)); val extended = SYNCHRONIZED "extend" (fn () => @@ -501,7 +493,7 @@ SOME queue' => (queue := queue'; true) | NONE => false)); in - if extended then Future {promised = false, task = task, group = group, result = result} + if extended then Future {promised = false, task = task, result = result} else singleton (forks {name = "Future.map", group = SOME group, @@ -522,14 +514,15 @@ else reraise exn; val task = SYNCHRONIZED "enqueue_passive" (fn () => Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); - in Future {promised = true, task = task, group = group, result = result} end; + in Future {promised = true, task = task, result = result} end; fun promise () = promise_group (worker_subgroup ()); -fun fulfill_result (Future {promised, task, group, result}) res = +fun fulfill_result (Future {promised, task, result}) res = if not promised then raise Fail "Not a promised future" else let + val group = Task_Queue.group_of_task task; fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); val _ = Multithreading.with_attributes Multithreading.no_interrupts (fn _ => @@ -538,7 +531,7 @@ SYNCHRONIZED "fulfill_result" (fn () => Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); - in if still_passive then execute (task, group, [job]) else () end); + in if still_passive then execute (task, [job]) else () end); val _ = worker_waiting (Task_Queue.init_deps [task]) (fn () => Single_Assignment.await result); @@ -552,7 +545,7 @@ fun interruptible_task f x = if Multithreading.available then Multithreading.with_attributes - (if is_worker () + (if is_some (worker_task ()) then Multithreading.private_interrupts else Multithreading.public_interrupts) (fn _ => f x) @@ -563,7 +556,7 @@ (if cancel_now group then () else cancel_later group; signal work_available; scheduler_check ())); -fun cancel x = cancel_group (group_of x); +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); (* shutdown *)