108 fun assign_result group result res = |
108 fun assign_result group result res = |
109 let |
109 let |
110 val _ = Single_Assignment.assign result res |
110 val _ = Single_Assignment.assign result res |
111 handle exn as Fail _ => |
111 handle exn as Fail _ => |
112 (case Single_Assignment.peek result of |
112 (case Single_Assignment.peek result of |
113 SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt |
113 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) |
114 | _ => reraise exn); |
114 | _ => reraise exn); |
115 val ok = |
115 val ok = |
116 (case the (Single_Assignment.peek result) of |
116 (case the (Single_Assignment.peek result) of |
117 Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
117 Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
118 | Exn.Result _ => true); |
118 | Exn.Result _ => true); |
182 val res = |
182 val res = |
183 if ok then |
183 if ok then |
184 Exn.capture (fn () => |
184 Exn.capture (fn () => |
185 Multithreading.with_attributes Multithreading.private_interrupts |
185 Multithreading.with_attributes Multithreading.private_interrupts |
186 (fn _ => Position.setmp_thread_data pos e ())) () |
186 (fn _ => Position.setmp_thread_data pos e ())) () |
187 else Exn.Exn Exn.Interrupt; |
187 else Exn.interrupt_exn; |
188 in assign_result group result res end; |
188 in assign_result group result res end; |
189 in (result, job) end; |
189 in (result, job) end; |
190 |
190 |
191 fun cancel_now group = (*requires SYNCHRONIZED*) |
191 fun cancel_now group = (*requires SYNCHRONIZED*) |
192 Task_Queue.cancel (! queue) group; |
192 Task_Queue.cancel (! queue) group; |
357 val continue = not (! do_shutdown andalso null (! workers)); |
357 val continue = not (! do_shutdown andalso null (! workers)); |
358 val _ = if continue then () else scheduler := NONE; |
358 val _ = if continue then () else scheduler := NONE; |
359 |
359 |
360 val _ = broadcast scheduler_event; |
360 val _ = broadcast scheduler_event; |
361 in continue end |
361 in continue end |
362 handle Exn.Interrupt => |
362 handle exn => |
363 (Multithreading.tracing 1 (fn () => "Interrupt"); |
363 if Exn.is_interrupt exn then |
364 List.app cancel_later (Task_Queue.cancel_all (! queue)); |
364 (Multithreading.tracing 1 (fn () => "Interrupt"); |
365 broadcast_work (); true); |
365 List.app cancel_later (Task_Queue.cancel_all (! queue)); |
|
366 broadcast_work (); true) |
|
367 else reraise exn; |
366 |
368 |
367 fun scheduler_loop () = |
369 fun scheduler_loop () = |
368 while |
370 while |
369 Multithreading.with_attributes |
371 Multithreading.with_attributes |
370 (Multithreading.sync_interrupts Multithreading.public_interrupts) |
372 (Multithreading.sync_interrupts Multithreading.public_interrupts) |
413 local |
415 local |
414 |
416 |
415 fun get_result x = |
417 fun get_result x = |
416 (case peek x of |
418 (case peek x of |
417 NONE => Exn.Exn (Fail "Unfinished future") |
419 NONE => Exn.Exn (Fail "Unfinished future") |
418 | SOME (exn as Exn.Exn Exn.Interrupt) => |
420 | SOME res => |
419 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of |
421 if Exn.is_interrupt_exn res then |
420 [] => exn |
422 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of |
421 | exns => Exn.Exn (Exn.EXCEPTIONS exns)) |
423 [] => res |
422 | SOME res => res); |
424 | exns => Exn.Exn (Exn.EXCEPTIONS exns)) |
|
425 else res); |
423 |
426 |
424 fun join_next deps = (*requires SYNCHRONIZED*) |
427 fun join_next deps = (*requires SYNCHRONIZED*) |
425 if null deps then NONE |
428 if null deps then NONE |
426 else |
429 else |
427 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
430 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
484 (* promised futures -- fulfilled by external means *) |
487 (* promised futures -- fulfilled by external means *) |
485 |
488 |
486 fun promise_group group : 'a future = |
489 fun promise_group group : 'a future = |
487 let |
490 let |
488 val result = Single_Assignment.var "promise" : 'a result; |
491 val result = Single_Assignment.var "promise" : 'a result; |
489 fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true; |
492 fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true; |
490 val task = SYNCHRONIZED "enqueue_passive" (fn () => |
493 val task = SYNCHRONIZED "enqueue_passive" (fn () => |
491 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); |
494 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); |
492 in Future {promised = true, task = task, group = group, result = result} end; |
495 in Future {promised = true, task = task, group = group, result = result} end; |
493 |
496 |
494 fun promise () = promise_group (worker_subgroup ()); |
497 fun promise () = promise_group (worker_subgroup ()); |
495 |
498 |
496 fun fulfill_result (Future {promised, task, group, result}) res = |
499 fun fulfill_result (Future {promised, task, group, result}) res = |
497 let |
500 let |
498 val _ = promised orelse raise Fail "Not a promised future"; |
501 val _ = promised orelse raise Fail "Not a promised future"; |
499 fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt); |
502 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); |
500 val _ = execute (task, group, [job]); |
503 val _ = execute (task, group, [job]); |
501 in () end; |
504 in () end; |
502 |
505 |
503 fun fulfill x res = fulfill_result x (Exn.Result res); |
506 fun fulfill x res = fulfill_result x (Exn.Result res); |
504 |
507 |