src/Pure/Concurrent/future.ML
author wenzelm
Fri Mar 20 20:20:09 2009 +0100 (2009-03-20 ago)
changeset 30612 cb6421b6a18f
parent 29551 95e469919c3e
child 30618 046f4f986fb5
permissions -rw-r--r--
future_job: do not inherit attributes, but enforce restricted interrupts -- attempt to prevent interrupt race conditions;
    (*  Title:      Pure/Concurrent/future.ML
        Author:     Makarius

    Future values.

    Notes:

      * Futures are similar to delayed evaluation, i.e. delay/force is
        generalized to fork/join (and variants).  The idea is to model
        parallel value-oriented computations, but *not* communicating
        processes.

      * Futures are grouped; failure of one group member causes the whole
        group to be interrupted eventually.

      * Forked futures are evaluated spontaneously by a farm of worker
        threads in the background; join resynchronizes the computation and
        delivers results (values or exceptions).

      * The pool of worker threads is limited, usually in correlation with
        the number of physical cores on the machine.  Note that allocation
        of runtime resources is distorted either if workers yield CPU time
        (e.g. via system sleep or wait operations), or if non-worker
        threads contend for significant runtime resources independently.
    *)
    26 
    (*Interface of future values: creation (value, fork variants), inspection
      (peek, is_finished), joining, mapping, cancellation and shutdown of the
      scheduler.*)
    signature FUTURE =
    sig
      (*true iff multithreading is enabled and the current thread is not
        inside a critical section*)
      val enabled: unit -> bool
      type task = Task_Queue.task
      type group = Task_Queue.group
      (*name and task of the job that the current thread is executing, or
        NONE if the thread is not running a job of this library*)
      val thread_data: unit -> (string * task) option
      type 'a future
      val task_of: 'a future -> task
      val group_of: 'a future -> group
      (*current content of the result cell; NONE while unfinished*)
      val peek: 'a future -> 'a Exn.result option
      val is_finished: 'a future -> bool
      (*already finished future holding the given value*)
      val value: 'a -> 'a future
      (*fork variants: all enqueue a job for background evaluation; defaults
        are a fresh group, no dependencies and priority 0*)
      val fork: (unit -> 'a) -> 'a future
      val fork_group: group -> (unit -> 'a) -> 'a future
      val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
      val fork_pri: int -> (unit -> 'a) -> 'a future
      (*wait until all given futures are finished and deliver their results
        (values or exceptions) in the order of the argument list*)
      val join_results: 'a future list -> 'a Exn.result list
      val join_result: 'a future -> 'a Exn.result
      (*join_result followed by Exn.release*)
      val join: 'a future -> 'a
      (*future of the function applied to the joined value of the argument*)
      val map: ('a -> 'b) -> 'a future -> 'b future
      (*interrupt: permissive signal, may get ignored*)
      val interrupt_task: string -> unit
      (*cancel: present and future group members will be interrupted eventually*)
      val cancel_group: group -> unit
      val cancel: 'a future -> unit
      (*wait for the task queue to become empty, then stop workers and scheduler*)
      val shutdown: unit -> unit
    end;
    52 
    (*Implementation of futures on top of Task_Queue: a scheduler thread
      maintains a pool of worker threads that dequeue and execute jobs.  The
      mutable global state declared below is accessed under one lock, see
      SYNCHRONIZED and the "requires SYNCHRONIZED" markers.*)
    structure Future: FUTURE =
    struct

    (** future values **)

    (*futures are usable iff multithreading is enabled and the current thread
      is not inside a critical section*)
    fun enabled () =
      Multithreading.enabled () andalso
        not (Multithreading.self_critical ());


    (* identifiers *)

    type task = Task_Queue.task;
    type group = Task_Queue.group;

    (*thread-local slot with the (name, task) pair of the job that the current
      thread is executing; NONE if the slot was never set for this thread*)
    local val tag = Universal.tag () : (string * task) option Universal.tag in
      fun thread_data () = the_default NONE (Thread.getLocal tag);
      (*evaluate f x with the slot set to SOME data; the previous value is
        handed to Library.setmp_thread_data, presumably for restoring it
        afterwards -- confirm against Library*)
      fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    end;


    (* datatype future *)

    (*a future consists of its task and group identifiers and a result cell,
      which stays NONE until the job has stored its outcome*)
    datatype 'a future = Future of
     {task: task,
      group: group,
      result: 'a Exn.result option ref};

    fun task_of (Future {task, ...}) = task;
    fun group_of (Future {group, ...}) = group;

    (*non-blocking access to the result cell*)
    fun peek (Future {result, ...}) = ! result;
    fun is_finished x = is_some (peek x);

    (*finished future: fresh task and group, result already present; nothing
      is enqueued*)
    fun value x = Future
     {task = Task_Queue.new_task 0,
      group = Task_Queue.new_group (),
      result = ref (SOME (Exn.Result x))};



    (** scheduling **)

    (* global state *)

    val queue = ref Task_Queue.empty;
    (*counter used to name newly started workers*)
    val next = ref 0;
    (*worker threads, each paired with its activity flag (false while waiting)*)
    val workers = ref ([]: (Thread.thread * bool) list);
    val scheduler = ref (NONE: Thread.thread option);
    (*number of workers beyond the intended pool size, as computed by the
      scheduler; workers retire themselves while this is positive*)
    val excessive = ref 0;
    (*groups whose cancellation is still pending, retried by the scheduler*)
    val canceled = ref ([]: Task_Queue.group list);
    val do_shutdown = ref false;


    (* synchronization *)

    (*single lock and condition variable shared by all operations below*)
    local
      val lock = Mutex.mutex ();
      val cond = ConditionVar.conditionVar ();
    in

    fun SYNCHRONIZED name = SimpleThread.synchronized name lock;

    fun wait () = (*requires SYNCHRONIZED*)
      ConditionVar.wait (cond, lock);

    (*like wait, but gives up at the current time plus timeout; the outcome of
      waitUntil is ignored*)
    fun wait_timeout timeout = (*requires SYNCHRONIZED*)
      ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));

    fun notify_all () = (*requires SYNCHRONIZED*)
      ConditionVar.broadcast cond;

    end;


    (* worker activity *)

    (*trace message with the total number of workers and the number of those
      flagged as active*)
    fun trace_active () =
      let
        val ws = ! workers;
        val m = string_of_int (length ws);
        val n = string_of_int (length (filter #2 ws));
      in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;

    (*set the activity flag of the current thread in the workers table*)
    fun change_active active = (*requires SYNCHRONIZED*)
      change workers (AList.update Thread.equal (Thread.self (), active));


    (* execute jobs *)

    (*register a group for cancellation by the scheduler*)
    fun do_cancel group = (*requires SYNCHRONIZED*)
      change canceled (insert Task_Queue.eq_group group);

    (*Run all jobs of a dequeued task in the current thread, with thread_data
      set to (name, task).  Every job is run, each receiving the validity of
      the group as determined beforehand.  Afterwards the task is marked as
      finished in the queue; if some job reported failure, the group is
      canceled immediately or, if Task_Queue.cancel returns false, registered
      for cancellation by the scheduler.  Finally all waiting threads are
      notified.*)
    fun execute name (task, group, jobs) =
      let
        val _ = trace_active ();
        val valid = Task_Queue.is_valid group;
        val ok = setmp_thread_data (name, task) (fn () =>
          fold (fn job => fn ok => job valid andalso ok) jobs true) ();
        val _ = SYNCHRONIZED "execute" (fn () =>
         (change queue (Task_Queue.finish task);
          if ok then ()
          else if Task_Queue.cancel (! queue) group then ()
          else do_cancel group;
          notify_all ()));
      in () end;


    (* worker threads *)

    (*wait for a notification, with the current thread flagged as inactive in
      the meantime*)
    fun worker_wait () = (*requires SYNCHRONIZED*)
      (change_active false; wait (); change_active true);

    (*Next piece of work for the current worker.  NONE means that the worker
      has removed itself from the pool (because there are excessive workers)
      and shall terminate; otherwise this waits until the queue delivers
      something.*)
    fun worker_next () = (*requires SYNCHRONIZED*)
      if ! excessive > 0 then
        (dec excessive;
         change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
         notify_all ();
         NONE)
      else
        (case change_result queue Task_Queue.dequeue of
          NONE => (worker_wait (); worker_next ())
        | some => some);

    (*main loop of a worker: the lock is held only for fetching work, not
      while executing it*)
    fun worker_loop name =
      (case SYNCHRONIZED name worker_next of
        NONE => ()
      | SOME work => (execute name work; worker_loop name));

    (*fork a new worker thread and register it as active*)
    fun worker_start name = (*requires SYNCHRONIZED*)
      change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));


    (* scheduler *)

    (*One round of the scheduler; the result tells whether the scheduler shall
      continue with another round.*)
    fun scheduler_next () = (*requires SYNCHRONIZED*)
      let
        (*worker threads*)
        (*drop table entries of threads that are no longer active*)
        val _ =
          (case List.partition (Thread.isActive o #1) (! workers) of
            (_, []) => ()
          | (active, inactive) =>
              (workers := active; Multithreading.tracing 0 (fn () =>
                "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
        val _ = trace_active ();

        (*m: intended pool size (0 during shutdown), l: actual pool size;
          a surplus is recorded in excessive, a deficit is made up for by
          starting new workers*)
        val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
        val l = length (! workers);
        val _ = excessive := l - m;
        val _ =
          if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
          else ();

        (*canceled groups*)
        (*retry pending cancellations; groups for which Task_Queue.cancel
          returns true are removed from the list*)
        val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));

        (*shutdown*)
        (*the scheduler stops once shutdown was requested and no workers are left*)
        val continue = not (! do_shutdown andalso null (! workers));
        val _ = if continue then () else scheduler := NONE;

        (*wake up all waiting threads, then pause for up to one second; an
          interrupt of the scheduler during this pause leads to cancellation
          of the groups delivered by Task_Queue.cancel_all*)
        val _ = notify_all ();
        val _ = interruptible (fn () => wait_timeout (Time.fromSeconds 1)) ()
          handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
      in continue end;

    (*main loop of the scheduler thread; the lock is taken separately for each
      round*)
    fun scheduler_loop () =
      while SYNCHRONIZED "scheduler" scheduler_next do ();

    fun scheduler_active () = (*requires SYNCHRONIZED*)
      (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);

    (*Make sure that a scheduler thread is running: a new one is forked (and
      the shutdown flag reset) if none is active.  Fails with an error if the
      scheduler is active but shutdown is in progress.*)
    fun scheduler_check name = SYNCHRONIZED name (fn () =>
      if not (scheduler_active ()) then
        (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
      else if ! do_shutdown then error "Scheduler shutdown in progress"
      else ());



    (** futures **)

    (* future job: fill result *)

    (*Produce a fresh result cell together with a job that fills it.  The job
      is wrapped by Multithreading.with_attributes with restricted interrupts;
      the first parameter of the inner function is supplied by that wrapper
      and ignored here.  The job argument ok is the flag passed by execute
      (validity of the group): if it is false, the expression e is not
      evaluated and the result is Interrupt.  The job returns false only for
      exceptions other than Interrupt; an Interrupt result invalidates the
      group instead.*)
    fun future_job group (e: unit -> 'a) =
      let
        val result = ref (NONE: 'a Exn.result option);
        val job = Multithreading.with_attributes Multithreading.restricted_interrupts
          (fn _ => fn ok =>
            let
              val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
              val _ = result := SOME res;
              val res_ok =
                (case res of
                  Exn.Result _ => true
                | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
                | _ => false);
            in res_ok end);
      in (result, job) end;


    (* fork *)

    (*General fork: ensure a running scheduler, create the job for e and
      enqueue it with the given group (fresh one if NONE), dependencies and
      priority; waiting threads are notified about the new task.*)
    fun fork_future opt_group deps pri e =
      let
        val _ = scheduler_check "future check";

        val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
        val (result, job) = future_job group e;
        val task = SYNCHRONIZED "future" (fn () =>
          change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
      in Future {task = task, group = group, result = result} end;

    fun fork e = fork_future NONE [] 0 e;
    fun fork_group group e = fork_future (SOME group) [] 0 e;
    fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
    fun fork_pri pri e = fork_future NONE [] pri e;


    (* join *)

    local

    (*result of a future that is expected to be finished; an unfinished one
      yields a SYS_ERROR exception result*)
    fun get_result x = the_default (Exn.Exn (SYS_ERROR "unfinished future")) (peek x);

    (*next piece of work for a joining thread: NONE as soon as all pending
      futures are finished, otherwise whatever the queue delivers (waiting if
      it has nothing to offer)*)
    fun join_next pending = (*requires SYNCHRONIZED*)
      if forall is_finished pending then NONE
      else
        (case change_result queue Task_Queue.dequeue of
          NONE => (worker_wait (); join_next pending)
        | some => some);

    (*execute arbitrary work from the queue until all pending futures are
      finished*)
    fun join_loop name pending =
      (case SYNCHRONIZED name (fn () => join_next pending) of
        NONE => ()
      | SOME work => (execute name work; join_loop name pending));

    in

    (*Wait for all given futures and deliver their results in order.  If
      everything is finished already, this returns immediately without
      touching the scheduler.  Otherwise the remaining part runs
      uninterruptibly; it fails with an error when invoked within a critical
      section.*)
    fun join_results xs =
      if forall is_finished xs then map get_result xs
      else uninterruptible (fn _ => fn () =>
        let
          val _ = scheduler_check "join check";
          val _ = Multithreading.self_critical () andalso
            error "Cannot join future values within critical section";

          (*execute work that Task_Queue.dequeue_towards delivers for the
            given dependencies, until it delivers nothing more*)
          fun join_deps _ [] = ()
            | join_deps name deps =
                (case SYNCHRONIZED name (fn () =>
                    change_result queue (Task_Queue.dequeue_towards deps)) of
                  NONE => ()
                | SOME (work, deps') => (execute name work; join_deps name deps'));

          val _ =
            (case thread_data () of
              NONE =>
                (*alien thread -- refrain from contending for resources*)
                (*the finished-test is made while holding the lock: execute
                  issues notify_all under the same lock after results have
                  been stored, so a notification cannot slip in between the
                  test and the wait*)
                SYNCHRONIZED "join_thread" (fn () =>
                  while not (forall is_finished xs) do wait ())
            | SOME (name, task) =>
                (*proper task -- actively work towards results*)
                let
                  val pending = filter_out is_finished xs;
                  val deps = map task_of pending;
                  (*record in the queue that the current task depends on the
                    pending ones*)
                  val _ = SYNCHRONIZED "join" (fn () =>
                    (change queue (Task_Queue.depend deps task); notify_all ()));
                  val _ = join_deps ("join_deps: " ^ name) deps;
                  val _ = join_loop ("join_loop: " ^ name) (filter_out is_finished pending);
                in () end);

        in map get_result xs end) ();

    end;

    fun join_result x = singleton join_results x;
    fun join x = Exn.release (join_result x);


    (* map *)

    (*Future of f applied to the joined value of x.  First try to attach the
      job to the task of x via Task_Queue.extend; in that case the new future
      shares the task of x but has its own fresh group.  If extend returns
      NONE, fork a separate future that depends on the task of x and inherits
      its priority.*)
    fun map_future f x =
      let
        val _ = scheduler_check "map_future check";

        val task = task_of x;
        val group = Task_Queue.new_group ();
        val (result, job) = future_job group (fn () => f (join x));

        val extended = SYNCHRONIZED "map_future" (fn () =>
          (case Task_Queue.extend task job (! queue) of
            SOME queue' => (queue := queue'; true)
          | NONE => false));
      in
        if extended then Future {task = task, group = group, result = result}
        else fork_future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
      end;


    (* cancellation *)

    (*interrupt: permissive signal, may get ignored*)
    fun interrupt_task id = SYNCHRONIZED "interrupt"
      (fn () => Task_Queue.interrupt_external (! queue) id);

    (*cancel: present and future group members will be interrupted eventually*)
    fun cancel_group group =
     (scheduler_check "cancel check";
      SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));

    fun cancel x = cancel_group (group_of x);


    (** global join and shutdown **)

    (*Global join and shutdown, a no-op unless multithreading is available.
      In sequence: wait until the scheduler is active, wait until the task
      queue is empty, request shutdown, wait until all workers are gone, wait
      until the scheduler has terminated, and finally sleep for 300ms (still
      holding the lock).*)
    fun shutdown () =
      if Multithreading.available then
       (scheduler_check "shutdown check";
        SYNCHRONIZED "shutdown" (fn () =>
         (while not (scheduler_active ()) do wait ();
          while not (Task_Queue.is_empty (! queue)) do wait ();
          do_shutdown := true;
          notify_all ();
          while not (null (! workers)) do wait ();
          while scheduler_active () do wait ();
          OS.Process.sleep (Time.fromMilliseconds 300))))
      else ();


    (*final declarations of this structure!*)
    (*from here on map refers to futures, no longer to lists*)
    val map = map_future;

    end;
   385 
   (*unqualified abbreviation of the future type at the top level*)
   type 'a future = 'a Future.future;
   387