src/Pure/Concurrent/future.ML
author wenzelm
Mon Jul 27 17:36:30 2009 +0200 (2009-07-27 ago)
changeset 32230 9f6461b1c9cc
parent 32229 abdc0f6214c8
child 32246 d4f7934e9555
permissions -rw-r--r--
interruptible: Thread.testInterrupt before changing thread attributes;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values.
     5 
     6 Notes:
     7 
     8   * Futures are similar to delayed evaluation, i.e. delay/force is
     9     generalized to fork/join (and variants).  The idea is to model
    10     parallel value-oriented computations, but *not* communicating
    11     processes.
    12 
    13   * Futures are grouped; failure of one group member causes the whole
    14     group to be interrupted eventually.  Groups are block-structured.
    15 
    16   * Forked futures are evaluated spontaneously by a farm of worker
    17     threads in the background; join resynchronizes the computation and
    18     delivers results (values or exceptions).
    19 
    20   * The pool of worker threads is limited, usually in correlation with
    21     the number of physical cores on the machine.  Note that allocation
    22     of runtime resources is distorted either if workers yield CPU time
    23     (e.g. via system sleep or wait operations), or if non-worker
    24     threads contend for significant runtime resources independently.
    25 *)
    26 
    27 signature FUTURE =
    28 sig
    29   val enabled: unit -> bool
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_group: unit -> Task_Queue.group option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val value: 'a -> 'a future
    40   val fork: (unit -> 'a) -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    44   val join_results: 'a future list -> 'a Exn.result list
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    49   val cancel_group: group -> unit
    50   val cancel: 'a future -> unit
    51   val shutdown: unit -> unit
    52 end;
    53 
    54 structure Future: FUTURE =
    55 struct
    56 
    57 (** future values **)
    58 
    59 fun enabled () =
    60   Multithreading.enabled () andalso
    61     not (Multithreading.self_critical ());
    62 
    63 
    64 (* identifiers *)
    65 
    66 type task = Task_Queue.task;
    67 type group = Task_Queue.group;
    68 
    69 local
    70   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    71 in
    72   fun thread_data () = the_default NONE (Thread.getLocal tag);
    73   fun setmp_thread_data data f x =
    74     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    75 end;
    76 
    77 val is_worker = is_some o thread_data;
    78 val worker_group = Option.map #3 o thread_data;
    79 
    80 
    81 (* datatype future *)
    82 
    83 datatype 'a future = Future of
    84  {task: task,
    85   group: group,
    86   result: 'a Exn.result option ref};
    87 
    88 fun task_of (Future {task, ...}) = task;
    89 fun group_of (Future {group, ...}) = group;
    90 
    91 fun peek (Future {result, ...}) = ! result;
    92 fun is_finished x = is_some (peek x);
    93 
    94 fun value x = Future
    95  {task = Task_Queue.new_task 0,
    96   group = Task_Queue.new_group NONE,
    97   result = ref (SOME (Exn.Result x))};
    98 
    99 
   100 
   101 (** scheduling **)
   102 
   103 (* global state *)
   104 
   105 val queue = ref Task_Queue.empty;
   106 val next = ref 0;
   107 val workers = ref ([]: (Thread.thread * bool) list);
   108 val scheduler = ref (NONE: Thread.thread option);
   109 val excessive = ref 0;
   110 val canceled = ref ([]: Task_Queue.group list);
   111 val do_shutdown = ref false;
   112 
   113 
   114 (* synchronization *)
   115 
   116 val scheduler_event = ConditionVar.conditionVar ();
   117 val work_available = ConditionVar.conditionVar ();
   118 val work_finished = ConditionVar.conditionVar ();
   119 
   120 local
   121   val lock = Mutex.mutex ();
   122 in
   123 
   124 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   125 
   126 fun wait cond = (*requires SYNCHRONIZED*)
   127   ConditionVar.wait (cond, lock) handle Exn.Interrupt => ();
   128 
   129 fun wait_interruptible cond timeout = (*requires SYNCHRONIZED*)
   130   interruptible (fn () =>
   131     ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)))) ();
   132 
   133 fun signal cond = (*requires SYNCHRONIZED*)
   134   ConditionVar.signal cond;
   135 
   136 fun broadcast cond = (*requires SYNCHRONIZED*)
   137   ConditionVar.broadcast cond;
   138 
   139 fun broadcast_all () = (*requires SYNCHRONIZED*)
   140  (ConditionVar.broadcast scheduler_event;
   141   ConditionVar.broadcast work_available;
   142   ConditionVar.broadcast work_finished);
   143 
   144 end;
   145 
   146 
   147 (* worker activity *)
   148 
   149 fun count_active ws =
   150   fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
   151 
   152 fun change_active active = (*requires SYNCHRONIZED*)
   153   change workers (AList.update Thread.equal (Thread.self (), active));
   154 
   155 fun overloaded () =
   156   count_active (! workers) > Multithreading.max_threads_value ();
   157 
   158 
   159 (* execute future jobs *)
   160 
   161 fun future_job group (e: unit -> 'a) =
   162   let
   163     val result = ref (NONE: 'a Exn.result option);
   164     fun job ok =
   165       let
   166         val res =
   167           if ok then
   168             Exn.capture (fn () =>
   169              (Thread.testInterrupt ();
   170               Multithreading.with_attributes Multithreading.restricted_interrupts
   171                 (fn _ => fn () => e ())) ()) ()
   172           else Exn.Exn Exn.Interrupt;
   173         val _ = result := SOME res;
   174       in
   175         (case res of
   176           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   177         | Exn.Result _ => true)
   178       end;
   179   in (result, job) end;
   180 
   181 fun do_cancel group = (*requires SYNCHRONIZED*)
   182  (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
   183 
   184 fun execute name (task, group, jobs) =
   185   let
   186     val valid = not (Task_Queue.is_canceled group);
   187     val ok = setmp_thread_data (name, task, group) (fn () =>
   188       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   189     val _ = SYNCHRONIZED "execute" (fn () =>
   190       let
   191         val maximal = change_result queue (Task_Queue.finish task);
   192         val _ =
   193           if ok then ()
   194           else if Task_Queue.cancel (! queue) group then ()
   195           else do_cancel group;
   196         val _ = broadcast work_finished;
   197         val _ = if maximal then () else broadcast work_available;
   198       in () end);
   199   in () end;
   200 
   201 
   202 (* worker threads *)
   203 
   204 fun worker_wait cond = (*requires SYNCHRONIZED*)
   205  (change_active false; broadcast scheduler_event;
   206   wait cond;
   207   change_active true; broadcast scheduler_event);
   208 
   209 fun worker_next () = (*requires SYNCHRONIZED*)
   210   if ! excessive > 0 then
   211     (dec excessive;
   212      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   213      broadcast scheduler_event;
   214      NONE)
   215   else if overloaded () then (worker_wait scheduler_event; worker_next ())
   216   else
   217     (case change_result queue Task_Queue.dequeue of
   218       NONE => (worker_wait work_available; worker_next ())
   219     | some => some);
   220 
   221 fun worker_loop name =
   222   (case SYNCHRONIZED name (fn () => worker_next ()) of
   223     NONE => ()
   224   | SOME work => (execute name work; worker_loop name));
   225 
   226 fun worker_start name = (*requires SYNCHRONIZED*)
   227   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   228 
   229 
   230 (* scheduler *)
   231 
   232 val last_status = ref Time.zeroTime;
   233 val next_status = Time.fromMilliseconds 450;
   234 
   235 fun scheduler_next () = (*requires SYNCHRONIZED*)
   236   let
   237     (*queue and worker status*)
   238     val _ =
   239       let val now = Time.now () in
   240         if Time.> (Time.+ (! last_status, next_status), now) then ()
   241         else
   242          (last_status := now; Multithreading.tracing 1 (fn () =>
   243             let
   244               val {ready, pending, running} = Task_Queue.status (! queue);
   245               val total = length (! workers);
   246               val active = count_active (! workers);
   247             in
   248               "SCHEDULE: " ^
   249                 string_of_int ready ^ " ready, " ^
   250                 string_of_int pending ^ " pending, " ^
   251                 string_of_int running ^ " running; " ^
   252                 string_of_int total ^ " workers, " ^
   253                 string_of_int active ^ " active"
   254             end))
   255       end;
   256 
   257     (*worker threads*)
   258     val _ =
   259       if forall (Thread.isActive o #1) (! workers) then ()
   260       else
   261         (case List.partition (Thread.isActive o #1) (! workers) of
   262           (_, []) => ()
   263         | (alive, dead) =>
   264             (workers := alive; Multithreading.tracing 0 (fn () =>
   265               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   266 
   267     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   268     val mm = (m * 3) div 2;
   269     val l = length (! workers);
   270     val _ = excessive := l - mm;
   271     val _ =
   272       if mm > l then
   273        (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
   274         broadcast scheduler_event)
   275       else ();
   276 
   277     (*canceled groups*)
   278     val _ =
   279       if null (! canceled) then ()
   280       else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
   281 
   282     (*delay loop*)
   283     val delay =
   284       Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
   285     val _ = wait_interruptible scheduler_event delay
   286       handle Exn.Interrupt =>
   287         (Multithreading.tracing 1 (fn () => "Interrupt");
   288           List.app do_cancel (Task_Queue.cancel_all (! queue)));
   289 
   290     (*shutdown*)
   291     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   292     val continue = not (! do_shutdown andalso null (! workers));
   293     val _ = if continue then () else scheduler := NONE;
   294     val _ = broadcast scheduler_event;
   295   in continue end;
   296 
   297 fun scheduler_loop () =
   298   while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
   299 
   300 fun scheduler_active () = (*requires SYNCHRONIZED*)
   301   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   302 
   303 fun scheduler_check () = (*requires SYNCHRONIZED*)
   304  (do_shutdown := false;
   305   if not (scheduler_active ()) then
   306    (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
   307   else ());
   308 
   309 
   310 
   311 (** futures **)
   312 
   313 (* fork *)
   314 
   315 fun fork_future opt_group deps pri e =
   316   let
   317     val group =
   318       (case opt_group of
   319         SOME group => group
   320       | NONE => Task_Queue.new_group (worker_group ()));
   321     val (result, job) = future_job group e;
   322     val task = SYNCHRONIZED "future" (fn () =>
   323       let
   324         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   325         val _ = if minimal then signal work_available else ();
   326         val _ = scheduler_check ();
   327       in task end);
   328   in Future {task = task, group = group, result = result} end;
   329 
   330 fun fork e = fork_future NONE [] 0 e;
   331 fun fork_group group e = fork_future (SOME group) [] 0 e;
   332 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   333 fun fork_pri pri e = fork_future NONE [] pri e;
   334 
   335 
   336 (* join *)
   337 
   338 local
   339 
   340 fun get_result x =
   341   (case peek x of
   342     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   343   | SOME (Exn.Exn Exn.Interrupt) =>
   344       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   345   | SOME res => res);
   346 
   347 fun join_wait x =
   348   if SYNCHRONIZED "join_wait" (fn () =>
   349     is_finished x orelse (wait work_finished; false))
   350   then () else join_wait x;
   351 
   352 fun join_next deps = (*requires SYNCHRONIZED*)
   353   if null deps then NONE
   354   else
   355     (case change_result queue (Task_Queue.dequeue_towards deps) of
   356       (NONE, []) => NONE
   357     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   358     | (SOME work, deps') => SOME (work, deps'));
   359 
   360 fun join_work deps =
   361   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   362     NONE => ()
   363   | SOME (work, deps') => (execute "join" work; join_work deps'));
   364 
   365 in
   366 
   367 fun join_results xs =
   368   if forall is_finished xs then map get_result xs
   369   else uninterruptible (fn _ => fn () =>
   370     let
   371       val _ = Multithreading.self_critical () andalso
   372         error "Cannot join future values within critical section";
   373       val _ =
   374         if is_worker () then join_work (map task_of xs)
   375         else List.app join_wait xs;
   376     in map get_result xs end) ();
   377 
   378 end;
   379 
   380 fun join_result x = singleton join_results x;
   381 fun join x = Exn.release (join_result x);
   382 
   383 
   384 (* map *)
   385 
   386 fun map_future f x =
   387   let
   388     val task = task_of x;
   389     val group = Task_Queue.new_group (SOME (group_of x));
   390     val (result, job) = future_job group (fn () => f (join x));
   391 
   392     val extended = SYNCHRONIZED "map_future" (fn () =>
   393       (case Task_Queue.extend task job (! queue) of
   394         SOME queue' => (queue := queue'; true)
   395       | NONE => false));
   396   in
   397     if extended then Future {task = task, group = group, result = result}
   398     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   399   end;
   400 
   401 
   402 (* cancellation *)
   403 
   404 fun interruptible_task f x =
   405   if Multithreading.available then
   406     Multithreading.with_attributes
   407       (if is_worker ()
   408        then Multithreading.restricted_interrupts
   409        else Multithreading.regular_interrupts)
   410       (fn _ => f) x
   411   else interruptible f x;
   412 
   413 (*cancel: present and future group members will be interrupted eventually*)
   414 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   415 fun cancel x = cancel_group (group_of x);
   416 
   417 
   418 (* shutdown *)
   419 
   420 fun shutdown () =
   421   if Multithreading.available then
   422     SYNCHRONIZED "shutdown" (fn () =>
   423      while scheduler_active () do
   424       (wait scheduler_event; broadcast_all ()))
   425   else ();
   426 
   427 
   428 (*final declarations of this structure!*)
   429 val map = map_future;
   430 
   431 end;
   432 
   433 type 'a future = 'a Future.future;
   434