equal
deleted
inserted
replaced
138 val scheduler = Unsynchronized.ref (NONE: Thread.thread option); |
138 val scheduler = Unsynchronized.ref (NONE: Thread.thread option); |
139 val canceled = Unsynchronized.ref ([]: group list); |
139 val canceled = Unsynchronized.ref ([]: group list); |
140 val do_shutdown = Unsynchronized.ref false; |
140 val do_shutdown = Unsynchronized.ref false; |
141 val max_workers = Unsynchronized.ref 0; |
141 val max_workers = Unsynchronized.ref 0; |
142 val max_active = Unsynchronized.ref 0; |
142 val max_active = Unsynchronized.ref 0; |
143 val worker_trend = Unsynchronized.ref 0; |
|
144 |
143 |
145 val status_ticks = Unsynchronized.ref 0; |
144 val status_ticks = Unsynchronized.ref 0; |
146 val last_round = Unsynchronized.ref Time.zeroTime; |
145 val last_round = Unsynchronized.ref Time.zeroTime; |
147 val next_round = seconds 0.05; |
146 val next_round = seconds 0.05; |
148 |
147 |
263 | SOME work => (worker_exec work; worker_loop name)); |
262 | SOME work => (worker_exec work; worker_loop name)); |
264 |
263 |
265 fun worker_start name = (*requires SYNCHRONIZED*) |
264 fun worker_start name = (*requires SYNCHRONIZED*) |
266 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
265 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
267 Unsynchronized.ref Working)) |
266 Unsynchronized.ref Working)) |
268 handle Fail msg => Multithreading.tracing 0 (fn () => msg); |
267 handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg); |
269 |
268 |
270 |
269 |
271 |
270 |
272 (* scheduler *) |
271 (* scheduler *) |
273 |
272 |
285 val _ = |
284 val _ = |
286 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0 |
285 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0 |
287 then report_status () else (); |
286 then report_status () else (); |
288 |
287 |
289 val _ = |
288 val _ = |
290 if forall (Thread.isActive o #1) (! workers) then () |
289 if not tick orelse forall (Thread.isActive o #1) (! workers) then () |
291 else |
290 else |
292 let |
291 let |
293 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); |
292 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); |
294 val _ = workers := alive; |
293 val _ = workers := alive; |
295 in |
294 in |
303 val max_active0 = ! max_active; |
302 val max_active0 = ! max_active; |
304 val max_workers0 = ! max_workers; |
303 val max_workers0 = ! max_workers; |
305 |
304 |
306 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
305 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
307 val _ = max_active := m; |
306 val _ = max_active := m; |
308 |
307 val _ = max_workers := 2 * m; |
309 val mm = |
|
310 if ! do_shutdown then 0 |
|
311 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m); |
|
312 val _ = |
|
313 if tick andalso mm > ! max_workers then |
|
314 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1) |
|
315 else if tick andalso mm < ! max_workers then |
|
316 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) |
|
317 else (); |
|
318 val _ = |
|
319 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then |
|
320 max_workers := mm |
|
321 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then |
|
322 max_workers := Int.min (mm, 2 * m) |
|
323 else (); |
|
324 |
308 |
325 val missing = ! max_workers - length (! workers); |
309 val missing = ! max_workers - length (! workers); |
326 val _ = |
310 val _ = |
327 if missing > 0 then |
311 if missing > 0 then |
328 funpow missing (fn () => |
312 funpow missing (fn () => |