src/Pure/Concurrent/future.ML
author wenzelm
Tue Sep 29 11:49:22 2009 +0200 (2009-09-29)
changeset 32738 15bb09ca0378
parent 32724 aaeeb0ba2035
child 32814 81897d30b97f
permissions -rw-r--r--
explicit indication of Unsynchronized.ref;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_group: unit -> Task_Queue.group option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val value: 'a -> 'a future
    40   val fork_group: group -> (unit -> 'a) -> 'a future
    41   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    44   val fork: (unit -> 'a) -> 'a future
    45   val join_results: 'a future list -> 'a Exn.result list
    46   val join_result: 'a future -> 'a Exn.result
    47   val join: 'a future -> 'a
    48   val map: ('a -> 'b) -> 'a future -> 'b future
    49   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    50   val cancel_group: group -> unit
    51   val cancel: 'a future -> unit
    52   val shutdown: unit -> unit
    53 end;
    54 
    55 structure Future: FUTURE =
    56 struct
    57 
    58 (** future values **)
    59 
    60 (* identifiers *)
    61 
    62 type task = Task_Queue.task;
    63 type group = Task_Queue.group;
    64 
    65 local
    66   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    67 in
    68   fun thread_data () = the_default NONE (Thread.getLocal tag);
    69   fun setmp_thread_data data f x =
    70     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    71 end;
    72 
    73 val is_worker = is_some o thread_data;
    74 val worker_group = Option.map #3 o thread_data;
    75 
    76 
    77 (* datatype future *)
    78 
    79 datatype 'a future = Future of
    80  {task: task,
    81   group: group,
    82   result: 'a Exn.result option Synchronized.var};
    83 
    84 fun task_of (Future {task, ...}) = task;
    85 fun group_of (Future {group, ...}) = group;
    86 fun result_of (Future {result, ...}) = result;
    87 
    88 fun peek x = Synchronized.value (result_of x);
    89 fun is_finished x = is_some (peek x);
    90 
    91 fun value x = Future
    92  {task = Task_Queue.new_task 0,
    93   group = Task_Queue.new_group NONE,
    94   result = Synchronized.var "future" (SOME (Exn.Result x))};
    95 
    96 
    97 
    98 (** scheduling **)
    99 
   100 (* global state *)
   101 
   102 val queue = Unsynchronized.ref Task_Queue.empty;
   103 val next = Unsynchronized.ref 0;
   104 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
   105 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   106 val excessive = Unsynchronized.ref 0;
   107 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   108 val do_shutdown = Unsynchronized.ref false;
   109 
   110 
   111 (* synchronization *)
   112 
   113 val scheduler_event = ConditionVar.conditionVar ();
   114 val work_available = ConditionVar.conditionVar ();
   115 val work_finished = ConditionVar.conditionVar ();
   116 
   117 local
   118   val lock = Mutex.mutex ();
   119 in
   120 
   121 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   122 
   123 fun wait cond = (*requires SYNCHRONIZED*)
   124   Multithreading.sync_wait NONE NONE cond lock;
   125 
   126 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   127   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   128 
   129 fun signal cond = (*requires SYNCHRONIZED*)
   130   ConditionVar.signal cond;
   131 
   132 fun broadcast cond = (*requires SYNCHRONIZED*)
   133   ConditionVar.broadcast cond;
   134 
   135 fun broadcast_work () = (*requires SYNCHRONIZED*)
   136  (ConditionVar.broadcast work_available;
   137   ConditionVar.broadcast work_finished);
   138 
   139 end;
   140 
   141 
   142 (* execute future jobs *)
   143 
   144 fun future_job group (e: unit -> 'a) =
   145   let
   146     val result = Synchronized.var "future" (NONE: 'a Exn.result option);
   147     fun job ok =
   148       let
   149         val res =
   150           if ok then
   151             Exn.capture (fn () =>
   152               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
   153           else Exn.Exn Exn.Interrupt;
   154         val _ = Synchronized.change result
   155           (fn NONE => SOME res
   156             | SOME _ => raise Fail "Duplicate assignment of future value");
   157       in
   158         (case res of
   159           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   160         | Exn.Result _ => true)
   161       end;
   162   in (result, job) end;
   163 
   164 fun do_cancel group = (*requires SYNCHRONIZED*)
   165  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   166   broadcast scheduler_event);
   167 
   168 fun execute name (task, group, jobs) =
   169   let
   170     val valid = not (Task_Queue.is_canceled group);
   171     val ok = setmp_thread_data (name, task, group) (fn () =>
   172       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   173     val _ = SYNCHRONIZED "finish" (fn () =>
   174       let
   175         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   176         val _ =
   177           if ok then ()
   178           else if Task_Queue.cancel (! queue) group then ()
   179           else do_cancel group;
   180         val _ = broadcast work_finished;
   181         val _ = if maximal then () else broadcast work_available;
   182       in () end);
   183   in () end;
   184 
   185 
   186 (* worker activity *)
   187 
   188 fun count_active () = (*requires SYNCHRONIZED*)
   189   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   190 
   191 fun change_active active = (*requires SYNCHRONIZED*)
   192   Unsynchronized.change workers
   193     (AList.update Thread.equal (Thread.self (), active));
   194 
   195 
   196 (* worker threads *)
   197 
   198 fun worker_wait cond = (*requires SYNCHRONIZED*)
   199   (change_active false; wait cond; change_active true);
   200 
   201 fun worker_next () = (*requires SYNCHRONIZED*)
   202   if ! excessive > 0 then
   203     (Unsynchronized.dec excessive;
   204      Unsynchronized.change workers
   205       (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   206      broadcast scheduler_event;
   207      NONE)
   208   else if count_active () > Multithreading.max_threads_value () then
   209     (worker_wait scheduler_event; worker_next ())
   210   else
   211     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   212       NONE => (worker_wait work_available; worker_next ())
   213     | some => some);
   214 
   215 fun worker_loop name =
   216   (case SYNCHRONIZED name (fn () => worker_next ()) of
   217     NONE => ()
   218   | SOME work => (execute name work; worker_loop name));
   219 
   220 fun worker_start name = (*requires SYNCHRONIZED*)
   221   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
   222      (broadcast scheduler_event; worker_loop name)), true));
   223 
   224 
   225 (* scheduler *)
   226 
   227 val last_status = Unsynchronized.ref Time.zeroTime;
   228 val next_status = Time.fromMilliseconds 500;
   229 val next_round = Time.fromMilliseconds 50;
   230 
   231 fun scheduler_next () = (*requires SYNCHRONIZED*)
   232   let
   233     (*queue and worker status*)
   234     val _ =
   235       let val now = Time.now () in
   236         if Time.> (Time.+ (! last_status, next_status), now) then ()
   237         else
   238          (last_status := now; Multithreading.tracing 1 (fn () =>
   239             let
   240               val {ready, pending, running} = Task_Queue.status (! queue);
   241               val total = length (! workers);
   242               val active = count_active ();
   243             in
   244               "SCHEDULE " ^ Time.toString now ^ ": " ^
   245                 string_of_int ready ^ " ready, " ^
   246                 string_of_int pending ^ " pending, " ^
   247                 string_of_int running ^ " running; " ^
   248                 string_of_int total ^ " workers, " ^
   249                 string_of_int active ^ " active"
   250             end))
   251       end;
   252 
   253     (*worker threads*)
   254     val _ =
   255       if forall (Thread.isActive o #1) (! workers) then ()
   256       else
   257         (case List.partition (Thread.isActive o #1) (! workers) of
   258           (_, []) => ()
   259         | (alive, dead) =>
   260             (workers := alive; Multithreading.tracing 0 (fn () =>
   261               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   262 
   263     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   264     val mm = if m = 9999 then 1 else m * 2;
   265     val l = length (! workers);
   266     val _ = excessive := l - mm;
   267     val _ =
   268       if mm > l then
   269         funpow (mm - l) (fn () =>
   270           worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
   271       else ();
   272 
   273     (*canceled groups*)
   274     val _ =
   275       if null (! canceled) then ()
   276       else
   277        (Multithreading.tracing 1 (fn () =>
   278           string_of_int (length (! canceled)) ^ " canceled groups");
   279         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   280         broadcast_work ());
   281 
   282     (*delay loop*)
   283     val _ = Exn.release (wait_timeout next_round scheduler_event);
   284 
   285     (*shutdown*)
   286     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   287     val continue = not (! do_shutdown andalso null (! workers));
   288     val _ = if continue then () else scheduler := NONE;
   289     val _ = broadcast scheduler_event;
   290   in continue end
   291   handle Exn.Interrupt =>
   292    (Multithreading.tracing 1 (fn () => "Interrupt");
   293     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   294     scheduler_next ());
   295 
   296 fun scheduler_loop () =
   297   Multithreading.with_attributes
   298     (Multithreading.sync_interrupts Multithreading.public_interrupts)
   299     (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
   300 
   301 fun scheduler_active () = (*requires SYNCHRONIZED*)
   302   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   303 
   304 fun scheduler_check () = (*requires SYNCHRONIZED*)
   305  (do_shutdown := false;
   306   if scheduler_active () then ()
   307   else scheduler := SOME (SimpleThread.fork false scheduler_loop));
   308 
   309 
   310 
   311 (** futures **)
   312 
   313 (* fork *)
   314 
   315 fun fork_future opt_group deps pri e =
   316   let
   317     val group =
   318       (case opt_group of
   319         SOME group => group
   320       | NONE => Task_Queue.new_group (worker_group ()));
   321     val (result, job) = future_job group e;
   322     val task = SYNCHRONIZED "enqueue" (fn () =>
   323       let
   324         val (task, minimal) =
   325           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   326         val _ = if minimal then signal work_available else ();
   327         val _ = scheduler_check ();
   328       in task end);
   329   in Future {task = task, group = group, result = result} end;
   330 
   331 fun fork_group group e = fork_future (SOME group) [] 0 e;
   332 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   333 fun fork_deps deps e = fork_deps_pri deps 0 e;
   334 fun fork_pri pri e = fork_deps_pri [] pri e;
   335 fun fork e = fork_deps [] e;
   336 
   337 
   338 (* join *)
   339 
   340 local
   341 
   342 fun get_result x =
   343   (case peek x of
   344     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   345   | SOME (Exn.Exn Exn.Interrupt) =>
   346       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   347   | SOME res => res);
   348 
   349 fun join_wait x =
   350   Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
   351 
   352 fun join_next deps = (*requires SYNCHRONIZED*)
   353   if null deps then NONE
   354   else
   355     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   356       (NONE, []) => NONE
   357     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   358     | (SOME work, deps') => SOME (work, deps'));
   359 
   360 fun join_work deps =
   361   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   362     NONE => ()
   363   | SOME (work, deps') => (execute "join" work; join_work deps'));
   364 
   365 in
   366 
   367 fun join_results xs =
   368   if forall is_finished xs then map get_result xs
   369   else if Multithreading.self_critical () then
   370     error "Cannot join future values within critical section"
   371   else uninterruptible (fn _ => fn () =>
   372      (if is_worker ()
   373       then join_work (map task_of xs)
   374       else List.app join_wait xs;
   375       map get_result xs)) ();
   376 
   377 end;
   378 
   379 fun join_result x = singleton join_results x;
   380 fun join x = Exn.release (join_result x);
   381 
   382 
   383 (* map *)
   384 
   385 fun map_future f x =
   386   let
   387     val task = task_of x;
   388     val group = Task_Queue.new_group (SOME (group_of x));
   389     val (result, job) = future_job group (fn () => f (join x));
   390 
   391     val extended = SYNCHRONIZED "extend" (fn () =>
   392       (case Task_Queue.extend task job (! queue) of
   393         SOME queue' => (queue := queue'; true)
   394       | NONE => false));
   395   in
   396     if extended then Future {task = task, group = group, result = result}
   397     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   398   end;
   399 
   400 
   401 (* cancellation *)
   402 
   403 fun interruptible_task f x =
   404   if Multithreading.available then
   405     Multithreading.with_attributes
   406       (if is_worker ()
   407        then Multithreading.private_interrupts
   408        else Multithreading.public_interrupts)
   409       (fn _ => f x)
   410   else interruptible f x;
   411 
   412 (*cancel: present and future group members will be interrupted eventually*)
   413 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   414 fun cancel x = cancel_group (group_of x);
   415 
   416 
   417 (* shutdown *)
   418 
   419 fun shutdown () =
   420   if Multithreading.available then
   421     SYNCHRONIZED "shutdown" (fn () =>
   422      while scheduler_active () do
   423       (wait scheduler_event; broadcast_work ()))
   424   else ();
   425 
   426 
   427 (*final declarations of this structure!*)
   428 val map = map_future;
   429 
   430 end;
   431 
   432 type 'a future = 'a Future.future;
   433