# HG changeset patch # User wenzelm # Date 1257290998 -3600 # Node ID 1427333220bc1c7d4f29070344cefa2bfb002bd7 # Parent 1ddcb8472bd244f17811a9c27fb49abe62c08b37 worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock); worker_start: back to non-self version; scheduler: status output based on ticks; diff -r 1ddcb8472bd2 -r 1427333220bc src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100 +++ b/src/Pure/Concurrent/future.ML Wed Nov 04 00:29:58 2009 +0100 @@ -205,58 +205,63 @@ val _ = active := true; in () end; -fun worker_next () = (*requires SYNCHRONIZED*) +fun worker_next has_work = (*requires SYNCHRONIZED*) if length (! workers) > ! max_workers then (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); broadcast scheduler_event; + if has_work then signal work_available else (); NONE) else if count_active () > ! max_active then - (worker_wait scheduler_event; worker_next ()) + (if has_work then signal work_available else (); + worker_wait scheduler_event; + worker_next false) else (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of - NONE => (worker_wait work_available; worker_next ()) + NONE => (worker_wait work_available; worker_next true) | some => some); fun worker_loop name = - (case SYNCHRONIZED name (fn () => worker_next ()) of + (case SYNCHRONIZED name (fn () => worker_next false) of NONE => () | SOME work => (execute name work; worker_loop name)); -fun worker_start name = - SimpleThread.fork false (fn () => - (SYNCHRONIZED name (fn () => - Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true))); - broadcast scheduler_event; - worker_loop name)); +fun worker_start name = (*requires SYNCHRONIZED*) + Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name), + Unsynchronized.ref true)); (* scheduler *) -val last_status = Unsynchronized.ref Time.zeroTime; -val next_status = Time.fromMilliseconds 500; +val status_ticks = Unsynchronized.ref 0; + +val last_round = Unsynchronized.ref Time.zeroTime; val next_round = Time.fromMilliseconds 50; fun scheduler_next () = (*requires SYNCHRONIZED*) let + val now = Time.now (); + val tick = Time.<= (Time.+ (! last_round, next_round), now); + val _ = if tick then last_round := now else (); + (*queue and worker status*) val _ = - let val now = Time.now () in - if Time.> (Time.+ (! last_status, next_status), now) then () - else - (last_status := now; Multithreading.tracing 1 (fn () => - let - val {ready, pending, running} = Task_Queue.status (! queue); - val total = length (! workers); - val active = count_active (); - in - "SCHEDULE " ^ Time.toString now ^ ": " ^ - string_of_int ready ^ " ready, " ^ - string_of_int pending ^ " pending, " ^ - string_of_int running ^ " running; " ^ - string_of_int total ^ " workers, " ^ - string_of_int active ^ " active" - end)) - end; + if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); + val _ = + if tick andalso ! status_ticks = 0 then + Multithreading.tracing 1 (fn () => + let + val {ready, pending, running} = Task_Queue.status (! queue); + val total = length (! workers); + val active = count_active (); + in + "SCHEDULE " ^ Time.toString now ^ ": " ^ + string_of_int ready ^ " ready, " ^ + string_of_int pending ^ " pending, " ^ + string_of_int running ^ " running; " ^ + string_of_int total ^ " workers, " ^ + string_of_int active ^ " active " + end) + else (); (*worker threads*) val _ = @@ -274,11 +279,12 @@ val mm = if m = 9999 then 1 else m * 2; val _ = max_workers := mm; - val l = length (! workers); + val missing = ! max_workers - length (! workers); val _ = - if mm > l then - funpow (mm - l) (fn () => - ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () + if missing > 0 then + (funpow missing (fn () => + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) (); + broadcast scheduler_event) else (); (*canceled groups*)