equal
deleted
inserted
replaced
114 |
114 |
115 fun wait () = (*requires SYNCHRONIZED*) |
115 fun wait () = (*requires SYNCHRONIZED*) |
116 ConditionVar.wait (cond, lock); |
116 ConditionVar.wait (cond, lock); |
117 |
117 |
118 fun wait_timeout timeout = (*requires SYNCHRONIZED*) |
118 fun wait_timeout timeout = (*requires SYNCHRONIZED*) |
119 ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); |
119 ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout))); |
120 |
120 |
121 fun notify_all () = (*requires SYNCHRONIZED*) |
121 fun notify_all () = (*requires SYNCHRONIZED*) |
122 ConditionVar.broadcast cond; |
122 ConditionVar.broadcast cond; |
123 |
123 |
124 end; |
124 end; |
136 fun change_active active = (*requires SYNCHRONIZED*) |
136 fun change_active active = (*requires SYNCHRONIZED*) |
137 change workers (AList.update Thread.equal (Thread.self (), active)); |
137 change workers (AList.update Thread.equal (Thread.self (), active)); |
138 |
138 |
139 |
139 |
140 (* execute *) |
140 (* execute *) |
|
141 |
|
142 fun do_cancel group = (*requires SYNCHRONIZED*) |
|
143 change canceled (insert Task_Queue.eq_group group); |
141 |
144 |
142 fun execute name (task, group, run) = |
145 fun execute name (task, group, run) = |
143 let |
146 let |
144 val _ = trace_active (); |
147 val _ = trace_active (); |
145 val ok = setmp_thread_data (name, task) run (); |
148 val ok = setmp_thread_data (name, task) run (); |
146 val _ = SYNCHRONIZED "execute" (fn () => |
149 val _ = SYNCHRONIZED "execute" (fn () => |
147 (change queue (Task_Queue.finish task); |
150 (change queue (Task_Queue.finish task); |
148 if ok then () |
151 if ok then () |
149 else if Task_Queue.cancel (! queue) group then () |
152 else if Task_Queue.cancel (! queue) group then () |
150 else change canceled (cons group); |
153 else do_cancel group; |
151 notify_all ())); |
154 notify_all ())); |
152 in () end; |
155 in () end; |
153 |
156 |
154 |
157 |
155 (* worker threads *) |
158 (* worker threads *) |
203 (*shutdown*) |
206 (*shutdown*) |
204 val continue = not (! do_shutdown andalso null (! workers)); |
207 val continue = not (! do_shutdown andalso null (! workers)); |
205 val _ = if continue then () else scheduler := NONE; |
208 val _ = if continue then () else scheduler := NONE; |
206 |
209 |
207 val _ = notify_all (); |
210 val _ = notify_all (); |
208 val _ = wait_timeout (Time.fromSeconds 3); |
211 val _ = interruptible (fn () => wait_timeout (Time.fromSeconds 1)) () |
|
212 handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue)); |
209 in continue end; |
213 in continue end; |
210 |
214 |
211 fun scheduler_loop () = |
215 fun scheduler_loop () = |
212 while SYNCHRONIZED "scheduler" scheduler_next do (); |
216 while SYNCHRONIZED "scheduler" scheduler_next do (); |
213 |
217 |
304 (fn () => Task_Queue.interrupt_external (! queue) id); |
308 (fn () => Task_Queue.interrupt_external (! queue) id); |
305 |
309 |
306 (*cancel: present and future group members will be interrupted eventually*) |
310 (*cancel: present and future group members will be interrupted eventually*) |
307 fun cancel x = |
311 fun cancel x = |
308 (scheduler_check "cancel check"; |
312 (scheduler_check "cancel check"; |
309 SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ()))); |
313 SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ()))); |
310 |
314 |
311 |
315 |
312 (*global join and shutdown*) |
316 (*global join and shutdown*) |
313 fun shutdown () = |
317 fun shutdown () = |
314 if Multithreading.available then |
318 if Multithreading.available then |