src/Pure/Concurrent/future.ML
author haftmann
Thu Oct 22 13:48:06 2009 +0200 (2009-10-22)
changeset 33063 4d462963a7db
parent 32814 81897d30b97f
child 33061 e3e61133e0fc
permissions -rw-r--r--
map_range (and map_index) combinator
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
         (*abbreviations for the task and group identifiers of Task_Queue*)
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
         (*information about the future job executed by the current thread, if any*)
    32   val is_worker: unit -> bool
    33   val worker_task: unit -> Task_Queue.task option
    34   val worker_group: unit -> Task_Queue.group option
         (*abstract future values and their components*)
    35   type 'a future
    36   val task_of: 'a future -> task
    37   val group_of: 'a future -> group
         (*non-blocking inspection of the result slot*)
    38   val peek: 'a future -> 'a Exn.result option
    39   val is_finished: 'a future -> bool
         (*already finished future holding the given value*)
    40   val value: 'a -> 'a future
         (*fork variants: explicit group, dependencies, and/or priority*)
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    43   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    44   val fork_pri: int -> (unit -> 'a) -> 'a future
    45   val fork: (unit -> 'a) -> 'a future
         (*join: wait for results; join passes the result through Exn.release*)
    46   val join_results: 'a future list -> 'a Exn.result list
    47   val join_result: 'a future -> 'a Exn.result
    48   val join: 'a future -> 'a
         (*future for the function applied to the joined result of the given future*)
    49   val map: ('a -> 'b) -> 'a future -> 'b future
         (*interrupts and cancellation*)
    50   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    51   val cancel_group: group -> unit
    52   val cancel: 'a future -> unit
         (*wait until the scheduler thread is no longer active*)
    53   val shutdown: unit -> unit
    54 end;
    55 
    56 structure Future: FUTURE =
    57 struct
    58 
    59 (** future values **)
    60 
    61 (* identifiers *)
    62 
    type task = Task_Queue.task;
    type group = Task_Queue.group;

    (*thread-local slot holding (name, task, group) while a future job is
      executed within the scope of setmp_thread_data*)
    local
      val tag: (string * task * group) option Universal.tag = Universal.tag ();
    in

    (*current content of the slot; NONE if nothing was stored for this thread*)
    fun thread_data () =
      let val slot = Thread.getLocal tag
      in the_default NONE slot end;

    (*apply f to x via Library.setmp_thread_data, handing over the current
      slot content together with the new content SOME data*)
    fun setmp_thread_data data f x =
      let val previous = thread_data ()
      in Library.setmp_thread_data tag previous (SOME data) f x end;

    end;
    73 
    (*true iff thread data is present for the current thread*)
    fun is_worker () = is_some (thread_data ());

    (*task component of the thread data, if present*)
    fun worker_task () = Option.map (fn (_, task, _) => task) (thread_data ());

    (*group component of the thread data, if present*)
    fun worker_group () = Option.map (fn (_, _, group) => group) (thread_data ());
    77 
    78 
    79 (* datatype future *)
    80 
    (*a future consists of its task, its group, and a synchronized result slot
      that is NONE until a result has been assigned*)
    datatype 'a future = Future of
     {task: task,
      group: group,
      result: 'a Exn.result option Synchronized.var};

    (*component selectors*)
    fun task_of (Future args) = #task args;
    fun group_of (Future args) = #group args;
    fun result_of (Future args) = #result args;

    (*current content of the result slot*)
    fun peek x = Synchronized.value (result_of x);

    (*true iff the result slot has been filled*)
    fun is_finished x =
      (case peek x of
        NONE => false
      | SOME _ => true);
    92 
    (*future that is finished from the start: fresh task (priority 0), fresh
      group, and a result slot that already holds Exn.Result x*)
    fun value x =
      let
        val task = Task_Queue.new_task 0;
        val group = Task_Queue.new_group NONE;
        val result = Synchronized.var "future" (SOME (Exn.Result x));
      in Future {task = task, group = group, result = result} end;
    97 
    98 
    99 
   100 (** scheduling **)
   101 
   102 (* global state *)
   103 
   104 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue shared by fork, join, workers and scheduler*)
   105 val next = Unsynchronized.ref 0;  (*counter used by the scheduler to number worker names*)
   106 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);  (*worker threads, each paired with its "active" flag*)
   107 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread; assigned by scheduler_check, reset to NONE when the scheduler stops*)
   108 val excessive = Unsynchronized.ref 0;  (*surplus of workers as computed by the scheduler; while positive, workers retire in worker_next*)
   109 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  (*groups registered by do_cancel; the scheduler retries Task_Queue.cancel on them*)
   110 val do_shutdown = Unsynchronized.ref false;  (*set by the scheduler on empty queue, cleared by scheduler_check*)
   111 
   112 
   113 (* synchronization *)
   114 
   115 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by the scheduler round, by shutdown, and by workers exceeding the thread limit*)
   116 val work_available = ConditionVar.conditionVar ();  (*awaited by workers that found nothing to dequeue*)
   117 val work_finished = ConditionVar.conditionVar ();  (*awaited by joining workers; broadcast whenever execute finishes a task*)
   118 
   (*all operations below share one private mutex*)
   local
     val lock = Mutex.mutex ();
   in

   (*run a function via SimpleThread.synchronized on the private lock; the name
     is handed to SimpleThread.synchronized as well*)
   fun SYNCHRONIZED name = SimpleThread.synchronized name lock;

   (*wait for a condition without timeout*)
   fun wait cond = (*requires SYNCHRONIZED*)
     Multithreading.sync_wait NONE NONE cond lock;

   (*wait for a condition; the time limit is the current time plus timeout*)
   fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
     let val deadline = Time.+ (Time.now (), timeout)
     in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

   (*plain wrappers for the condition variable operations*)
   val signal = (*requires SYNCHRONIZED*)
     ConditionVar.signal;

   val broadcast = (*requires SYNCHRONIZED*)
     ConditionVar.broadcast;

   (*wake up everybody waiting for available or finished work*)
   fun broadcast_work () = (*requires SYNCHRONIZED*)
    (broadcast work_available;
     broadcast work_finished);

   end;
   142 
   143 
   144 (* execute future jobs *)
   145 
   (*Produce the result slot and the job for a future of the given group.

     The job takes a flag: if true, e is evaluated with private interrupts and
     its outcome is captured; if false, the outcome is Interrupt.  The outcome
     is assigned to the slot exactly once (a second assignment raises Fail).
     The job returns true for a regular result; for an exception it cancels
     the group with that exception and returns false.*)
   fun future_job group (e: unit -> 'a) =
     let
       val result = Synchronized.var "future" (NONE: 'a Exn.result option);

       fun evaluate () =
         Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ());

       fun assign res =
         Synchronized.change result
           (fn SOME _ => raise Fail "Duplicate assignment of future value"
             | NONE => SOME res);

       fun success (Exn.Result _) = true
         | success (Exn.Exn exn) = (Task_Queue.cancel_group group exn; false);

       fun job ok =
         let
           val res = if ok then Exn.capture evaluate () else Exn.Exn Exn.Interrupt;
           val _ = assign res;
         in success res end;
     in (result, job) end;
   165 
   (*register a group in the list of canceled groups (no duplicates wrt.
     Task_Queue.eq_group) and notify waiters of scheduler_event*)
   fun do_cancel group = (*requires SYNCHRONIZED*)
     let
       val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
       val _ = broadcast scheduler_event;
     in () end;
   169 
   (*Execute all jobs of a task and finish the task in the queue.

     Every job is run (with the thread data set to name, task, group) and
     receives the flag "group not canceled", determined once beforehand.
     Afterwards, under the lock: the task is finished in the queue; if some
     job failed and Task_Queue.cancel does not return true for the group, the
     group is registered via do_cancel; work_finished is broadcast, and
     work_available as well unless Task_Queue.finish reported "maximal".*)
   fun execute name (task, group, jobs) =
     let
       val valid = not (Task_Queue.is_canceled group);

       (*note: "job valid" is evaluated before the conjunction, so no job is skipped*)
       fun run_jobs () =
         fold (fn job => fn ok_so_far => job valid andalso ok_so_far) jobs true;
       val ok = setmp_thread_data (name, task, group) run_jobs ();

       fun finish () =
         let
           val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
         in
           if ok orelse Task_Queue.cancel (! queue) group then () else do_cancel group;
           broadcast work_finished;
           if maximal then () else broadcast work_available
         end;
     in SYNCHRONIZED "finish" finish end;
   186 
   187 
   188 (* worker activity *)
   189 
   (*number of worker entries whose flag is true*)
   fun count_active () = (*requires SYNCHRONIZED*)
     length (List.filter (fn (_, active) => active) (! workers));

   (*store the given flag in the worker entry of the current thread*)
   fun change_active active = (*requires SYNCHRONIZED*)
     let val self = Thread.self ()
     in Unsynchronized.change workers (AList.update Thread.equal (self, active)) end;
   196 
   197 
   198 (* worker threads *)
   199 
   (*wait for a condition, with the current thread marked inactive meanwhile;
     the result of wait is ignored*)
   fun worker_wait cond = (*requires SYNCHRONIZED*)
     let
       val _ = change_active false;
       val _ = wait cond;
     in change_active true end;
   202 
   (*Determine the next piece of work for the current worker thread.

     - excessive workers: decrement the counter, remove the own entry from
       the worker list, notify scheduler_event, and return NONE;
     - more active workers than max_threads: wait for scheduler_event, retry;
     - otherwise dequeue from the queue; if nothing is returned, wait for
       work_available and retry.*)
   fun worker_next () = (*requires SYNCHRONIZED*)
     let
       fun retire () =
        (Unsynchronized.dec excessive;
         Unsynchronized.change workers
           (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
         broadcast scheduler_event;
         NONE);

       fun retry cond = (worker_wait cond; worker_next ());

       fun dequeue () =
         (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
           NONE => retry work_available
         | work => work);
     in
       if ! excessive > 0 then retire ()
       else if count_active () > Multithreading.max_threads_value () then retry scheduler_event
       else dequeue ()
     end;
   216 
   (*main loop of a worker: fetch work under the lock, execute it after the
     synchronized section has been left, repeat; stops when worker_next
     yields NONE*)
   fun worker_loop name =
     let
       fun step NONE = ()
         | step (SOME work) = (execute name work; worker_loop name);
     in step (SYNCHRONIZED name worker_next) end;
   221 
   (*fork a new worker thread and add it to the worker list, initially flagged
     as active; the new thread broadcasts scheduler_event before entering its
     loop*)
   fun worker_start name = (*requires SYNCHRONIZED*)
     let
       val thread =
         SimpleThread.fork false (fn () => (broadcast scheduler_event; worker_loop name));
     in Unsynchronized.change workers (cons (thread, true)) end;
   225 
   226 
   227 (* scheduler *)
   228 
   229 val last_status = Unsynchronized.ref Time.zeroTime;  (*time of the most recent status trace*)
   230 val next_status = Time.fromMilliseconds 500;  (*minimal distance between two status traces*)
   231 val next_round = Time.fromMilliseconds 50;  (*timeout of the wait at the end of each scheduler round*)
   232 
       (*One round of the scheduler: trace the status, dispose dead workers,
         adjust the number of worker threads, process canceled groups, then wait
         for scheduler_event (at most next_round).  The result tells the caller
         whether another round is required.  An Interrupt raised during the round
         cancels all groups of the queue and restarts the round.*)
   233 fun scheduler_next () = (*requires SYNCHRONIZED*)
   234   let
   235     (*queue and worker status*)
           (*traced at level 1, but only if next_status has elapsed since last_status*)
   236     val _ =
   237       let val now = Time.now () in
   238         if Time.> (Time.+ (! last_status, next_status), now) then ()
   239         else
   240          (last_status := now; Multithreading.tracing 1 (fn () =>
   241             let
   242               val {ready, pending, running} = Task_Queue.status (! queue);
   243               val total = length (! workers);
   244               val active = count_active ();
   245             in
   246               "SCHEDULE " ^ Time.toString now ^ ": " ^
   247                 string_of_int ready ^ " ready, " ^
   248                 string_of_int pending ^ " pending, " ^
   249                 string_of_int running ^ " running; " ^
   250                 string_of_int total ^ " workers, " ^
   251                 string_of_int active ^ " active"
   252             end))
   253       end;
   254 
   255     (*worker threads*)
           (*entries whose thread is no longer active are removed from the worker list*)
   256     val _ =
   257       if forall (Thread.isActive o #1) (! workers) then ()
   258       else
   259         (case List.partition (Thread.isActive o #1) (! workers) of
   260           (_, []) => ()
   261         | (alive, dead) =>
   262             (workers := alive; Multithreading.tracing 0 (fn () =>
   263               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   264 
           (*mm is the intended number of workers: 0 during shutdown, otherwise twice
             max_threads.  NOTE(review): 9999 looks like a special max_threads
             setting that is mapped to a single worker -- confirm.*)
   265     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   266     val mm = if m = 9999 then 1 else m * 2;
   267     val l = length (! workers);
           (*a positive surplus makes workers retire in worker_next; a deficit is
             compensated by starting new workers right here*)
   268     val _ = excessive := l - mm;
   269     val _ =
   270       if mm > l then
   271         funpow (mm - l) (fn () =>
   272           worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
   273       else ();
   274 
   275     (*canceled groups*)
           (*Task_Queue.cancel is retried for each registered group; groups for which
             it returns true are dropped from the list*)
   276     val _ =
   277       if null (! canceled) then ()
   278       else
   279        (Multithreading.tracing 1 (fn () =>
   280           string_of_int (length (! canceled)) ^ " canceled groups");
   281         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   282         broadcast_work ());
   283 
   284     (*delay loop*)
           (*NOTE(review): Exn.release presumably re-raises an exception captured
             by the wait, e.g. Interrupt, which is then handled below -- confirm*)
   285     val _ = Exn.release (wait_timeout next_round scheduler_event);
   286 
   287     (*shutdown*)
           (*an empty queue requests shutdown; the scheduler stops once the worker
             list is empty as well, and then clears its own thread reference*)
   288     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   289     val continue = not (! do_shutdown andalso null (! workers));
   290     val _ = if continue then () else scheduler := NONE;
   291     val _ = broadcast scheduler_event;
   292   in continue end
         (*the handler covers the whole round: all groups returned by
           Task_Queue.cancel_all are registered via do_cancel, then the round is
           started again*)
   293   handle Exn.Interrupt =>
   294    (Multithreading.tracing 1 (fn () => "Interrupt");
   295     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   296     scheduler_next ());
   297 
   (*body of the scheduler thread: with synchronous public interrupts, perform
     scheduler rounds under the lock until scheduler_next returns false*)
   fun scheduler_loop () =
     let
       val attributes = Multithreading.sync_interrupts Multithreading.public_interrupts;
       fun round () = SYNCHRONIZED "scheduler" scheduler_next;
       fun loop () = if round () then loop () else ();
     in Multithreading.with_attributes attributes (fn _ => loop ()) end;
   302 
   (*true iff a scheduler thread is recorded and that thread is active*)
   fun scheduler_active () = (*requires SYNCHRONIZED*)
     let
       fun alive NONE = false
         | alive (SOME thread) = Thread.isActive thread;
     in alive (! scheduler) end;
   305 
   (*withdraw a pending shutdown request and make sure that a scheduler thread
     is running, forking a new one if required*)
   fun scheduler_check () = (*requires SYNCHRONIZED*)
     let val _ = do_shutdown := false in
       if not (scheduler_active ())
       then scheduler := SOME (SimpleThread.fork false scheduler_loop)
       else ()
     end;
   310 
   311 
   312 
   313 (** futures **)
   314 
   315 (* fork *)
   316 
   (*Common fork operation.

     The group is the given one, or else a new group created from
     worker_group ().  The job for e is enqueued under the lock with the
     given dependencies and priority; work_available is signalled if
     Task_Queue.enqueue reports "minimal", and the scheduler is (re)activated
     via scheduler_check.*)
   fun fork_future opt_group deps pri e =
     let
       fun fresh_group () = Task_Queue.new_group (worker_group ());
       val group =
         (case opt_group of
           NONE => fresh_group ()
         | SOME given => given);
       val (result, job) = future_job group e;

       fun enqueue () =
         let
           val (task, minimal) =
             Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
         in
           if minimal then signal work_available else ();
           scheduler_check ();
           task
         end;
     in Future {task = SYNCHRONIZED "enqueue" enqueue, group = group, result = result} end;
   332 
   (*fork within an explicit group: no dependencies, priority 0*)
   fun fork_group group e = fork_future (SOME group) [] 0 e;

   (*fork with dependencies (given as futures) and explicit priority*)
   fun fork_deps_pri deps pri e =
     let val tasks = map task_of deps
     in fork_future NONE tasks pri e end;

   (*fork with dependencies, priority 0*)
   fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;

   (*fork with explicit priority, no dependencies*)
   fun fork_pri pri e = fork_future NONE [] pri e;

   (*plain fork: no dependencies, priority 0*)
   fun fork e = fork_future NONE [] 0 e;
   338 
   339 
   340 (* join *)
   341 
   local

   (*result of a future as seen after joining: an Interrupt result is replaced
     by the exceptions collected from the status of the future's group; an
     empty result slot yields SYS_ERROR*)
   fun get_result x =
     let
       fun group_failure () =
         Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))));
     in
       (case peek x of
         SOME (Exn.Exn Exn.Interrupt) => group_failure ()
       | SOME res => res
       | NONE => Exn.Exn (SYS_ERROR "unfinished future"))
     end;

   (*guarded access to the result slot that succeeds only on a filled slot and
     leaves its content unchanged*)
   fun join_wait x =
     let
       fun finished NONE = NONE
         | finished (res as SOME _) = SOME ((), res);
     in Synchronized.guarded_access (result_of x) finished end;

   (*next piece of work towards the given dependencies, together with the
     dependencies returned by Task_Queue.dequeue_towards; waits for
     work_finished while there are dependencies left but no work is offered*)
   fun join_next [] = NONE (*requires SYNCHRONIZED*)
     | join_next deps =
         let
           val self = Thread.self ();
           val (work, deps') =
             Unsynchronized.change_result queue (Task_Queue.dequeue_towards self deps);
         in
           (case work of
             SOME w => SOME (w, deps')
           | NONE =>
               if null deps' then NONE
               else (worker_wait work_finished; join_next deps'))
         end;

   (*alternate between fetching work under the lock and executing it, until
     nothing is left; the first fetch is given explicitly*)
   fun join_loop fetch =
     (case SYNCHRONIZED "join" fetch of
       NONE => ()
     | SOME (work, deps') => (execute "join" work; join_loop (fn () => join_next deps')));

   (*register the dependencies of the given task in the queue, then work
     towards them*)
   fun join_depend task deps =
     join_loop (fn () =>
       (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps));

   in

   (*Join a list of futures.  Finished futures are returned immediately.
     Otherwise, joining within a critical section is an error; a thread that
     executes a task works towards the dependencies itself, any other thread
     just waits for each result slot.*)
   fun join_results xs =
     let
       fun results () = map get_result xs;
       fun synchronize () =
         (case worker_task () of
           NONE => List.app join_wait xs
         | SOME task => join_depend task (map task_of xs));
     in
       if forall is_finished xs then results ()
       else if Multithreading.self_critical () then
         error "Cannot join future values within critical section"
       else (synchronize (); results ())
     end;

   end;
   385 
   (*join a single future, returning its result or exception as a value*)
   fun join_result x = singleton join_results x;

   (*join a single future and hand its result to Exn.release*)
   fun join x = Exn.release (singleton join_results x);
   388 
   389 
   390 (* map *)
   391 
   (*Future of f applied to the joined result of x, within a new group created
     from the group of x.

     First the job is offered to Task_Queue.extend for the task of x; if that
     succeeds, the updated queue is stored and the new future shares the task
     of x.  Otherwise a separate future is forked, which depends on that task
     and uses its priority.*)
   fun map_future f x =
     let
       val task = task_of x;
       val group = Task_Queue.new_group (SOME (group_of x));
       fun body () = f (join x);
       val (result, job) = future_job group body;

       fun try_extend () =
         (case Task_Queue.extend task job (! queue) of
           NONE => false
         | SOME queue' => (queue := queue'; true));
     in
       if SYNCHRONIZED "extend" try_extend
       then Future {task = task, group = group, result = result}
       else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
     end;
   406 
   407 
   408 (* cancellation *)
   409 
   (*apply f to x with interrupts enabled: private interrupts within a worker,
     public interrupts otherwise; without multithreading support this falls
     back on plain interruptible*)
   fun interruptible_task f x =
     if not Multithreading.available then interruptible f x
     else
       let
         val attributes =
           if is_worker () then Multithreading.private_interrupts
           else Multithreading.public_interrupts;
       in Multithreading.with_attributes attributes (fn _ => f x) end;
   418 
   (*cancel: present and future group members will be interrupted eventually*)
   fun cancel_group group =
     let fun request () = do_cancel group
     in SYNCHRONIZED "cancel" request end;

   (*cancel the group of the given future*)
   fun cancel (Future {group, ...}) = cancel_group group;
   422 
   423 
   424 (* shutdown *)
   425 
   (*wait under the lock until the scheduler thread is no longer active; after
     each scheduler_event the waiters for work are woken up as well.  Nothing
     happens without multithreading support.*)
   fun shutdown () =
     let
       fun await_scheduler () =
         if scheduler_active () then
           (wait scheduler_event; broadcast_work (); await_scheduler ())
         else ();
     in
       if Multithreading.available then SYNCHRONIZED "shutdown" await_scheduler else ()
     end;
   432 
   433 
   434 (*final declarations of this structure!  The name "map" is rebound to
        map_future only here, because the definitions above still apply the
        list version of map (e.g. in fork_deps_pri and join_results).*)
   435 val map = map_future;
   436 
   437 end;
   438 
   439 type 'a future = 'a Future.future;  (*unqualified abbreviation for the future type of structure Future*)
   440