# HG changeset patch # User wenzelm # Date 1312977885 -7200 # Node ID 058520fa03a8b4c5c36099767288dad943768b37 # Parent 7a44005dc2ec23021e46026c656fc0227b497642 tuned source structure; diff -r 7a44005dc2ec -r 058520fa03a8 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Aug 10 10:59:37 2011 +0200 +++ b/src/Pure/Concurrent/future.ML Wed Aug 10 14:04:45 2011 +0200 @@ -39,6 +39,8 @@ val task_of: 'a future -> Task_Queue.task val peek: 'a future -> 'a Exn.result option val is_finished: 'a future -> bool + val cancel_group: Task_Queue.group -> unit + val cancel: 'a future -> unit val forks: {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} -> (unit -> 'a) list -> 'a future list @@ -57,8 +59,6 @@ val fulfill_result: 'a future -> 'a Exn.result -> unit val fulfill: 'a future -> 'a -> unit val interruptible_task: ('a -> 'b) -> 'a -> 'b - val cancel_group: Task_Queue.group -> unit - val cancel: 'a future -> unit val shutdown: unit -> unit val status: (unit -> 'a) -> 'a end; @@ -74,8 +74,7 @@ val tag = Universal.tag () : Task_Queue.task option Universal.tag; in fun worker_task () = the_default NONE (Thread.getLocal tag); - fun setmp_worker_task data f x = - Library.setmp_thread_data tag (worker_task ()) (SOME data) f x; + fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x; end; val worker_group = Option.map Task_Queue.group_of_task o worker_task; @@ -107,19 +106,6 @@ fun peek x = Single_Assignment.peek (result_of x); fun is_finished x = is_some (peek x); -fun assign_result group result res = - let - val _ = Single_Assignment.assign result res - handle exn as Fail _ => - (case Single_Assignment.peek result of - SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) - | _ => reraise exn); - val ok = - (case the (Single_Assignment.peek result) of - Exn.Exn exn => (Task_Queue.cancel_group group exn; false) - | Exn.Res _ => true); - in ok end; - (** scheduling **) @@ -173,23 +159,16 @@ fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; -(* execute future jobs *) +(* cancellation primitives *) -fun future_job group (e: unit -> 'a) = - let - val result = Single_Assignment.var "future" : 'a result; - val pos = Position.thread_data (); - fun job ok = - let - val res = - if ok then - Exn.capture (fn () => - Multithreading.with_attributes Multithreading.private_interrupts - (fn _ => Position.setmp_thread_data pos e ()) before - Multithreading.interrupted ()) () - else Exn.interrupt_exn; - in assign_result group result res end; - in (result, job) end; +fun interruptible_task f x = + if Multithreading.available then + Multithreading.with_attributes + (if is_some (worker_task ()) + then Multithreading.private_interrupts + else Multithreading.public_interrupts) + (fn _ => f x) + else interruptible f x; fun cancel_now group = (*requires SYNCHRONIZED*) Task_Queue.cancel (! queue) group; @@ -198,7 +177,10 @@ (Unsynchronized.change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); -fun execute (task, jobs) = + +(* worker threads *) + +fun worker_exec (task, jobs) = let val group = Task_Queue.group_of_task task; val valid = not (Task_Queue.is_canceled group); @@ -224,9 +206,6 @@ in () end); in () end; - -(* worker threads *) - fun worker_wait active cond = (*requires SYNCHRONIZED*) let val state = @@ -253,7 +232,7 @@ fun worker_loop name = (case SYNCHRONIZED name (fn () => worker_next ()) of NONE => () - | SOME work => (Exn.capture Multithreading.interrupted (); execute work; worker_loop name)); + | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), @@ -397,6 +376,48 @@ (** futures **) +(* cancellation *) + +(*cancel: present and future group members will be interrupted eventually*) +fun cancel_group group = SYNCHRONIZED "cancel" (fn () => + (if cancel_now group then () else cancel_later group; + signal work_available; scheduler_check ())); + +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); + + +(* future jobs *) + +fun assign_result group result res = + let + val _ = Single_Assignment.assign result res + handle exn as Fail _ => + (case Single_Assignment.peek result of + SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) + | _ => reraise exn); + val ok = + (case the (Single_Assignment.peek result) of + Exn.Exn exn => (Task_Queue.cancel_group group exn; false) + | Exn.Res _ => true); + in ok end; + +fun future_job group (e: unit -> 'a) = + let + val result = Single_Assignment.var "future" : 'a result; + val pos = Position.thread_data (); + fun job ok = + let + val res = + if ok then + Exn.capture (fn () => + Multithreading.with_attributes Multithreading.private_interrupts + (fn _ => Position.setmp_thread_data pos e ()) before + Multithreading.interrupted ()) () + else Exn.interrupt_exn; + in assign_result group result res end; + in (result, job) end; + + (* fork *) fun forks {name, group, deps, pri} es = @@ -452,7 +473,8 @@ | (SOME work, deps') => SOME (work, deps')); fun execute_work NONE = () - | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps') + | execute_work (SOME (work, deps')) = + (worker_joining (fn () => worker_exec work); join_work deps') and join_work deps = Multithreading.with_attributes Multithreading.no_interrupts (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps))); @@ -475,7 +497,7 @@ fun join x = Exn.release (join_result x); -(* fast-path versions -- bypassing full task management *) +(* fast-path versions -- bypassing task queue *) fun value (x: 'a) = let @@ -538,7 +560,7 @@ SYNCHRONIZED "fulfill_result" (fn () => Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task)); - in if still_passive then execute (task, [job]) else () end); + in if still_passive then worker_exec (task, [job]) else () end); val _ = if is_some (Single_Assignment.peek result) then () else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); @@ -547,25 +569,6 @@ fun fulfill x res = fulfill_result x (Exn.Res res); -(* cancellation *) - -fun interruptible_task f x = - if Multithreading.available then - Multithreading.with_attributes - (if is_some (worker_task ()) - then Multithreading.private_interrupts - else Multithreading.public_interrupts) - (fn _ => f x) - else interruptible f x; - -(*cancel: present and future group members will be interrupted eventually*) -fun cancel_group group = SYNCHRONIZED "cancel" (fn () => - (if cancel_now group then () else cancel_later group; - signal work_available; scheduler_check ())); - -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); - - (* shutdown *) fun shutdown () =