src/Pure/Concurrent/future.ML
changeset 49935 d1ecb3554b25
parent 49910 db618c65a01d
child 50255 d0ec1f0d1d7d
equal deleted inserted replaced
49934:6f7985a42889 49935:d1ecb3554b25
   495 fun fork e = fork_pri 0 e;
   495 fun fork e = fork_pri 0 e;
   496 
   496 
   497 
   497 
   498 (* join *)
   498 (* join *)
   499 
   499 
   500 local
       
   501 
       
   502 fun get_result x =
   500 fun get_result x =
   503   (case peek x of
   501   (case peek x of
   504     NONE => Exn.Exn (Fail "Unfinished future")
   502     NONE => Exn.Exn (Fail "Unfinished future")
   505   | SOME res =>
   503   | SOME res =>
   506       if Exn.is_interrupt_exn res then
   504       if Exn.is_interrupt_exn res then
   507         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   505         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   508           NONE => res
   506           NONE => res
   509         | SOME exn => Exn.Exn exn)
   507         | SOME exn => Exn.Exn exn)
   510       else res);
   508       else res);
   511 
   509 
       
   510 local
       
   511 
   512 fun join_next deps = (*requires SYNCHRONIZED*)
   512 fun join_next deps = (*requires SYNCHRONIZED*)
   513   if null deps then NONE
   513   if null deps then NONE
   514   else
   514   else
   515     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   515     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   516       (NONE, []) => NONE
   516       (NONE, []) => NONE
   559 fun cond_forks args es =
   559 fun cond_forks args es =
   560   if Multithreading.enabled () then forks args es
   560   if Multithreading.enabled () then forks args es
   561   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   561   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   562 
   562 
   563 fun map_future f x =
   563 fun map_future f x =
   564   let
   564   if is_finished x then value (f (join x))
   565     val task = task_of x;
   565   else
   566     val group = Task_Queue.group_of_task task;
   566     let
   567     val (result, job) = future_job group true (fn () => f (join x));
   567       val task = task_of x;
   568 
   568       val group = Task_Queue.group_of_task task;
   569     val extended = SYNCHRONIZED "extend" (fn () =>
   569       val (result, job) = future_job group true (fn () => f (join x));
   570       (case Task_Queue.extend task job (! queue) of
   570 
   571         SOME queue' => (queue := queue'; true)
   571       val extended = SYNCHRONIZED "extend" (fn () =>
   572       | NONE => false));
   572         (case Task_Queue.extend task job (! queue) of
   573   in
   573           SOME queue' => (queue := queue'; true)
   574     if extended then Future {promised = false, task = task, result = result}
   574         | NONE => false));
   575     else
   575     in
   576       (singleton o cond_forks)
   576       if extended then Future {promised = false, task = task, result = result}
   577         {name = "map_future", group = SOME group, deps = [task],
   577       else
   578           pri = Task_Queue.pri_of_task task, interrupts = true}
   578         (singleton o cond_forks)
   579         (fn () => f (join x))
   579           {name = "map_future", group = SOME group, deps = [task],
   580   end;
   580             pri = Task_Queue.pri_of_task task, interrupts = true}
       
   581           (fn () => f (join x))
       
   582     end;
   581 
   583 
   582 
   584 
   583 (* promised futures -- fulfilled by external means *)
   585 (* promised futures -- fulfilled by external means *)
   584 
   586 
   585 fun promise_group group abort : 'a future =
   587 fun promise_group group abort : 'a future =