# HG changeset patch # User wenzelm # Date 1221162833 -7200 # Node ID bcd48c6897d4a9ecdbd19ec5858080ed15212043 # Parent 17a81e48114289583b0b5243fa0b05fe378300cd eliminated requests, use global state variables uniformly; more robust shutdown; diff -r 17a81e481142 -r bcd48c6897d4 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Thu Sep 11 21:04:09 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Thu Sep 11 21:53:53 2008 +0200 @@ -37,8 +37,8 @@ val join_results: 'a T list -> 'a Exn.result list val join: 'a T -> 'a val focus: task list -> unit + val interrupt_task: string -> unit val cancel: 'a T -> unit - val interrupt_task: string -> unit val shutdown: unit -> unit end; @@ -77,17 +77,9 @@ val queue = ref TaskQueue.empty; val workers = ref ([]: (Thread.thread * bool) list); val scheduler = ref (NONE: Thread.thread option); - val excessive = ref 0; - - -(* requests *) - -datatype request = Shutdown | Cancel of group; -val requests = Mailbox.create () : request Mailbox.T; - -fun shutdown_request () = Mailbox.send requests Shutdown; -fun cancel_request group = Mailbox.send requests (Cancel group); +val canceled = ref ([]: TaskQueue.group list); +val do_shutdown = ref false; (* synchronization *) @@ -108,11 +100,10 @@ in Exn.release result end) (); fun wait name = (*requires SYNCHRONIZED*) - let - val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting"); - val _ = ConditionVar.wait (cond, lock); - val _ = Multithreading.tracing 4 (fn () => name ^ ": notified"); - in () end; + ConditionVar.wait (cond, lock); + +fun wait_timeout name timeout = (*requires SYNCHRONIZED*) + ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); fun notify_all () = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; @@ -133,7 +124,7 @@ (change queue (TaskQueue.finish task); if ok then () else if TaskQueue.cancel (! queue) group then () - else cancel_request group; + else change canceled (cons group); notify_all ())); in () end; @@ -175,8 +166,9 @@ (* scheduler *) -fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () => +fun scheduler_next () = (*requires SYNCHRONIZED*) let + (*worker threads*) val _ = (case List.partition (Thread.isActive o #1) (! workers) of (_, []) => () @@ -184,37 +176,37 @@ (workers := active; Multithreading.tracing 0 (fn () => "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); - val m = if shutdown then 0 else Multithreading.max_threads_value (); + val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); val l = length (! workers); val _ = excessive := l - m; val _ = if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) () else (); - val terminate = shutdown andalso null (! workers); - val _ = if terminate then scheduler := NONE else (); - val _ = notify_all (); - in terminate end); + + (*canceled groups*) + val _ = change canceled (filter_out (TaskQueue.cancel (! queue))); + + (*shutdown*) + val continue = not (! do_shutdown andalso null (! workers)); + val _ = if continue then () else scheduler := NONE; -fun scheduler_loop (shutdown, canceled) = - if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit") - else - let - val canceled' = SYNCHRONIZED "scheduler" - (fn () => filter_out (TaskQueue.cancel (! queue)) canceled); - in - (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of - SOME Shutdown => scheduler_loop (true, canceled') - | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled') - | NONE => scheduler_loop (shutdown, canceled')) - end; + val _ = notify_all (); + val _ = wait_timeout "scheduler" (Time.fromSeconds 1); + in continue end; + +fun scheduler_loop () = + (while SYNCHRONIZED "scheduler" scheduler_next do (); + Multithreading.tracing 4 (fn () => "scheduler: exit")); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () => - if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () - else scheduler := - SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))); + if not (scheduler_active ()) then + (do_shutdown := false; + scheduler := SOME (Thread.fork (scheduler_loop, Multithreading.no_interrupts))) + else if ! do_shutdown then error "Scheduler shutdown in progress" + else ()); (* future values: fork independent computation *) @@ -242,9 +234,9 @@ fun join_results xs = let + val _ = scheduler_check (); val _ = Multithreading.self_critical () andalso error "Cannot join future values within critical section"; - val _ = scheduler_check (); fun unfinished () = xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); @@ -279,20 +271,23 @@ fun focus tasks = SYNCHRONIZED "interrupt" (fn () => change queue (TaskQueue.focus tasks)); -(*cancel: present and future group members will be interrupted eventually*) -fun cancel x = (scheduler_check (); cancel_request (group_of x)); - (*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" (fn () => TaskQueue.interrupt_external (! queue) id); +(*cancel: present and future group members will be interrupted eventually*) +fun cancel x = + (scheduler_check (); SYNCHRONIZED "cancel" (fn () => change canceled (cons (group_of x)))); + + (*global join and shutdown*) fun shutdown () = (scheduler_check (); SYNCHRONIZED "shutdown" (fn () => - (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; - shutdown_request (); + (while not (scheduler_active ()) do wait "shutdown: scheduler inactive"; + while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; + do_shutdown := true; while not (null (! workers)) do wait "shutdown: workers"; - while scheduler_active () do wait "shutdown: scheduler"))); + while scheduler_active () do wait "shutdown: scheduler still active"))); end;