97 |
97 |
98 (** scheduling **) |
98 (** scheduling **) |
99 |
99 |
100 (* global state *) |
100 (* global state *) |
101 |
101 |
102 val queue = ref Task_Queue.empty; |
102 val queue = Unsynchronized.ref Task_Queue.empty; |
103 val next = ref 0; |
103 val next = Unsynchronized.ref 0; |
104 val workers = ref ([]: (Thread.thread * bool) list); |
104 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list); |
105 val scheduler = ref (NONE: Thread.thread option); |
105 val scheduler = Unsynchronized.ref (NONE: Thread.thread option); |
106 val excessive = ref 0; |
106 val excessive = Unsynchronized.ref 0; |
107 val canceled = ref ([]: Task_Queue.group list); |
107 val canceled = Unsynchronized.ref ([]: Task_Queue.group list); |
108 val do_shutdown = ref false; |
108 val do_shutdown = Unsynchronized.ref false; |
109 |
109 |
110 |
110 |
111 (* synchronization *) |
111 (* synchronization *) |
112 |
112 |
113 val scheduler_event = ConditionVar.conditionVar (); |
113 val scheduler_event = ConditionVar.conditionVar (); |
160 | Exn.Result _ => true) |
160 | Exn.Result _ => true) |
161 end; |
161 end; |
162 in (result, job) end; |
162 in (result, job) end; |
163 |
163 |
164 fun do_cancel group = (*requires SYNCHRONIZED*) |
164 fun do_cancel group = (*requires SYNCHRONIZED*) |
165 (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event); |
165 (Unsynchronized.change canceled (insert Task_Queue.eq_group group); |
|
166 broadcast scheduler_event); |
166 |
167 |
167 fun execute name (task, group, jobs) = |
168 fun execute name (task, group, jobs) = |
168 let |
169 let |
169 val valid = not (Task_Queue.is_canceled group); |
170 val valid = not (Task_Queue.is_canceled group); |
170 val ok = setmp_thread_data (name, task, group) (fn () => |
171 val ok = setmp_thread_data (name, task, group) (fn () => |
171 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
172 fold (fn job => fn ok => job valid andalso ok) jobs true) (); |
172 val _ = SYNCHRONIZED "finish" (fn () => |
173 val _ = SYNCHRONIZED "finish" (fn () => |
173 let |
174 let |
174 val maximal = change_result queue (Task_Queue.finish task); |
175 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); |
175 val _ = |
176 val _ = |
176 if ok then () |
177 if ok then () |
177 else if Task_Queue.cancel (! queue) group then () |
178 else if Task_Queue.cancel (! queue) group then () |
178 else do_cancel group; |
179 else do_cancel group; |
179 val _ = broadcast work_finished; |
180 val _ = broadcast work_finished; |
186 |
187 |
187 fun count_active () = (*requires SYNCHRONIZED*) |
188 fun count_active () = (*requires SYNCHRONIZED*) |
188 fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; |
189 fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0; |
189 |
190 |
190 fun change_active active = (*requires SYNCHRONIZED*) |
191 fun change_active active = (*requires SYNCHRONIZED*) |
191 change workers (AList.update Thread.equal (Thread.self (), active)); |
192 Unsynchronized.change workers |
|
193 (AList.update Thread.equal (Thread.self (), active)); |
192 |
194 |
193 |
195 |
194 (* worker threads *) |
196 (* worker threads *) |
195 |
197 |
196 fun worker_wait cond = (*requires SYNCHRONIZED*) |
198 fun worker_wait cond = (*requires SYNCHRONIZED*) |
197 (change_active false; wait cond; change_active true); |
199 (change_active false; wait cond; change_active true); |
198 |
200 |
199 fun worker_next () = (*requires SYNCHRONIZED*) |
201 fun worker_next () = (*requires SYNCHRONIZED*) |
200 if ! excessive > 0 then |
202 if ! excessive > 0 then |
201 (dec excessive; |
203 (Unsynchronized.dec excessive; |
202 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
204 Unsynchronized.change workers |
|
205 (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
203 broadcast scheduler_event; |
206 broadcast scheduler_event; |
204 NONE) |
207 NONE) |
205 else if count_active () > Multithreading.max_threads_value () then |
208 else if count_active () > Multithreading.max_threads_value () then |
206 (worker_wait scheduler_event; worker_next ()) |
209 (worker_wait scheduler_event; worker_next ()) |
207 else |
210 else |
208 (case change_result queue (Task_Queue.dequeue (Thread.self ())) of |
211 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
209 NONE => (worker_wait work_available; worker_next ()) |
212 NONE => (worker_wait work_available; worker_next ()) |
210 | some => some); |
213 | some => some); |
211 |
214 |
212 fun worker_loop name = |
215 fun worker_loop name = |
213 (case SYNCHRONIZED name (fn () => worker_next ()) of |
216 (case SYNCHRONIZED name (fn () => worker_next ()) of |
214 NONE => () |
217 NONE => () |
215 | SOME work => (execute name work; worker_loop name)); |
218 | SOME work => (execute name work; worker_loop name)); |
216 |
219 |
217 fun worker_start name = (*requires SYNCHRONIZED*) |
220 fun worker_start name = (*requires SYNCHRONIZED*) |
218 change workers (cons (SimpleThread.fork false (fn () => |
221 Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => |
219 (broadcast scheduler_event; worker_loop name)), true)); |
222 (broadcast scheduler_event; worker_loop name)), true)); |
220 |
223 |
221 |
224 |
222 (* scheduler *) |
225 (* scheduler *) |
223 |
226 |
224 val last_status = ref Time.zeroTime; |
227 val last_status = Unsynchronized.ref Time.zeroTime; |
225 val next_status = Time.fromMilliseconds 500; |
228 val next_status = Time.fromMilliseconds 500; |
226 val next_round = Time.fromMilliseconds 50; |
229 val next_round = Time.fromMilliseconds 50; |
227 |
230 |
228 fun scheduler_next () = (*requires SYNCHRONIZED*) |
231 fun scheduler_next () = (*requires SYNCHRONIZED*) |
229 let |
232 let |
261 val mm = if m = 9999 then 1 else m * 2; |
264 val mm = if m = 9999 then 1 else m * 2; |
262 val l = length (! workers); |
265 val l = length (! workers); |
263 val _ = excessive := l - mm; |
266 val _ = excessive := l - mm; |
264 val _ = |
267 val _ = |
265 if mm > l then |
268 if mm > l then |
266 funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) () |
269 funpow (mm - l) (fn () => |
|
270 worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) () |
267 else (); |
271 else (); |
268 |
272 |
269 (*canceled groups*) |
273 (*canceled groups*) |
270 val _ = |
274 val _ = |
271 if null (! canceled) then () |
275 if null (! canceled) then () |
272 else |
276 else |
273 (Multithreading.tracing 1 (fn () => |
277 (Multithreading.tracing 1 (fn () => |
274 string_of_int (length (! canceled)) ^ " canceled groups"); |
278 string_of_int (length (! canceled)) ^ " canceled groups"); |
275 change canceled (filter_out (Task_Queue.cancel (! queue))); |
279 Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue))); |
276 broadcast_work ()); |
280 broadcast_work ()); |
277 |
281 |
278 (*delay loop*) |
282 (*delay loop*) |
279 val _ = Exn.release (wait_timeout next_round scheduler_event); |
283 val _ = Exn.release (wait_timeout next_round scheduler_event); |
280 |
284 |
315 SOME group => group |
319 SOME group => group |
316 | NONE => Task_Queue.new_group (worker_group ())); |
320 | NONE => Task_Queue.new_group (worker_group ())); |
317 val (result, job) = future_job group e; |
321 val (result, job) = future_job group e; |
318 val task = SYNCHRONIZED "enqueue" (fn () => |
322 val task = SYNCHRONIZED "enqueue" (fn () => |
319 let |
323 let |
320 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job); |
324 val (task, minimal) = |
|
325 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job); |
321 val _ = if minimal then signal work_available else (); |
326 val _ = if minimal then signal work_available else (); |
322 val _ = scheduler_check (); |
327 val _ = scheduler_check (); |
323 in task end); |
328 in task end); |
324 in Future {task = task, group = group, result = result} end; |
329 in Future {task = task, group = group, result = result} end; |
325 |
330 |
345 Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some)); |
350 Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some)); |
346 |
351 |
347 fun join_next deps = (*requires SYNCHRONIZED*) |
352 fun join_next deps = (*requires SYNCHRONIZED*) |
348 if null deps then NONE |
353 if null deps then NONE |
349 else |
354 else |
350 (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
355 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
351 (NONE, []) => NONE |
356 (NONE, []) => NONE |
352 | (NONE, deps') => (worker_wait work_finished; join_next deps') |
357 | (NONE, deps') => (worker_wait work_finished; join_next deps') |
353 | (SOME work, deps') => SOME (work, deps')); |
358 | (SOME work, deps') => SOME (work, deps')); |
354 |
359 |
355 fun join_work deps = |
360 fun join_work deps = |