# HG changeset patch # User wenzelm # Date 1221068669 -7200 # Node ID 9e5f556409c637c4c22089f71d6f78e177bec619 # Parent 0a2434cf38c9b0a2ff835d0e84402547a04898ec future: allow explicit group; cancel: invalidate group identifier for all future members; tuned comments; tuned; diff -r 0a2434cf38c9 -r 9e5f556409c6 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Sep 10 19:44:28 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Wed Sep 10 19:44:29 2008 +0200 @@ -13,7 +13,7 @@ val task_of: 'a T -> task val group_of: 'a T -> group val shutdown_request: unit -> unit - val future: bool -> task list -> (unit -> 'a) -> 'a T + val future: group option -> task list -> (unit -> 'a) -> 'a T val fork: (unit -> 'a) -> 'a T val cancel: 'a T -> unit val join_all: 'a T list -> 'a list @@ -111,7 +111,7 @@ val _ = SYNCHRONIZED (fn () => (change queue (TaskQueue.finish task); if ok then () - else if change_result queue (TaskQueue.cancel group) then () + else if TaskQueue.cancel (! queue) group then () else cancel_request group; notify_all ())); in () end; @@ -148,41 +148,45 @@ (* scheduler *) -fun scheduler_fork () = SYNCHRONIZED (fn () => +fun scheduler_fork shutdown = SYNCHRONIZED (fn () => let val _ = trace_active (); - val m = Multithreading.max_threads_value (); + val _ = + (case List.partition Thread.isActive (! workers) of + (_, []) => () + | (active, inactive) => + (workers := active; Multithreading.tracing 0 (fn () => + "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads"))); + + val m = if shutdown then 0 else Multithreading.max_threads_value (); val l = length (! workers); val _ = excessive := l - m; - in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end); + val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1); + in null (! workers) end); -fun scheduler_loop canceled = - let - val canceled' = SYNCHRONIZED (fn () => - filter_out (change_result queue o TaskQueue.cancel) canceled); - val _ = scheduler_fork (); - in - (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of - SOME Shutdown => () (* FIXME proper worker shutdown *) - | SOME (Cancel group) => scheduler_loop (group :: canceled') - | NONE => scheduler_loop canceled') - end; +fun scheduler_loop (shutdown, canceled) = + if scheduler_fork shutdown then () + else + let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in + (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of + SOME Shutdown => scheduler_loop (true, canceled') + | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled') + | NONE => scheduler_loop (shutdown, canceled')) + end; -fun check_scheduler () = SYNCHRONIZED (fn () => +fun scheduler_check () = SYNCHRONIZED (fn () => if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () - else scheduler := SOME (Thread.fork (fn () => scheduler_loop [], Multithreading.no_interrupts))); + else scheduler := + SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))); -(* future values *) +(* future values: fork independent computation *) -fun future new_group deps (e: unit -> 'a) = +fun future opt_group deps (e: unit -> 'a) = let - val _ = check_scheduler (); + val _ = scheduler_check (); - val group = - (case (new_group, thread_data ()) of - (false, SOME (_, group)) => group - | _ => TaskQueue.new_group ()); + val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ()); val result = ref (NONE: 'a Exn.result option); val run = Multithreading.with_attributes (Thread.getAttributes ()) @@ -194,16 +198,14 @@ change_result queue (TaskQueue.enqueue group deps run) before notify_all ()); in Future {task = task, group = group, result = result} end; -fun fork e = future false [] e; - -fun cancel x = (check_scheduler (); cancel_request (group_of x)); +fun fork e = future (Option.map #2 (thread_data ())) [] e; -(* join *) +(* join: retrieve results *) fun join_all xs = let - val _ = check_scheduler (); + val _ = scheduler_check (); fun unfinished () = xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); @@ -236,4 +238,13 @@ fun join x = singleton join_all x; + +(* termination *) + +(*cancel: present and future group members will be interrupted eventually*) +fun cancel x = (scheduler_check (); cancel_request (group_of x)); + +(*interrupt: adhoc signal, permissive, may get ignored*) +fun interrupt_task id = SYNCHRONIZED (fn () => TaskQueue.interrupt (! queue) id); + end;