src/Pure/Concurrent/future.ML
author wenzelm
Tue Jul 21 23:42:29 2009 +0200 (2009-07-21)
changeset 32107 47d0da617fcc
parent 32102 81d03a29980c
child 32109 ff3677972e3f
permissions -rw-r--r--
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
control combinators: paranoia eta-expansion to ensure proper operational semantics;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values.
     5 
     6 Notes:
     7 
     8   * Futures are similar to delayed evaluation, i.e. delay/force is
     9     generalized to fork/join (and variants).  The idea is to model
    10     parallel value-oriented computations, but *not* communicating
    11     processes.
    12 
    13   * Futures are grouped; failure of one group member causes the whole
    14     group to be interrupted eventually.
    15 
    16   * Forked futures are evaluated spontaneously by a farm of worker
    17     threads in the background; join resynchronizes the computation and
    18     delivers results (values or exceptions).
    19 
    20   * The pool of worker threads is limited, usually in correlation with
    21     the number of physical cores on the machine.  Note that allocation
    22     of runtime resources is distorted either if workers yield CPU time
    23     (e.g. via system sleep or wait operations), or if non-worker
    24     threads contend for significant runtime resources independently.
    25 *)
    26 
    27 signature FUTURE =
       (*public interface of structure Future: fork/join of parallel value-oriented computations*)
    28 sig
    29   val enabled: unit -> bool  (*multithreading enabled and current thread not in a critical section*)
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool  (*current thread has future thread data set, i.e. is executing a task*)
    33   val worker_group: unit -> Task_Queue.group option  (*group from the current thread data, if any*)
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option  (*reads the result cell: NONE while unfinished*)
    38   val is_finished: 'a future -> bool  (*peek yields SOME result*)
    39   val value: 'a -> 'a future  (*future that already holds the given value*)
    40   val fork: (unit -> 'a) -> 'a future  (*new group, no dependencies, priority 0*)
    41   val fork_group: group -> (unit -> 'a) -> 'a future  (*fork into the given group*)
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future  (*fork depending on the tasks of the given futures*)
    43   val fork_pri: int -> (unit -> 'a) -> 'a future  (*fork with explicit priority*)
    44   val join_results: 'a future list -> 'a Exn.result list  (*wait for all futures, deliver results as Exn.result*)
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a  (*join_result followed by Exn.release*)
    47   val map: ('a -> 'b) -> 'a future -> 'b future  (*apply function to the joined result within a new future*)
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    49   val interrupt_task: string -> unit  (*permissive signal, may get ignored*)
    50   val cancel_group: group -> unit  (*group members will be interrupted eventually*)
    51   val cancel: 'a future -> unit  (*cancel_group applied to the group of the future*)
    52   val shutdown: unit -> unit  (*wait for empty queue, then terminate workers and scheduler*)
    53 end;
    54 
    55 structure Future: FUTURE =
    56 struct
    57 
    58 (** future values **)
    59 
    60 fun enabled () =  (*multithreading is on and this thread is outside any critical section*)
    61   if not (Multithreading.enabled ()) then false
    62   else not (Multithreading.self_critical ());
    63 
    64 
    65 (* identifiers *)
    66 
    67 type task = Task_Queue.task;  (*abbreviation: tasks as managed by Task_Queue*)
    68 type group = Task_Queue.group;  (*abbreviation: task groups as managed by Task_Queue*)
    69 
    70 local
         (*thread-local slot holding (name, task, group) of the job executed by a thread*)
    71   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    72 in
         (*content of the thread-local slot; NONE if the slot was never set for this thread*)
    73   fun thread_data () = (case Thread.getLocal tag of SOME info => info | NONE => NONE);
         (*apply f to x via Library.setmp_thread_data, handing over the current slot content and SOME info*)
    74   fun setmp_thread_data info f x =
    75     Library.setmp_thread_data tag (thread_data ()) (SOME info) f x;
    76 end;
    77 
    78 fun is_worker () = is_some (thread_data ());  (*thread data present for the current thread*)
    79 fun worker_group () = Option.map #3 (thread_data ());  (*group component of the thread data, if present*)
    80 
    81 
    82 (* datatype future *)
    83 
    84 datatype 'a future = Future of
    85  {task: task,  (*task that computes the result*)
    86   group: group,  (*group the future belongs to; used for cancellation*)
    87   result: 'a Exn.result option ref};  (*NONE until the job has stored a value or exception*)
    88 
    89 fun task_of (Future fields) = #task fields;  (*task component*)
    90 fun group_of (Future fields) = #group fields;  (*group component*)
    91 
    92 fun peek (Future fields) = ! (#result fields);  (*current content of the result cell*)
    93 fun is_finished x = (case peek x of SOME _ => true | NONE => false);  (*result already available*)
    94 
    95 fun value x =  (*finished future wrapping x, with a fresh task and a fresh group*)
    96   let val task = Task_Queue.new_task 0; val group = Task_Queue.new_group NONE;
    97   in Future {task = task, group = group, result = ref (SOME (Exn.Result x))}
    98   end;
    99 
   100 
   101 
   102 (** scheduling **)
   103 
   104 (* global state *)
   105 
   106 val queue = ref Task_Queue.empty;  (*task queue shared by scheduler, workers and joining threads*)
   107 val next = ref 0;  (*counter used by scheduler_next to number worker thread names*)
   108 val workers = ref ([]: (Thread.thread * bool) list);  (*worker threads, each paired with its active flag*)
   109 val scheduler = ref (NONE: Thread.thread option);  (*scheduler thread, if one was started*)
   110 val excessive = ref 0;  (*surplus of workers as computed by scheduler_next; worker_next retires while positive*)
   111 val canceled = ref ([]: Task_Queue.group list);  (*groups whose cancellation has not succeeded yet*)
   112 val do_shutdown = ref false;  (*set by shutdown, reset when a new scheduler is started*)
   113 
   114 
   115 (* synchronization *)
   116 
   117 local
   118   val lock = Mutex.mutex ();  (*the single lock used by SYNCHRONIZED*)
   119   val cond = ConditionVar.conditionVar ();  (*the single condition variable for wait/notify_all*)
   120 in
   121 
       (*run a function via SimpleThread.synchronized on the global lock; name labels the caller*)
   122 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   123 
       (*block on the condition variable; the lock must be held by the caller*)
   124 fun wait () = (*requires SYNCHRONIZED*)
   125   ConditionVar.wait (cond, lock);
   126 
       (*like wait, but give up after the given time span; the waitUntil result is discarded*)
   127 fun wait_timeout timeout = (*requires SYNCHRONIZED*)
   128   ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
   129 
       (*wake up all threads blocked in wait or wait_timeout*)
   130 fun notify_all () = (*requires SYNCHRONIZED*)
   131   ConditionVar.broadcast cond;
   132 
   133 end;
   134 
   135 
   136 (* worker activity *)
   137 
   138 fun count_active ws =  (*number of entries whose active flag is true*)
   139   length (List.filter (fn (_, active) => active) ws);
   140 
   141 fun trace_active () = Multithreading.tracing 1 (fn () =>  (*trace total and active worker counts*)
   142   let val ws = ! workers in
   143     "SCHEDULE: " ^
   144       string_of_int (length ws) ^ " workers, " ^
   145       string_of_int (count_active ws) ^ " active"
   146   end);
   147 
   148 fun change_active active = (*requires SYNCHRONIZED*)
         (*update the active flag stored for the current thread in the workers table*)
   149   change workers (AList.update Thread.equal (Thread.self (), active));
   150 
   151 fun overloaded () =  (*more active workers than Multithreading.max_threads_value permits*)
   152   let val active = count_active (! workers) in active > Multithreading.max_threads_value () end;
   153 
   154 
   155 (* execute future jobs *)
   156 
   157 fun future_job group (e: unit -> 'a) =
         (*Create a fresh result cell together with the job that fills it.
           The job takes a flag ok and returns true iff it produced a proper value.*)
   158   let
   159     val result = ref (NONE: 'a Exn.result option);
   160     fun job ok =
   161       let
   162         val res =
   163           if ok then
                 (*interrupt attributes are changed only around the evaluation of e*)
   164             Multithreading.with_attributes Multithreading.restricted_interrupts
   165               (fn _ => fn () => Exn.capture e ()) ()
                 (*not ok: e is not evaluated at all, the result is Interrupt*)
   166           else Exn.Exn Exn.Interrupt;
   167         val _ = result := SOME res;
   168       in
               (*an exceptional result cancels the whole group*)
   169         (case res of
   170           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   171         | Exn.Result _ => true)
   172       end;
   173   in (result, job) end;
   174 
   175 fun do_cancel group = (*requires SYNCHRONIZED*)
         (*register group in the canceled list, without duplicates wrt. Task_Queue.eq_group*)
   176   change canceled (insert Task_Queue.eq_group group);
   177 
   178 fun execute name (task, group, jobs) =
         (*Run all jobs of a task in the current thread, then mark the task as
           finished in the queue and propagate failure to the group.*)
   179   let
   180     val _ = trace_active ();
           (*jobs of a group that is already canceled get the flag false*)
   181     val valid = not (Task_Queue.is_canceled group);
           (*job is applied before andalso, so every job runs even after an earlier failure*)
   182     val ok = setmp_thread_data (name, task, group) (fn () =>
   183       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   184     val _ = SYNCHRONIZED "execute" (fn () =>
   185      (change queue (Task_Queue.finish task);
   186       if ok then ()
             (*if Task_Queue.cancel yields false, keep the group registered; scheduler_next retries*)
   187       else if Task_Queue.cancel (! queue) group then ()
   188       else do_cancel group;
   189       notify_all ()));
   190   in () end;
   191 
   192 
   193 (* worker threads *)
   194 
   195 fun worker_wait () = (*requires SYNCHRONIZED*)
         (*the current thread is flagged inactive for the duration of the wait*)
   196   (change_active false; wait (); change_active true);
   197 
   198 fun worker_next () = (*requires SYNCHRONIZED*)
         (*Next piece of work for the current worker; NONE tells the worker to terminate.*)
   199   if ! excessive > 0 then
           (*too many workers: remove the current thread from the table and retire*)
   200     (dec excessive;
   201      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   202      notify_all ();
   203      NONE)
         (*too many active workers: sleep, then try again*)
   204   else if overloaded () then (worker_wait (); worker_next ())
   205   else
   206     (case change_result queue Task_Queue.dequeue of
             (*nothing dequeued: sleep until notified, then try again*)
   207       NONE => (worker_wait (); worker_next ())
   208     | some => some);
   209 
   210 fun worker_loop name =
         (*main loop of a worker: fetch work under the lock, execute it outside the lock*)
   211   (case SYNCHRONIZED name (fn () => worker_next ()) of
   212     NONE => ()
   213   | SOME work => (execute name work; worker_loop name));
   214 
   215 fun worker_start name = (*requires SYNCHRONIZED*)
         (*fork a thread running worker_loop and register it with active flag true*)
   216   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   217 
   218 
   219 (* scheduler *)
   220 
   221 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler: trace the queue status, dispose dead
           workers, adjust the size of the worker pool, retry pending
           cancellations, then sleep on the condition variable.  The result
           is false iff the scheduler loop shall stop.*)
   222   let
   223     (*queue status*)
   224     val _ = Multithreading.tracing 1 (fn () =>
   225       let val {ready, pending, running} = Task_Queue.status (! queue) in
   226         "SCHEDULE: " ^
   227           string_of_int ready ^ " ready, " ^
   228           string_of_int pending ^ " pending, " ^
   229           string_of_int running ^ " running"
   230       end);
   231 
   232     (*worker threads*)
   233     val ws = ! workers;
   234     val _ =
   235       if forall (Thread.isActive o #1) ws then ()
   236       else
   237         (case List.partition (Thread.isActive o #1) ws of
   238           (_, []) => ()
   239         | (active, inactive) =>
   240             (workers := active; Multithreading.tracing 0 (fn () =>
   241               "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
   242     val _ = trace_active ();
   243 
           (*no workers are wanted during shutdown; otherwise 1.5 times the thread limit*)
   244     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   245     val mm = (m * 3) div 2;
           (*count the workers that survived the disposal above: the snapshot ws
             may still contain dead threads, which must neither suppress the start
             of replacements nor make live workers retire as excessive*)
   246     val l = length (! workers);
   247     val _ = excessive := l - mm;
   248     val _ =
   249       if mm > l then
   250         funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   251       else ();
   252 
   253     (*canceled groups*)
           (*groups stay registered until Task_Queue.cancel succeeds for them*)
   254     val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
   255 
   256     (*shutdown*)
           (*stop once shutdown was requested and no worker is registered anymore*)
   257     val continue = not (! do_shutdown andalso null (! workers));
   258     val _ = if continue then () else scheduler := NONE;
   259 
   260     val _ = notify_all ();
           (*short timeout while cancellations are pending; an interrupt of the
             scheduler registers the groups returned by Task_Queue.cancel_all*)
   261     val _ = interruptible (fn () =>
   262         wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
   263       handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   264   in continue end;
   265 
   266 fun scheduler_loop () =
         (*repeat scheduler rounds, each under the lock, until scheduler_next yields false*)
   267   while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
   268 
   269 fun scheduler_active () = (*requires SYNCHRONIZED*)
         (*a scheduler thread is registered and Thread.isActive holds for it*)
   270   the_default false (Option.map Thread.isActive (! scheduler));
   271 
   272 fun scheduler_check name = SYNCHRONIZED name (fn () =>
         (*ensure a running scheduler: start one on demand, fail while a shutdown is in progress*)
   273   if not (scheduler_active ()) then
   274     (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   275   else if ! do_shutdown then error "Scheduler shutdown in progress"
   276   else ());
   277 
   278 
   279 
   280 (** futures **)
   281 
   282 (* fork *)
   283 
   284 fun fork_future opt_group deps pri e =
         (*Enqueue the evaluation of e as a new task.
           opt_group: explicit group, or NONE for a fresh group created from worker_group ();
           deps: tasks passed to Task_Queue.enqueue as dependencies; pri: priority.*)
   285   let
   286     val _ = scheduler_check "future check";
   287 
   288     val group =
   289       (case opt_group of
   290         SOME group => group
   291       | NONE => Task_Queue.new_group (worker_group ()));
   292     val (result, job) = future_job group e;
           (*enqueue under the lock and notify waiting threads*)
   293     val task = SYNCHRONIZED "future" (fn () =>
   294       change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   295   in Future {task = task, group = group, result = result} end;
   296 
   297 fun fork e = fork_future NONE [] 0 e;  (*fresh group, no dependencies, priority 0*)
   298 fun fork_group group e = fork_future (SOME group) [] 0 e;  (*given group, no dependencies, priority 0*)
   299 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;  (*depends on the tasks of the given futures*)
   300 fun fork_pri pri e = fork_future NONE [] pri e;  (*fresh group, no dependencies, given priority*)
   301 
   302 
   303 (* join *)
   304 
   305 local
   306 
   307 fun get_result x =  (*result of a future; Interrupt is replaced by the exceptions recorded for its group*)
   308   (case peek x of
   309     SOME (Exn.Exn Exn.Interrupt) =>
   310       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   311   | SOME res => res
   312   | NONE => Exn.Exn (SYS_ERROR "unfinished future"));
   313 
   314 fun join_next deps = (*requires SYNCHRONIZED*)
         (*wait while overloaded, then ask the queue for work via Task_Queue.dequeue_towards deps*)
   315   if overloaded () then (worker_wait (); join_next deps)
   316   else change_result queue (Task_Queue.dequeue_towards deps);
   317 
   318 fun join_deps deps =
         (*execute work obtained from join_next in the current thread until it yields NONE;
           each round continues with the remaining deps returned by the queue*)
   319   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   320     NONE => ()
   321   | SOME (work, deps') => (execute "join" work; join_deps deps'));
   322 
   323 in
   324 
   325 fun join_results xs =
         (*Deliver the results of all given futures, waiting for unfinished ones.
           Fails with an error when called within a critical section.*)
         (*fast path: everything finished already, no scheduler interaction*)
   326   if forall is_finished xs then map get_result xs
   327   else uninterruptible (fn _ => fn () =>
   328     let
   329       val _ = scheduler_check "join check";
   330       val _ = Multithreading.self_critical () andalso
   331         error "Cannot join future values within critical section";
   332 
             (*a thread that is itself executing a task helps by running work via join_deps*)
   333       val worker = is_worker ();
   334       val _ = if worker then join_deps (map task_of xs) else ();
   335 
             (*re-check under the lock after each wake-up; workers wait via worker_wait*)
   336       fun join_wait x =
   337         if SYNCHRONIZED "join_wait" (fn () =>
   338           is_finished x orelse (if worker then worker_wait () else wait (); false))
   339         then () else join_wait x;
   340       val _ = List.app join_wait xs;
   341 
   342     in map get_result xs end) ();
   343 
   344 end;
   345 
   346 fun join_result x = singleton join_results x;  (*join_results for a single future*)
   347 fun join x = Exn.release (join_result x);  (*pass the joined result to Exn.release*)
   348 
   349 
   350 (* map *)
   351 
   352 fun map_future f x =
         (*Future for f applied to the joined result of x.  The job is attached to the
           task of x if Task_Queue.extend permits; otherwise a new task depending on it is forked.*)
   353   let
   354     val _ = scheduler_check "map_future check";
   355 
   356     val task = task_of x;
           (*new group created from the group of x*)
   357     val group = Task_Queue.new_group (SOME (group_of x));
   358     val (result, job) = future_job group (fn () => f (join x));
   359 
           (*true iff the queue accepted the additional job for the existing task*)
   360     val extended = SYNCHRONIZED "map_future" (fn () =>
   361       (case Task_Queue.extend task job (! queue) of
   362         SOME queue' => (queue := queue'; true)
   363       | NONE => false));
   364   in
   365     if extended then Future {task = task, group = group, result = result}
           (*fallback: separate task in the same new group, with the priority of the original task*)
   366     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   367   end;
   368 
   369 
   370 (* cancellation *)
   371 
   372 fun interruptible_task f x =
         (*apply f to x with interrupt attributes chosen by context: restricted interrupts
           inside a worker, regular interrupts otherwise; without multithreading
           support fall back to plain interruptible*)
   373   if Multithreading.available then
   374     Multithreading.with_attributes
   375       (if is_worker ()
   376        then Multithreading.restricted_interrupts
   377        else Multithreading.regular_interrupts)
   378       (fn _ => f) x
   379   else interruptible f x;
   380 
   381 (*interrupt: permissive signal, may get ignored*)
       (*the id is handed to Task_Queue.interrupt_external together with the current queue, under the lock*)
   382 fun interrupt_task id = SYNCHRONIZED "interrupt"
   383   (fn () => Task_Queue.interrupt_external (! queue) id);
   384 
   385 (*cancel: present and future group members will be interrupted eventually*)
       (*only registers the group via do_cancel and notifies waiting threads;
         the scheduler check comes first and fails while a shutdown is in progress*)
   386 fun cancel_group group =
   387  (scheduler_check "cancel check";
   388   SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));
   389 
   390 fun cancel x = cancel_group (group_of x);  (*cancel the whole group of the given future*)
   391 
   392 
   393 (** global join and shutdown **)
   394 
   395 fun shutdown () =
         (*Global join and shutdown: wait until the queue is empty, then request
           shutdown and wait until workers and scheduler are gone.  No-op unless
           Multithreading.available.*)
   396   if Multithreading.available then
   397    (scheduler_check "shutdown check";
   398     SYNCHRONIZED "shutdown" (fn () =>
   399      (while not (scheduler_active ()) do wait ();
   400       while not (Task_Queue.is_empty (! queue)) do wait ();
             (*from here on scheduler_next aims at zero workers*)
   401       do_shutdown := true;
   402       notify_all ();
   403       while not (null (! workers)) do wait ();
   404       while scheduler_active () do wait ();
             (*additional fixed delay of 300ms; NOTE(review): purpose not visible here, presumably lets threads wind down*)
   405       OS.Process.sleep (Time.fromMilliseconds 300))))
   406   else ();
   407 
   408 
   409 (*final declarations of this structure!*)
       (*declared last: uses of map further up in this structure refer to the outer map on lists*)
   410 val map = map_future;
   411 
   412 end;
   413 
   414 type 'a future = 'a Future.future;  (*top-level abbreviation for the future type*)
   415