src/Pure/Concurrent/future.ML
changeset 29341 6bb007a0f9f2
parent 29119 99941fd0cb0e
child 29366 1ffc8cbf39ec
equal deleted inserted replaced
29340:057a30ee8570 29341:6bb007a0f9f2
   114 
   114 
   (*wait: blocks the calling thread via ConditionVar.wait on cond, passing lock;
     per the inline note the caller must already be inside SYNCHRONIZED.
     NOTE(review): cond and lock are defined outside this excerpt -- presumably the
     condition variable and mutex shared by this scheduler; confirm at their definition.*)
   115 fun wait () = (*requires SYNCHRONIZED*)
   115 fun wait () = (*requires SYNCHRONIZED*)
   116   ConditionVar.wait (cond, lock);
   116   ConditionVar.wait (cond, lock);
   117 
   117 
   (*wait_timeout: like a wait on cond/lock, but with an absolute deadline computed
     as Time.now () plus the given timeout.
     Old revision: the result of ConditionVar.waitUntil is the function result.
     New revision: that result is passed to ignore, so the function yields unit.*)
   118 fun wait_timeout timeout = (*requires SYNCHRONIZED*)
   118 fun wait_timeout timeout = (*requires SYNCHRONIZED*)
   119   ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
   119   ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
   120 
   120 
   (*notify_all: calls ConditionVar.broadcast on cond, i.e. signals the same condition
     variable that wait and wait_timeout block on; caller must be inside SYNCHRONIZED.*)
   121 fun notify_all () = (*requires SYNCHRONIZED*)
   121 fun notify_all () = (*requires SYNCHRONIZED*)
   122   ConditionVar.broadcast cond;
   122   ConditionVar.broadcast cond;
   123 
   123 
   124 end;
   124 end;
   (*change_active: updates the workers association list so that the entry keyed by
     the current thread (Thread.self ()) maps to the given active value; keys are
     compared with Thread.equal.  Caller must be inside SYNCHRONIZED.*)
   136 fun change_active active = (*requires SYNCHRONIZED*)
   136 fun change_active active = (*requires SYNCHRONIZED*)
   137   change workers (AList.update Thread.equal (Thread.self (), active));
   137   change workers (AList.update Thread.equal (Thread.self (), active));
   138 
   138 
   139 
   139 
   140 (* execute *)
   140 (* execute *)
       
   141 
       
   (*do_cancel: present in the new revision only.  Records group in the canceled
     reference using insert with Task_Queue.eq_group as the equality test.
     NOTE(review): presumably insert skips a group that is already present, unlike
     the plain cons used by the old revision -- confirm against the library.*)
   142 fun do_cancel group = (*requires SYNCHRONIZED*)
       
   143   change canceled (insert Task_Queue.eq_group group);
   141 
   144 
   (*execute: runs one task and then updates the shared scheduler state.
     name/task are installed via setmp_thread_data while run () is evaluated;
     group is the task's group, used for cancellation bookkeeping.
     The function itself returns unit.*)
   142 fun execute name (task, group, run) =
   145 fun execute name (task, group, run) =
   143   let
   146   let
   144     val _ = trace_active ();
   147     val _ = trace_active ();
   (*run is applied to () under setmp_thread_data (name, task); ok is its result,
     tested below as a boolean*)
   145     val ok = setmp_thread_data (name, task) run ();
   148     val ok = setmp_thread_data (name, task) run ();
   (*state update happens inside the SYNCHRONIZED section labelled "execute"*)
   146     val _ = SYNCHRONIZED "execute" (fn () =>
   149     val _ = SYNCHRONIZED "execute" (fn () =>
   (*the task is always marked finished in the queue, whatever ok is*)
   147      (change queue (Task_Queue.finish task);
   150      (change queue (Task_Queue.finish task);
   148       if ok then ()
   151       if ok then ()
   (*not ok: try Task_Queue.cancel on the group; only when that yields false is the
     group recorded in canceled (old: plain cons; new: via do_cancel)*)
   149       else if Task_Queue.cancel (! queue) group then ()
   152       else if Task_Queue.cancel (! queue) group then ()
   150       else change canceled (cons group);
   153       else do_cancel group;
   (*signal waiting threads after the queue/canceled update*)
   151       notify_all ()));
   154       notify_all ()));
   152   in () end;
   155   in () end;
   153 
   156 
   154 
   157 
   155 (* worker threads *)
   158 (* worker threads *)
   203     (*shutdown*)
   206     (*shutdown*)
   204     val continue = not (! do_shutdown andalso null (! workers));
   207     val continue = not (! do_shutdown andalso null (! workers));
   205     val _ = if continue then () else scheduler := NONE;
   208     val _ = if continue then () else scheduler := NONE;
   206 
   209 
   207     val _ = notify_all ();
   210     val _ = notify_all ();
   208     val _ = wait_timeout (Time.fromSeconds 3);
   211     val _ = interruptible (fn () => wait_timeout (Time.fromSeconds 1)) ()
       
   212       handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   209   in continue end;
   213   in continue end;
   210 
   214 
   (*scheduler_loop: repeatedly evaluates scheduler_next inside a SYNCHRONIZED section
     labelled "scheduler"; the loop body is empty and the loop ends as soon as
     scheduler_next returns false.*)
   211 fun scheduler_loop () =
   215 fun scheduler_loop () =
   212   while SYNCHRONIZED "scheduler" scheduler_next do ();
   216   while SYNCHRONIZED "scheduler" scheduler_next do ();
   213 
   217 
   304   (fn () => Task_Queue.interrupt_external (! queue) id);
   308   (fn () => Task_Queue.interrupt_external (! queue) id);
   305 
   309 
   306 (*cancel: present and future group members will be interrupted eventually*)
   310 (*cancel: present and future group members will be interrupted eventually*)
   (*Calls scheduler_check "cancel check" first, then, inside the SYNCHRONIZED section
     labelled "cancel", records group_of x in canceled and calls notify_all.
     Old revision: the group is consed onto canceled directly.
     New revision: the group is recorded through do_cancel.*)
   307 fun cancel x =
   311 fun cancel x =
   308  (scheduler_check "cancel check";
   312  (scheduler_check "cancel check";
   309   SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
   313   SYNCHRONIZED "cancel" (fn () => (do_cancel (group_of x); notify_all ())));
   310 
   314 
   311 
   315 
   312 (*global join and shutdown*)
   316 (*global join and shutdown*)
   313 fun shutdown () =
   317 fun shutdown () =
   314   if Multithreading.available then
   318   if Multithreading.available then