# HG changeset patch # User wenzelm # Date 1248694349 -7200 # Node ID f5f46d6eb95b28f4a480154d5a40660b27254f7f # Parent 420108dd7dfe27ef97dc30db131139b0559721f2# Parent 572b923624963c478cfb2c464154440964598a32 merged diff -r 420108dd7dfe -r f5f46d6eb95b src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Mon Jul 27 09:01:13 2009 +0200 +++ b/src/Pure/Concurrent/future.ML Mon Jul 27 13:32:29 2009 +0200 @@ -11,7 +11,7 @@ processes. * Futures are grouped; failure of one group member causes the whole - group to be interrupted eventually. + group to be interrupted eventually. Groups are block-structured. * Forked futures are evaluated spontaneously by a farm of worker threads in the background; join resynchronizes the computation and @@ -46,7 +46,6 @@ val join: 'a future -> 'a val map: ('a -> 'b) -> 'a future -> 'b future val interruptible_task: ('a -> 'b) -> 'a -> 'b - val interrupt_task: string -> unit val cancel_group: group -> unit val cancel: 'a future -> unit val shutdown: unit -> unit @@ -114,20 +113,26 @@ (* synchronization *) +val scheduler_event = ConditionVar.conditionVar (); +val work_available = ConditionVar.conditionVar (); +val work_finished = ConditionVar.conditionVar (); + local val lock = Mutex.mutex (); - val cond = ConditionVar.conditionVar (); in fun SYNCHRONIZED name = SimpleThread.synchronized name lock; -fun wait () = (*requires SYNCHRONIZED*) +fun wait cond = (*requires SYNCHRONIZED*) ConditionVar.wait (cond, lock); -fun wait_timeout timeout = (*requires SYNCHRONIZED*) +fun wait_timeout cond timeout = (*requires SYNCHRONIZED*) ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout))); -fun notify_all () = (*requires SYNCHRONIZED*) +fun signal cond = (*requires SYNCHRONIZED*) + ConditionVar.signal cond; + +fun broadcast cond = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; end; @@ -183,29 +188,35 @@ val ok = setmp_thread_data (name, task, group) (fn () => fold (fn job => fn ok => job valid andalso ok) jobs true) (); val _ = SYNCHRONIZED "execute" (fn () => - (change queue (Task_Queue.finish task); - if ok then () - else if Task_Queue.cancel (! queue) group then () - else do_cancel group; - notify_all ())); + let + val maximal = change_result queue (Task_Queue.finish task); + val _ = + if ok then () + else if Task_Queue.cancel (! queue) group then () + else do_cancel group; + val _ = broadcast work_finished; + val _ = if maximal then () else broadcast work_available; + in () end); in () end; (* worker threads *) -fun worker_wait () = (*requires SYNCHRONIZED*) - (change_active false; wait (); change_active true); +fun worker_wait cond = (*requires SYNCHRONIZED*) + (change_active false; broadcast scheduler_event; + wait cond; + change_active true; broadcast scheduler_event); fun worker_next () = (*requires SYNCHRONIZED*) if ! excessive > 0 then (dec excessive; change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); - notify_all (); + broadcast scheduler_event; NONE) - else if overloaded () then (worker_wait (); worker_next ()) + else if overloaded () then (worker_wait scheduler_event; worker_next ()) else (case change_result queue Task_Queue.dequeue of - NONE => (worker_wait (); worker_next ()) + NONE => (worker_wait work_available; worker_next ()) | some => some); fun worker_loop name = @@ -231,37 +242,38 @@ end); (*worker threads*) - val ws = ! workers; val _ = - if forall (Thread.isActive o #1) ws then () + if forall (Thread.isActive o #1) (! workers) then () else - (case List.partition (Thread.isActive o #1) ws of + (case List.partition (Thread.isActive o #1) (! workers) of (_, []) => () - | (active, inactive) => - (workers := active; Multithreading.tracing 0 (fn () => - "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); + | (alive, dead) => + (workers := alive; Multithreading.tracing 0 (fn () => + "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))); val _ = trace_active (); val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); val mm = (m * 3) div 2; - val l = length ws; + val l = length (! workers); val _ = excessive := l - mm; val _ = if mm > l then - funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () + (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) (); + broadcast scheduler_event) else (); (*canceled groups*) val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); - (*shutdown*) - val continue = not (! do_shutdown andalso null ws); - val _ = if continue then () else scheduler := NONE; + val timeout = + Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50); + val _ = interruptible (fn () => wait_timeout scheduler_event timeout) () + handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue)); - val _ = notify_all (); - val _ = interruptible (fn () => - wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) () - handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue)); + (*shutdown*) + val continue = not (! do_shutdown andalso null (! workers)); + val _ = if continue then () else scheduler := NONE; + val _ = broadcast scheduler_event; in continue end; fun scheduler_loop () = @@ -272,7 +284,8 @@ fun scheduler_check name = SYNCHRONIZED name (fn () => if not (scheduler_active ()) then - (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) + (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop); + broadcast scheduler_event) else if ! do_shutdown then error "Scheduler shutdown in progress" else ()); @@ -292,7 +305,10 @@ | NONE => Task_Queue.new_group (worker_group ())); val (result, job) = future_job group e; val task = SYNCHRONIZED "future" (fn () => - change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ()); + let + val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); + val _ = if minimal then signal work_available else (); + in task end); in Future {task = task, group = group, result = result} end; fun fork e = fork_future NONE [] 0 e; @@ -313,7 +329,7 @@ | SOME res => res); fun join_next deps = (*requires SYNCHRONIZED*) - if overloaded () then (worker_wait (); join_next deps) + if overloaded () then (worker_wait scheduler_event; join_next deps) else change_result queue (Task_Queue.dequeue_towards deps); fun join_deps deps = @@ -336,7 +352,7 @@ fun join_wait x = if SYNCHRONIZED "join_wait" (fn () => - is_finished x orelse (if worker then worker_wait () else wait (); false)) + is_finished x orelse ((if worker then worker_wait else wait) work_finished; false)) then () else join_wait x; val _ = xs |> List.app (fn x => @@ -383,14 +399,10 @@ (fn _ => f) x else interruptible f x; -(*interrupt: permissive signal, may get ignored*) -fun interrupt_task id = SYNCHRONIZED "interrupt" - (fn () => Task_Queue.interrupt_external (! queue) id); - -(*cancel: present and future group members will be interrupted eventually*) +(*cancel_group: present and future group members will be interrupted eventually*) fun cancel_group group = (scheduler_check "cancel check"; - SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ()))); + SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event))); fun cancel x = cancel_group (group_of x); @@ -401,13 +413,13 @@ if Multithreading.available then (scheduler_check "shutdown check"; SYNCHRONIZED "shutdown" (fn () => - (while not (scheduler_active ()) do wait (); - while not (Task_Queue.is_empty (! queue)) do wait (); + (while not (scheduler_active ()) do wait scheduler_event; + while not (Task_Queue.is_empty (! queue)) do wait scheduler_event; do_shutdown := true; - notify_all (); - while not (null (! workers)) do wait (); - while scheduler_active () do wait (); - OS.Process.sleep (Time.fromMilliseconds 300)))) + while scheduler_active () do + (broadcast work_available; + broadcast scheduler_event; + wait scheduler_event)))) else (); diff -r 420108dd7dfe -r f5f46d6eb95b src/Pure/Concurrent/task_queue.ML --- a/src/Pure/Concurrent/task_queue.ML Mon Jul 27 09:01:13 2009 +0200 +++ b/src/Pure/Concurrent/task_queue.ML Mon Jul 27 13:32:29 2009 +0200 @@ -11,27 +11,25 @@ val pri_of_task: task -> int val str_of_task: task -> string type group + val new_group: group option -> group val group_id: group -> int val eq_group: group * group -> bool - val new_group: group option -> group + val cancel_group: group -> exn -> unit + val is_canceled: group -> bool val group_status: group -> exn list val str_of_group: group -> string type queue val empty: queue val is_empty: queue -> bool val status: queue -> {ready: int, pending: int, running: int} - val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> task * queue + val cancel: queue -> group -> bool + val cancel_all: queue -> group list + val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue val extend: task -> (bool -> bool) -> queue -> queue option val dequeue: queue -> (task * group * (bool -> bool) list) option * queue val dequeue_towards: task list -> queue -> (((task * group * (bool -> bool) list) * task list) option * queue) - val interrupt: queue -> task -> unit - val interrupt_external: queue -> string -> unit - val is_canceled: group -> bool - val cancel_group: group -> exn -> unit - val cancel: queue -> group -> bool - val cancel_all: queue -> group list - val finish: task -> queue -> queue + val finish: task -> queue -> bool * queue end; structure Task_Queue:> TASK_QUEUE = @@ -67,17 +65,19 @@ id :: (case parent of NONE => [] | SOME group => group_ancestry group); +(* group status *) + fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () => (case exn of Exn.Interrupt => if null (! status) then status := [exn] else () | _ => change status (cons exn))); +fun is_canceled (Group {parent, status, ...}) = (*non-critical*) + not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group); + fun group_status (Group {parent, status, ...}) = (*non-critical*) ! status @ (case parent of NONE => [] | SOME group => group_status group); -fun is_canceled (Group {parent, status, ...}) = (*non-critical*) - not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group); - fun str_of_group group = (is_canceled group ? enclose "(" ")") (string_of_int (group_id group)); @@ -162,13 +162,14 @@ |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps |> fold (fold (add_job task) o get_deps jobs) deps; + val minimal = null (get_deps jobs' task); val cache' = (case cache of Result last => if task_ord (last, task) = LESS then cache else Unknown | _ => Unknown); - in (task, make_queue groups' jobs' cache') end; + in ((task, minimal), make_queue groups' jobs' cache') end; fun extend task job (Queue {groups, jobs, cache}) = (case try (get_job jobs) task of @@ -225,23 +226,7 @@ end; -(* sporadic interrupts *) - -fun interrupt (Queue {jobs, ...}) task = - (case try (get_job jobs) task of - SOME (Running thread) => SimpleThread.interrupt thread - | _ => ()); - -fun interrupt_external (queue as Queue {jobs, ...}) str = - (case Int.fromString str of - SOME i => - (case Task_Graph.get_first NONE - (fn (task as Task (_, j), _) => if i = j then SOME task else NONE) jobs - of SOME task => interrupt queue task | NONE => ()) - | NONE => ()); - - -(* termination *) +(* finish *) fun finish task (Queue {groups, jobs, cache}) = let @@ -249,9 +234,8 @@ val groups' = groups |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group); val jobs' = Task_Graph.del_node task jobs; - val cache' = - if null (Task_Graph.imm_succs jobs task) then cache - else Unknown; - in make_queue groups' jobs' cache' end; + val maximal = null (Task_Graph.imm_succs jobs task); + val cache' = if maximal then cache else Unknown; + in (maximal, make_queue groups' jobs' cache') end; end;