src/Pure/Concurrent/future.ML
changeset 33407 1427333220bc
parent 33406 1ddcb8472bd2
child 33408 a69ddd7dce95
equal deleted inserted replaced
33406:1ddcb8472bd2 33407:1427333220bc
   203     val _ = active := false;
   203     val _ = active := false;
   204     val _ = wait cond;
   204     val _ = wait cond;
   205     val _ = active := true;
   205     val _ = active := true;
   206   in () end;
   206   in () end;
   207 
   207 
   208 fun worker_next () = (*requires SYNCHRONIZED*)
   208 fun worker_next has_work = (*requires SYNCHRONIZED*)
   209   if length (! workers) > ! max_workers then
   209   if length (! workers) > ! max_workers then
   210     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   210     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   211      broadcast scheduler_event;
   211      broadcast scheduler_event;
       
   212      if has_work then signal work_available else ();
   212      NONE)
   213      NONE)
   213   else if count_active () > ! max_active then
   214   else if count_active () > ! max_active then
   214     (worker_wait scheduler_event; worker_next ())
   215     (if has_work then signal work_available else ();
       
   216      worker_wait scheduler_event;
       
   217      worker_next false)
   215   else
   218   else
   216     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   219     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   217       NONE => (worker_wait work_available; worker_next ())
   220       NONE => (worker_wait work_available; worker_next true)
   218     | some => some);
   221     | some => some);
   219 
   222 
   220 fun worker_loop name =
   223 fun worker_loop name =
   221   (case SYNCHRONIZED name (fn () => worker_next ()) of
   224   (case SYNCHRONIZED name (fn () => worker_next false) of
   222     NONE => ()
   225     NONE => ()
   223   | SOME work => (execute name work; worker_loop name));
   226   | SOME work => (execute name work; worker_loop name));
   224 
   227 
   225 fun worker_start name =
   228 fun worker_start name = (*requires SYNCHRONIZED*)
   226   SimpleThread.fork false (fn () =>
   229   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
   227    (SYNCHRONIZED name (fn () =>
   230     Unsynchronized.ref true));
   228       Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
       
   229     broadcast scheduler_event;
       
   230     worker_loop name));
       
   231 
   231 
   232 
   232 
   233 (* scheduler *)
   233 (* scheduler *)
   234 
   234 
   235 val last_status = Unsynchronized.ref Time.zeroTime;
   235 val status_ticks = Unsynchronized.ref 0;
   236 val next_status = Time.fromMilliseconds 500;
   236 
       
   237 val last_round = Unsynchronized.ref Time.zeroTime;
   237 val next_round = Time.fromMilliseconds 50;
   238 val next_round = Time.fromMilliseconds 50;
   238 
   239 
   239 fun scheduler_next () = (*requires SYNCHRONIZED*)
   240 fun scheduler_next () = (*requires SYNCHRONIZED*)
   240   let
   241   let
       
   242     val now = Time.now ();
       
   243     val tick = Time.<= (Time.+ (! last_round, next_round), now);
       
   244     val _ = if tick then last_round := now else ();
       
   245 
   241     (*queue and worker status*)
   246     (*queue and worker status*)
   242     val _ =
   247     val _ =
   243       let val now = Time.now () in
   248       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   244         if Time.> (Time.+ (! last_status, next_status), now) then ()
   249     val _ =
   245         else
   250       if tick andalso ! status_ticks = 0 then
   246          (last_status := now; Multithreading.tracing 1 (fn () =>
   251         Multithreading.tracing 1 (fn () =>
   247             let
   252           let
   248               val {ready, pending, running} = Task_Queue.status (! queue);
   253             val {ready, pending, running} = Task_Queue.status (! queue);
   249               val total = length (! workers);
   254             val total = length (! workers);
   250               val active = count_active ();
   255             val active = count_active ();
   251             in
   256           in
   252               "SCHEDULE " ^ Time.toString now ^ ": " ^
   257             "SCHEDULE " ^ Time.toString now ^ ": " ^
   253                 string_of_int ready ^ " ready, " ^
   258               string_of_int ready ^ " ready, " ^
   254                 string_of_int pending ^ " pending, " ^
   259               string_of_int pending ^ " pending, " ^
   255                 string_of_int running ^ " running; " ^
   260               string_of_int running ^ " running; " ^
   256                 string_of_int total ^ " workers, " ^
   261               string_of_int total ^ " workers, " ^
   257                 string_of_int active ^ " active"
   262               string_of_int active ^ " active "
   258             end))
   263           end)
   259       end;
   264       else ();
   260 
   265 
   261     (*worker threads*)
   266     (*worker threads*)
   262     val _ =
   267     val _ =
   263       if forall (Thread.isActive o #1) (! workers) then ()
   268       if forall (Thread.isActive o #1) (! workers) then ()
   264       else
   269       else
   272     val _ = max_active := m;
   277     val _ = max_active := m;
   273 
   278 
   274     val mm = if m = 9999 then 1 else m * 2;
   279     val mm = if m = 9999 then 1 else m * 2;
   275     val _ = max_workers := mm;
   280     val _ = max_workers := mm;
   276 
   281 
   277     val l = length (! workers);
   282     val missing = ! max_workers - length (! workers);
   278     val _ =
   283     val _ =
   279       if mm > l then
   284       if missing > 0 then
   280         funpow (mm - l) (fn () =>
   285        (funpow missing (fn () =>
   281           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   286           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
       
   287         broadcast scheduler_event)
   282       else ();
   288       else ();
   283 
   289 
   284     (*canceled groups*)
   290     (*canceled groups*)
   285     val _ =
   291     val _ =
   286       if null (! canceled) then ()
   292       if null (! canceled) then ()