(* Signature entries for the future/task interface: worker_subgroup, the
   'a future type with task_of / peek / is_finished, the ML_statistics flag,
   interruptible_task, cancel_group / cancel, and the params record with its
   default_params value.
   NOTE(review): the enclosing signature header is outside this excerpt, and
   each row appears to be listed once per revision of a diff -- confirm
   against the real file. *)
49 val worker_subgroup: unit -> group |
49 val worker_subgroup: unit -> group |
50 type 'a future |
50 type 'a future |
51 val task_of: 'a future -> task |
51 val task_of: 'a future -> task |
52 val peek: 'a future -> 'a Exn.result option |
52 val peek: 'a future -> 'a Exn.result option |
53 val is_finished: 'a future -> bool |
53 val is_finished: 'a future -> bool |
|
54 val ML_statistics: bool Unsynchronized.ref |
54 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
55 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
55 val cancel_group: group -> unit |
56 val cancel_group: group -> unit |
56 val cancel: 'a future -> unit |
57 val cancel: 'a future -> unit |
57 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool} |
58 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool} |
58 val default_params: params |
59 val default_params: params |
(* Mutable scheduler state, kept in unsynchronized refs:
   do_shutdown starts as false; max_workers, max_active, worker_trend and
   status_ticks start at 0; last_round starts at Time.zeroTime.
   scheduler_next compares last_round + next_round against the current time
   to decide whether a "tick" is due, and advances status_ticks modulo 10 on
   each tick.
   next_round is the fixed value `seconds 0.05` (presumably 50 ms -- confirm
   the meaning of `seconds`). *)
167 val do_shutdown = Unsynchronized.ref false; |
168 val do_shutdown = Unsynchronized.ref false; |
168 val max_workers = Unsynchronized.ref 0; |
169 val max_workers = Unsynchronized.ref 0; |
169 val max_active = Unsynchronized.ref 0; |
170 val max_active = Unsynchronized.ref 0; |
170 val worker_trend = Unsynchronized.ref 0; |
171 val worker_trend = Unsynchronized.ref 0; |
171 |
172 |
|
173 val status_ticks = Unsynchronized.ref 0; |
|
174 val last_round = Unsynchronized.ref Time.zeroTime; |
|
175 val next_round = seconds 0.05; |
|
176 |
(* worker_state: the three states a worker thread can be recorded in.
   workers: the list of worker threads, each paired with a mutable ref
   holding its current worker_state; initially empty. *)
172 datatype worker_state = Working | Waiting | Sleeping; |
177 datatype worker_state = Working | Waiting | Sleeping; |
173 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); |
178 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); |
174 |
179 |
(* count_workers state: number of entries in `workers` whose state ref
   currently holds `state`. Folds over the list starting from 0 and adds 1
   for every match; the thread component of each pair is ignored. *)
175 fun count_workers state = (*requires SYNCHRONIZED*) |
180 fun count_workers state = (*requires SYNCHRONIZED*) |
176 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
181 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
|
182 |
|
183 |
|
184 |
|
185 (* status *) |
|
186 |
|
(* Flag read by report_status: status messages are only emitted while this
   ref holds true. Initially false. *)
187 val ML_statistics = Unsynchronized.ref false; |
|
188 |
|
(* report_status (): when the ML_statistics flag is set, collect a status
   snapshot and send it as a protocol message; otherwise do nothing.
   The snapshot consists of:
     - "now": the current time as a real, rendered by signed_string_of_real;
     - the ready / pending / running / passive counts from
       Task_Queue.status (! queue);
     - the length of `workers`, plus how many of them are Working and how
       many are Waiting (via count_workers);
     - whatever ML_Statistics.get () returns, appended at the end.
   The property list is prefixed with Markup.ML_statistics and the message
   body is the empty string. *)
189 fun report_status () = (*requires SYNCHRONIZED*) |
|
190 if ! ML_statistics then |
|
191 let |
|
192 val {ready, pending, running, passive} = Task_Queue.status (! queue); |
|
193 val total = length (! workers); |
|
194 val active = count_workers Working; |
|
195 val waiting = count_workers Waiting; |
|
196 val stats = |
|
197 [("now", signed_string_of_real (Time.toReal (Time.now ()))), |
|
198 ("tasks_ready", Markup.print_int ready), |
|
199 ("tasks_pending", Markup.print_int pending), |
|
200 ("tasks_running", Markup.print_int running), |
|
201 ("tasks_passive", Markup.print_int passive), |
|
202 ("workers_total", Markup.print_int total), |
|
203 ("workers_active", Markup.print_int active), |
|
204 ("workers_waiting", Markup.print_int waiting)] @ |
|
205 ML_Statistics.get (); |
|
206 in Output.protocol_message (Markup.ML_statistics @ stats) "" end |
|
207 else (); |
177 |
208 |
178 |
209 |
179 (* cancellation primitives *) |
210 (* cancellation primitives *) |
180 |
211 |
181 fun cancel_now group = (*requires SYNCHRONIZED*) |
212 fun cancel_now group = (*requires SYNCHRONIZED*) |
269 Unsynchronized.ref Working)); |
300 Unsynchronized.ref Working)); |
270 |
301 |
271 |
302 |
272 (* scheduler *) |
303 (* scheduler *) |
273 |
304 |
(* ML_statistics (): if ML_Statistics.enabled is set, fetch
   ML_Statistics.get () and, unless the result is the empty list, send it as
   a protocol message prefixed with Markup.ML_statistics and with an empty
   body. Does nothing when statistics are disabled or the list is empty.
   NOTE(review): this function is listed in only one revision column; the
   other revision calls report_status at the same call sites -- confirm
   which one the target file contains. *)
