(* fork e: forwards e to fork_pri with 0 as the first argument
   (presumably the priority -- confirm against fork_pri). *)
495 fun fork e = fork_pri 0 e; |
495 fun fork e = fork_pri 0 e; |
496 |
496 |
497 |
497 |
498 (* join *) |
498 (* join *) |
499 |
499 |
500 local |
|
501 |
|
(* get_result x: inspect future x via peek and return its result value.
   - peek yields NONE: returns Exn.Exn (Fail "Unfinished future").
   - peek yields SOME res and res is an interrupt (Exn.is_interrupt_exn):
     consult Task_Queue.group_status of the group of x's task; if the group
     reports SOME exn, that exception is returned in place of the interrupt,
     otherwise res is returned unchanged.
   - any other SOME res: returned unchanged. *)
502 fun get_result x = |
500 fun get_result x = |
503 (case peek x of |
501 (case peek x of |
504 NONE => Exn.Exn (Fail "Unfinished future") |
502 NONE => Exn.Exn (Fail "Unfinished future") |
505 | SOME res => |
503 | SOME res => |
506 if Exn.is_interrupt_exn res then |
504 if Exn.is_interrupt_exn res then |
507 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of |
505 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of |
508 NONE => res |
506 NONE => res |
509 | SOME exn => Exn.Exn exn) |
507 | SOME exn => Exn.Exn exn) |
510 else res); |
508 else res); |
511 |
509 |
|
510 local |
|
511 |
512 fun join_next deps = (*requires SYNCHRONIZED*) |
512 fun join_next deps = (*requires SYNCHRONIZED*) |
513 if null deps then NONE |
513 if null deps then NONE |
514 else |
514 else |
515 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of |
515 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of |
516 (NONE, []) => NONE |
516 (NONE, []) => NONE |
(* cond_forks args es: conditional variant of forks.
   When Multithreading.enabled () holds, delegates to forks args es.
   Otherwise each e in es is applied to () in place, wrapped by
   Exn.interruptible_capture, and the outcome is passed to value_result;
   args is not used on that path. *)
559 fun cond_forks args es = |
559 fun cond_forks args es = |
560 if Multithreading.enabled () then forks args es |
560 if Multithreading.enabled () then forks args es |
561 else map (fn e => value_result (Exn.interruptible_capture e ())) es; |
561 else map (fn e => value_result (Exn.interruptible_capture e ())) es; |
562 |
562 |
(* map_future f x: produce a future for f applied to the joined value of x.
   NOTE(review): two versions of this definition are interleaved line by line
   below (side-by-side diff rendering). One version starts directly with the
   let-block; the other first checks is_finished x and, if so, returns
   value (f (join x)) without going through the queue.
   Common part of both versions:
   - task and group are taken from x (task_of, Task_Queue.group_of_task);
   - future_job group true builds a (result, job) pair for fn () => f (join x);
   - inside SYNCHRONIZED "extend", Task_Queue.extend task job is tried on the
     current queue; SOME queue' is stored back into queue and counts as success;
   - on success the returned Future reuses the task of x with the new result;
   - otherwise a single job is forked through cond_forks under the name
     "map_future", in the same group, with deps = [task] and the pri value
     obtained from Task_Queue.pri_of_task task. *)
563 fun map_future f x = |
563 fun map_future f x = |
564 let |
564 if is_finished x then value (f (join x)) |
565 val task = task_of x; |
565 else |
566 val group = Task_Queue.group_of_task task; |
566 let |
567 val (result, job) = future_job group true (fn () => f (join x)); |
567 val task = task_of x; |
568 |
568 val group = Task_Queue.group_of_task task; |
569 val extended = SYNCHRONIZED "extend" (fn () => |
569 val (result, job) = future_job group true (fn () => f (join x)); |
570 (case Task_Queue.extend task job (! queue) of |
570 |
571 SOME queue' => (queue := queue'; true) |
571 val extended = SYNCHRONIZED "extend" (fn () => |
572 | NONE => false)); |
572 (case Task_Queue.extend task job (! queue) of |
573 in |
573 SOME queue' => (queue := queue'; true) |
574 if extended then Future {promised = false, task = task, result = result} |
574 | NONE => false)); |
575 else |
575 in |
576 (singleton o cond_forks) |
576 if extended then Future {promised = false, task = task, result = result} |
577 {name = "map_future", group = SOME group, deps = [task], |
577 else |
578 pri = Task_Queue.pri_of_task task, interrupts = true} |
578 (singleton o cond_forks) |
579 (fn () => f (join x)) |
579 {name = "map_future", group = SOME group, deps = [task], |
580 end; |
580 pri = Task_Queue.pri_of_task task, interrupts = true} |

581 (fn () => f (join x)) |

582 end; |
581 |
583 |
582 |
584 |
583 (* promised futures -- fulfilled by external means *) |
585 (* promised futures -- fulfilled by external means *) |
584 |
586 |
585 fun promise_group group abort : 'a future = |
587 fun promise_group group abort : 'a future = |