src/Pure/Concurrent/future.ML
changeset 39232 69c6d3e87660
parent 38236 d8c7be27e01d
child 39243 307e3d07d19f
equal deleted inserted replaced
39231:25c345302a17 39232:69c6d3e87660
   108 fun assign_result group result res =
   108 fun assign_result group result res =
   109   let
   109   let
   110     val _ = Single_Assignment.assign result res
   110     val _ = Single_Assignment.assign result res
   111       handle exn as Fail _ =>
   111       handle exn as Fail _ =>
   112         (case Single_Assignment.peek result of
   112         (case Single_Assignment.peek result of
   113           SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt
   113           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   114         | _ => reraise exn);
   114         | _ => reraise exn);
   115     val ok =
   115     val ok =
   116       (case the (Single_Assignment.peek result) of
   116       (case the (Single_Assignment.peek result) of
   117         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   117         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   118       | Exn.Result _ => true);
   118       | Exn.Result _ => true);
   182         val res =
   182         val res =
   183           if ok then
   183           if ok then
   184             Exn.capture (fn () =>
   184             Exn.capture (fn () =>
   185               Multithreading.with_attributes Multithreading.private_interrupts
   185               Multithreading.with_attributes Multithreading.private_interrupts
   186                 (fn _ => Position.setmp_thread_data pos e ())) ()
   186                 (fn _ => Position.setmp_thread_data pos e ())) ()
   187           else Exn.Exn Exn.Interrupt;
   187           else Exn.interrupt_exn;
   188       in assign_result group result res end;
   188       in assign_result group result res end;
   189   in (result, job) end;
   189   in (result, job) end;
   190 
   190 
   191 fun cancel_now group = (*requires SYNCHRONIZED*)
   191 fun cancel_now group = (*requires SYNCHRONIZED*)
   192   Task_Queue.cancel (! queue) group;
   192   Task_Queue.cancel (! queue) group;
   357     val continue = not (! do_shutdown andalso null (! workers));
   357     val continue = not (! do_shutdown andalso null (! workers));
   358     val _ = if continue then () else scheduler := NONE;
   358     val _ = if continue then () else scheduler := NONE;
   359 
   359 
   360     val _ = broadcast scheduler_event;
   360     val _ = broadcast scheduler_event;
   361   in continue end
   361   in continue end
   362   handle Exn.Interrupt =>
   362   handle exn =>
   363    (Multithreading.tracing 1 (fn () => "Interrupt");
   363     if Exn.is_interrupt exn then
   364     List.app cancel_later (Task_Queue.cancel_all (! queue));
   364      (Multithreading.tracing 1 (fn () => "Interrupt");
   365     broadcast_work (); true);
   365       List.app cancel_later (Task_Queue.cancel_all (! queue));
       
   366       broadcast_work (); true)
       
   367     else reraise exn;
   366 
   368 
   367 fun scheduler_loop () =
   369 fun scheduler_loop () =
   368   while
   370   while
   369     Multithreading.with_attributes
   371     Multithreading.with_attributes
   370       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   372       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   413 local
   415 local
   414 
   416 
   415 fun get_result x =
   417 fun get_result x =
   416   (case peek x of
   418   (case peek x of
   417     NONE => Exn.Exn (Fail "Unfinished future")
   419     NONE => Exn.Exn (Fail "Unfinished future")
   418   | SOME (exn as Exn.Exn Exn.Interrupt) =>
   420   | SOME res =>
   419       (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   421       if Exn.is_interrupt_exn res then
   420         [] => exn
   422         (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   421       | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   423           [] => res
   422   | SOME res => res);
   424         | exns => Exn.Exn (Exn.EXCEPTIONS exns))
       
   425       else res);
   423 
   426 
   424 fun join_next deps = (*requires SYNCHRONIZED*)
   427 fun join_next deps = (*requires SYNCHRONIZED*)
   425   if null deps then NONE
   428   if null deps then NONE
   426   else
   429   else
   427     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   430     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   484 (* promised futures -- fulfilled by external means *)
   487 (* promised futures -- fulfilled by external means *)
   485 
   488 
   486 fun promise_group group : 'a future =
   489 fun promise_group group : 'a future =
   487   let
   490   let
   488     val result = Single_Assignment.var "promise" : 'a result;
   491     val result = Single_Assignment.var "promise" : 'a result;
   489     fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true;
   492     fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true;
   490     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   493     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   491       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   494       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   492   in Future {promised = true, task = task, group = group, result = result} end;
   495   in Future {promised = true, task = task, group = group, result = result} end;
   493 
   496 
   494 fun promise () = promise_group (worker_subgroup ());
   497 fun promise () = promise_group (worker_subgroup ());
   495 
   498 
   496 fun fulfill_result (Future {promised, task, group, result}) res =
   499 fun fulfill_result (Future {promised, task, group, result}) res =
   497   let
   500   let
   498     val _ = promised orelse raise Fail "Not a promised future";
   501     val _ = promised orelse raise Fail "Not a promised future";
   499     fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
   502     fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   500     val _ = execute (task, group, [job]);
   503     val _ = execute (task, group, [job]);
   501   in () end;
   504   in () end;
   502 
   505 
   503 fun fulfill x res = fulfill_result x (Exn.Result res);
   506 fun fulfill x res = fulfill_result x (Exn.Result res);
   504 
   507