203 val _ = active := false; |
203 val _ = active := false; |
204 val _ = wait cond; |
204 val _ = wait cond; |
205 val _ = active := true; |
205 val _ = active := true; |
206 in () end; |
206 in () end; |
207 |
207 |
208 fun worker_next () = (*requires SYNCHRONIZED*) |
208 fun worker_next has_work = (*requires SYNCHRONIZED*) |
209 if length (! workers) > ! max_workers then |
209 if length (! workers) > ! max_workers then |
210 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
210 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
211 broadcast scheduler_event; |
211 broadcast scheduler_event; |
|
212 if has_work then signal work_available else (); |
212 NONE) |
213 NONE) |
213 else if count_active () > ! max_active then |
214 else if count_active () > ! max_active then |
214 (worker_wait scheduler_event; worker_next ()) |
215 (if has_work then signal work_available else (); |
|
216 worker_wait scheduler_event; |
|
217 worker_next false) |
215 else |
218 else |
216 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
219 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
217 NONE => (worker_wait work_available; worker_next ()) |
220 NONE => (worker_wait work_available; worker_next true) |
218 | some => some); |
221 | some => some); |
219 |
222 |
220 fun worker_loop name = |
223 fun worker_loop name = |
221 (case SYNCHRONIZED name (fn () => worker_next ()) of |
224 (case SYNCHRONIZED name (fn () => worker_next false) of |
222 NONE => () |
225 NONE => () |
223 | SOME work => (execute name work; worker_loop name)); |
226 | SOME work => (execute name work; worker_loop name)); |
224 |
227 |
225 fun worker_start name = |
228 fun worker_start name = (*requires SYNCHRONIZED*) |
226 SimpleThread.fork false (fn () => |
229 Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name), |
227 (SYNCHRONIZED name (fn () => |
230 Unsynchronized.ref true)); |
228 Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true))); |
|
229 broadcast scheduler_event; |
|
230 worker_loop name)); |
|
231 |
231 |
232 |
232 |
233 (* scheduler *) |
233 (* scheduler *) |
234 |
234 |
235 val last_status = Unsynchronized.ref Time.zeroTime; |
235 val status_ticks = Unsynchronized.ref 0; |
236 val next_status = Time.fromMilliseconds 500; |
236 |
|
237 val last_round = Unsynchronized.ref Time.zeroTime; |
237 val next_round = Time.fromMilliseconds 50; |
238 val next_round = Time.fromMilliseconds 50; |
238 |
239 |
239 fun scheduler_next () = (*requires SYNCHRONIZED*) |
240 fun scheduler_next () = (*requires SYNCHRONIZED*) |
240 let |
241 let |
|
242 val now = Time.now (); |
|
243 val tick = Time.<= (Time.+ (! last_round, next_round), now); |
|
244 val _ = if tick then last_round := now else (); |
|
245 |
241 (*queue and worker status*) |
246 (*queue and worker status*) |
242 val _ = |
247 val _ = |
243 let val now = Time.now () in |
248 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); |
244 if Time.> (Time.+ (! last_status, next_status), now) then () |
249 val _ = |
245 else |
250 if tick andalso ! status_ticks = 0 then |
246 (last_status := now; Multithreading.tracing 1 (fn () => |
251 Multithreading.tracing 1 (fn () => |
247 let |
252 let |
248 val {ready, pending, running} = Task_Queue.status (! queue); |
253 val {ready, pending, running} = Task_Queue.status (! queue); |
249 val total = length (! workers); |
254 val total = length (! workers); |
250 val active = count_active (); |
255 val active = count_active (); |
251 in |
256 in |
252 "SCHEDULE " ^ Time.toString now ^ ": " ^ |
257 "SCHEDULE " ^ Time.toString now ^ ": " ^ |
253 string_of_int ready ^ " ready, " ^ |
258 string_of_int ready ^ " ready, " ^ |
254 string_of_int pending ^ " pending, " ^ |
259 string_of_int pending ^ " pending, " ^ |
255 string_of_int running ^ " running; " ^ |
260 string_of_int running ^ " running; " ^ |
256 string_of_int total ^ " workers, " ^ |
261 string_of_int total ^ " workers, " ^ |
257 string_of_int active ^ " active" |
262 string_of_int active ^ " active " |
258 end)) |
263 end) |
259 end; |
264 else (); |
260 |
265 |
261 (*worker threads*) |
266 (*worker threads*) |
262 val _ = |
267 val _ = |
263 if forall (Thread.isActive o #1) (! workers) then () |
268 if forall (Thread.isActive o #1) (! workers) then () |
264 else |
269 else |