# HG changeset patch # User wenzelm # Date 1257332309 -3600 # Node ID e351f4c1f18c688c54525b753d5e5839ab58d56a # Parent 0a1c0c1209ec97aa52939b07e8ada495f82d8d7c worker activity: distinguish between waiting (formerly active) and sleeping; tuned; diff -r 0a1c0c1209ec -r e351f4c1f18c src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Nov 04 11:37:06 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Nov 04 11:58:29 2009 +0100 @@ -99,18 +99,6 @@ (** scheduling **) -(* global state *) - -val queue = Unsynchronized.ref Task_Queue.empty; -val next = Unsynchronized.ref 0; -val max_workers = Unsynchronized.ref 0; -val max_active = Unsynchronized.ref 0; -val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list); -val scheduler = Unsynchronized.ref (NONE: Thread.thread option); -val canceled = Unsynchronized.ref ([]: Task_Queue.group list); -val do_shutdown = Unsynchronized.ref false; - - (* synchronization *) val scheduler_event = ConditionVar.conditionVar (); @@ -142,6 +130,23 @@ end; +(* global state *) + +val queue = Unsynchronized.ref Task_Queue.empty; +val next = Unsynchronized.ref 0; +val scheduler = Unsynchronized.ref (NONE: Thread.thread option); +val canceled = Unsynchronized.ref ([]: Task_Queue.group list); +val do_shutdown = Unsynchronized.ref false; +val max_workers = Unsynchronized.ref 0; +val max_active = Unsynchronized.ref 0; + +datatype worker_state = Working | Waiting | Sleeping; +val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); + +fun count_workers state = (*requires SYNCHRONIZED*) + fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; + + (* execute future jobs *) fun future_job group (e: unit -> 'a) = @@ -184,25 +189,17 @@ in () end; -(* worker activity *) - -fun count_active () = (*requires SYNCHRONIZED*) - fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0; - -fun find_active () = (*requires SYNCHRONIZED*) - (case AList.lookup Thread.equal (! workers) (Thread.self ()) of - SOME active => active - | NONE => raise Fail "Unregistered worker thread"); - - (* worker threads *) -fun worker_wait cond = (*requires SYNCHRONIZED*) +fun worker_wait active cond = (*requires SYNCHRONIZED*) let - val active = find_active (); - val _ = active := false; + val state = + (case AList.lookup Thread.equal (! workers) (Thread.self ()) of + SOME state => state + | NONE => raise Fail "Unregistered worker thread"); + val _ = state := (if active then Waiting else Sleeping); val _ = wait cond; - val _ = active := true; + val _ = state := Working; in () end; fun worker_next have_work = (*requires SYNCHRONIZED*) @@ -211,13 +208,13 @@ if have_work then signal work_available else (); broadcast scheduler_event; NONE) - else if count_active () > ! max_active then + else if count_workers Working > ! max_active then (if have_work then signal work_available else (); - worker_wait scheduler_event; + worker_wait false scheduler_event; worker_next false) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of - NONE => (worker_wait work_available; worker_next true) + NONE => (worker_wait true work_available; worker_next true) | some => some); fun worker_loop name = @@ -227,7 +224,7 @@ fun worker_start name = (*requires SYNCHRONIZED*) Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name), - Unsynchronized.ref true)); + Unsynchronized.ref Working)); (* scheduler *) @@ -252,14 +249,16 @@ let val {ready, pending, running} = Task_Queue.status (! queue); val total = length (! workers); - val active = count_active (); + val active = count_workers Working; + val waiting = count_workers Waiting; in "SCHEDULE " ^ Time.toString now ^ ": " ^ string_of_int ready ^ " ready, " ^ string_of_int pending ^ " pending, " ^ string_of_int running ^ " running; " ^ string_of_int total ^ " workers, " ^ - string_of_int active ^ " active " + string_of_int active ^ " active, " ^ + string_of_int waiting ^ " waiting " end) else (); @@ -373,7 +372,7 @@ else (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of (NONE, []) => NONE - | (NONE, deps') => (worker_wait work_finished; join_next deps') + | (NONE, deps') => (worker_wait true work_finished; join_next deps') | (SOME work, deps') => SOME (work, deps')); fun execute_work NONE = ()