src/Pure/Concurrent/future.ML
changeset 44295 e43f0ea90c9a
parent 44294 a0ddd5760444
child 44298 b8f8488704e2
equal deleted inserted replaced
44294:a0ddd5760444 44295:e43f0ea90c9a
   172 
   172 
   173 
   173 
   174 (* cancellation primitives *)
   174 (* cancellation primitives *)
   175 
   175 
   176 fun interruptible_task f x =
   176 fun interruptible_task f x =
   177   if Multithreading.available then
   177   (if Multithreading.available then
   178     Multithreading.with_attributes
   178     Multithreading.with_attributes
   179       (if is_some (worker_task ())
   179       (if is_some (worker_task ())
   180        then Multithreading.private_interrupts
   180        then Multithreading.private_interrupts
   181        else Multithreading.public_interrupts)
   181        else Multithreading.public_interrupts)
   182       (fn _ => f x)
   182       (fn _ => f x)
   183   else interruptible f x;
   183    else interruptible f x)
       
   184   before Multithreading.interrupted ();
   184 
   185 
   185 fun cancel_now group = (*requires SYNCHRONIZED*)
   186 fun cancel_now group = (*requires SYNCHRONIZED*)
   186   Task_Queue.cancel (! queue) group;
   187   Task_Queue.cancel (! queue) group;
   187 
   188 
   188 fun cancel_later group = (*requires SYNCHRONIZED*)
   189 fun cancel_later group = (*requires SYNCHRONIZED*)
   207         val (run, wait, deps) = Task_Queue.timing_of_task task;
   208         val (run, wait, deps) = Task_Queue.timing_of_task task;
   208       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   209       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   209     val _ = SYNCHRONIZED "finish" (fn () =>
   210     val _ = SYNCHRONIZED "finish" (fn () =>
   210       let
   211       let
   211         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   212         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   212         val _ = Exn.capture Multithreading.interrupted ();
   213         val test = Exn.capture Multithreading.interrupted ();
   213         val _ =
   214         val _ =
   214           if ok then ()
   215           if ok andalso not (Exn.is_interrupt_exn test) then ()
   215           else if cancel_now group then ()
   216           else if cancel_now group then ()
   216           else cancel_later group;
   217           else cancel_later group;
   217         val _ = broadcast work_finished;
   218         val _ = broadcast work_finished;
   218         val _ = if maximal then () else signal work_available;
   219         val _ = if maximal then () else signal work_available;
   219       in () end);
   220       in () end);
   243     | some => (signal work_available; some));
   244     | some => (signal work_available; some));
   244 
   245 
   245 fun worker_loop name =
   246 fun worker_loop name =
   246   (case SYNCHRONIZED name (fn () => worker_next ()) of
   247   (case SYNCHRONIZED name (fn () => worker_next ()) of
   247     NONE => ()
   248     NONE => ()
   248   | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));
   249   | SOME work => (worker_exec work; worker_loop name));
   249 
   250 
   250 fun worker_start name = (*requires SYNCHRONIZED*)
   251 fun worker_start name = (*requires SYNCHRONIZED*)
   251   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   252   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   252     Unsynchronized.ref Working));
   253     Unsynchronized.ref Working));
   253 
   254 
   429           if ok then
   430           if ok then
   430             Exn.capture (fn () =>
   431             Exn.capture (fn () =>
   431               Multithreading.with_attributes
   432               Multithreading.with_attributes
   432                 (if interrupts
   433                 (if interrupts
   433                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   434                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   434                 (fn _ => Position.setmp_thread_data pos e ()) before
   435                 (fn _ => Position.setmp_thread_data pos e ())) ()
   435               Multithreading.interrupted ()) ()
       
   436           else Exn.interrupt_exn;
   436           else Exn.interrupt_exn;
   437       in assign_result group result res end;
   437       in assign_result group result res end;
   438   in (result, job) end;
   438   in (result, job) end;
   439 
   439 
   440 
   440