equal
deleted
inserted
replaced
172 |
172 |
173 |
173 |
174 (* cancellation primitives *) |
174 (* cancellation primitives *) |
175 |
175 |
176 fun interruptible_task f x = |
176 fun interruptible_task f x = |
177 if Multithreading.available then |
177 (if Multithreading.available then |
178 Multithreading.with_attributes |
178 Multithreading.with_attributes |
179 (if is_some (worker_task ()) |
179 (if is_some (worker_task ()) |
180 then Multithreading.private_interrupts |
180 then Multithreading.private_interrupts |
181 else Multithreading.public_interrupts) |
181 else Multithreading.public_interrupts) |
182 (fn _ => f x) |
182 (fn _ => f x) |
183 else interruptible f x; |
183 else interruptible f x) |
|
184 before Multithreading.interrupted (); |
184 |
185 |
185 fun cancel_now group = (*requires SYNCHRONIZED*) |
186 fun cancel_now group = (*requires SYNCHRONIZED*) |
186 Task_Queue.cancel (! queue) group; |
187 Task_Queue.cancel (! queue) group; |
187 |
188 |
188 fun cancel_later group = (*requires SYNCHRONIZED*) |
189 fun cancel_later group = (*requires SYNCHRONIZED*) |
207 val (run, wait, deps) = Task_Queue.timing_of_task task; |
208 val (run, wait, deps) = Task_Queue.timing_of_task task; |
208 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end); |
209 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end); |
209 val _ = SYNCHRONIZED "finish" (fn () => |
210 val _ = SYNCHRONIZED "finish" (fn () => |
210 let |
211 let |
211 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); |
212 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); |
212 val _ = Exn.capture Multithreading.interrupted (); |
213 val test = Exn.capture Multithreading.interrupted (); |
213 val _ = |
214 val _ = |
214 if ok then () |
215 if ok andalso not (Exn.is_interrupt_exn test) then () |
215 else if cancel_now group then () |
216 else if cancel_now group then () |
216 else cancel_later group; |
217 else cancel_later group; |
217 val _ = broadcast work_finished; |
218 val _ = broadcast work_finished; |
218 val _ = if maximal then () else signal work_available; |
219 val _ = if maximal then () else signal work_available; |
219 in () end); |
220 in () end); |
243 | some => (signal work_available; some)); |
244 | some => (signal work_available; some)); |
244 |
245 |
245 fun worker_loop name = |
246 fun worker_loop name = |
246 (case SYNCHRONIZED name (fn () => worker_next ()) of |
247 (case SYNCHRONIZED name (fn () => worker_next ()) of |
247 NONE => () |
248 NONE => () |
248 | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name)); |
249 | SOME work => (worker_exec work; worker_loop name)); |
249 |
250 |
250 fun worker_start name = (*requires SYNCHRONIZED*) |
251 fun worker_start name = (*requires SYNCHRONIZED*) |
251 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
252 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
252 Unsynchronized.ref Working)); |
253 Unsynchronized.ref Working)); |
253 |
254 |
429 if ok then |
430 if ok then |
430 Exn.capture (fn () => |
431 Exn.capture (fn () => |
431 Multithreading.with_attributes |
432 Multithreading.with_attributes |
432 (if interrupts |
433 (if interrupts |
433 then Multithreading.private_interrupts else Multithreading.no_interrupts) |
434 then Multithreading.private_interrupts else Multithreading.no_interrupts) |
434 (fn _ => Position.setmp_thread_data pos e ()) before |
435 (fn _ => Position.setmp_thread_data pos e ())) () |
435 Multithreading.interrupted ()) () |
|
436 else Exn.interrupt_exn; |
436 else Exn.interrupt_exn; |
437 in assign_result group result res end; |
437 in assign_result group result res end; |
438 in (result, job) end; |
438 in (result, job) end; |
439 |
439 |
440 |
440 |