274 fun ML_statistics () = |
|
275 if ! ML_Statistics.enabled then |
|
276 (case ML_Statistics.get () of |
|
277 [] => () |
|
278 | stats => Output.protocol_message (Markup.ML_statistics @ stats) "") |
|
279 else (); |
|
280 |
|
(* Tick bookkeeping used by scheduler_next:
   status_ticks is a counter advanced modulo 10 on each tick;
   last_round is the time of the most recent tick, initially Time.zeroTime;
   next_round is the interval added to last_round to decide when the next
   tick is due.
   NOTE(review): the same three definitions also appear earlier in this
   listing under different row numbers -- presumably they were moved between
   revisions; confirm only one copy exists in the real file. *)
281 val status_ticks = Unsynchronized.ref 0; |
|
282 |
|
283 val last_round = Unsynchronized.ref Time.zeroTime; |
|
284 val next_round = seconds 0.05; |
|
285 |
|
286 fun scheduler_next () = (*requires SYNCHRONIZED*) |
305 fun scheduler_next () = (*requires SYNCHRONIZED*) |
287 let |
306 let |
288 val now = Time.now (); |
307 val now = Time.now (); |
289 val tick = Time.<= (Time.+ (! last_round, next_round), now); |
308 val tick = Time.<= (Time.+ (! last_round, next_round), now); |
290 val _ = if tick then last_round := now else (); |
309 val _ = if tick then last_round := now else (); |
291 |
310 |
292 |
311 |
293 (* queue and worker status *) |
312 (* runtime status *) |
294 |
313 |
295 val _ = |
314 val _ = |
296 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); |
315 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); |
297 val _ = |
316 val _ = |
298 if tick andalso ! status_ticks = 0 then |
317 if tick andalso ! status_ticks = 0 then report_status () else (); |
299 (ML_statistics (); |
|
300 Multithreading.tracing 1 (fn () => |
|
301 let |
|
302 val {ready, pending, running, passive} = Task_Queue.status (! queue); |
|
303 val total = length (! workers); |
|
304 val active = count_workers Working; |
|
305 val waiting = count_workers Waiting; |
|
306 in |
|
307 "SCHEDULE " ^ Time.toString now ^ ": " ^ |
|
308 string_of_int ready ^ " ready, " ^ |
|
309 string_of_int pending ^ " pending, " ^ |
|
310 string_of_int running ^ " running, " ^ |
|
311 string_of_int passive ^ " passive; " ^ |
|
312 string_of_int total ^ " workers, " ^ |
|
313 string_of_int active ^ " active, " ^ |
|
314 string_of_int waiting ^ " waiting " |
|
315 end)) |
|
316 else (); |
|
317 |
318 |
318 val _ = |
319 val _ = |
319 if forall (Thread.isActive o #1) (! workers) then () |
320 if forall (Thread.isActive o #1) (! workers) then () |
320 else |
321 else |
321 let |
322 let |
(* scheduler_loop (): main loop of the scheduler.
   Each iteration evaluates scheduler_next () inside
   SYNCHRONIZED "scheduler", wrapped in Multithreading.with_attributes using
   Multithreading.sync_interrupts Multithreading.public_interrupts; the loop
   body itself is empty and iteration continues while that evaluation yields
   true.
   After the loop ends, last_round is reset to Time.zeroTime and one final
   status report is issued: ML_statistics () in one revision,
   report_status () in the other. *)
398 fun scheduler_loop () = |
399 fun scheduler_loop () = |
399 (while |
400 (while |
400 Multithreading.with_attributes |
401 Multithreading.with_attributes |
401 (Multithreading.sync_interrupts Multithreading.public_interrupts) |
402 (Multithreading.sync_interrupts Multithreading.public_interrupts) |
402 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) |
403 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) |
403 do (); last_round := Time.zeroTime; ML_statistics ()); |
404 do (); last_round := Time.zeroTime; report_status ()); |
404 |
405 |
(* scheduler_active (): true iff the `scheduler` ref holds SOME thread and
   Thread.isActive reports that thread as active; false when it holds
   NONE. *)
405 fun scheduler_active () = (*requires SYNCHRONIZED*) |
406 fun scheduler_active () = (*requires SYNCHRONIZED*) |
406 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
407 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
407 |
408 |
408 fun scheduler_check () = (*requires SYNCHRONIZED*) |
409 fun scheduler_check () = (*requires SYNCHRONIZED*) |