src/Pure/Concurrent/future.ML
author wenzelm
Wed Oct 01 12:00:01 2008 +0200 (2008-10-01)
changeset 28442 bd9573735bdd
parent 28430 29b2886114fb
child 28464 dcc030b52583
permissions -rw-r--r--
removed release_results (cf. Exn.release_all, Exn.release_first);
     1 (*  Title:      Pure/Concurrent/future.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Future values.
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   type task = TaskQueue.task
    31   type group = TaskQueue.group
    32   val thread_data: unit -> (string * task * group) option
    33   type 'a T
    34   val task_of: 'a T -> task
    35   val group_of: 'a T -> group
    36   val str_of: 'a T -> string
    37   val is_finished: 'a T -> bool
    38   val future: group option -> task list -> bool -> (unit -> 'a) -> 'a T
    39   val fork: (unit -> 'a) -> 'a T
    40   val fork_background: (unit -> 'a) -> 'a T
    41   val join_results: 'a T list -> 'a Exn.result list
    42   val join: 'a T -> 'a
    43   val focus: task list -> unit
    44   val interrupt_task: string -> unit
    45   val cancel: 'a T -> unit
    46   val shutdown: unit -> unit
    47 end;
    48 
    49 structure Future: FUTURE =
    50 struct
    51 
    52 (** future values **)
    53 
    54 (* identifiers *)
    55 
    56 type task = TaskQueue.task;
    57 type group = TaskQueue.group;
    58 
    59 local val tag = Universal.tag () : (string * task * group) option Universal.tag in
    60   fun thread_data () = the_default NONE (Thread.getLocal tag);
    61   fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    62 end;
    63 
    64 
    65 (* datatype future *)
    66 
    67 datatype 'a T = Future of
    68  {task: task,
    69   group: group,
    70   result: 'a Exn.result option ref};
    71 
    72 fun task_of (Future {task, ...}) = task;
    73 fun group_of (Future {group, ...}) = group;
    74 
    75 fun str_of (Future {result, ...}) =
    76   (case ! result of
    77     NONE => "<future>"
    78   | SOME (Exn.Result _) => "<finished future>"
    79   | SOME (Exn.Exn _) => "<failed future>");
    80 
    81 fun is_finished (Future {result, ...}) = is_some (! result);
    82 
    83 
    84 
    85 (** scheduling **)
    86 
    87 (* global state *)
    88 
    89 val queue = ref TaskQueue.empty;
    90 val workers = ref ([]: (Thread.thread * bool) list);
    91 val scheduler = ref (NONE: Thread.thread option);
    92 val excessive = ref 0;
    93 val canceled = ref ([]: TaskQueue.group list);
    94 val do_shutdown = ref false;
    95 
    96 
    97 (* synchronization *)
    98 
    99 local
   100   val lock = Mutex.mutex ();
   101   val cond = ConditionVar.conditionVar ();
   102 in
   103 
   104 fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () =>
   105   let
   106     val _ = Multithreading.tracing 3 (fn () => name ^ ": locking");
   107     val _ = Mutex.lock lock;
   108     val _ = Multithreading.tracing 3 (fn () => name ^ ": locked");
   109     val result = Exn.capture (restore_attributes e) ();
   110     val _ = Mutex.unlock lock;
   111     val _ = Multithreading.tracing 3 (fn () => name ^ ": unlocked");
   112   in Exn.release result end) ();
   113 
   114 fun wait name = (*requires SYNCHRONIZED*)
   115   ConditionVar.wait (cond, lock);
   116 
   117 fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
   118   ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
   119 
   120 fun notify_all () = (*requires SYNCHRONIZED*)
   121   ConditionVar.broadcast cond;
   122 
   123 end;
   124 
   125 
   126 (* worker activity *)
   127 
   128 fun trace_active () =
   129   let
   130     val ws = ! workers;
   131     val m = string_of_int (length ws);
   132     val n = string_of_int (length (filter #2 ws));
   133   in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
   134 
   135 fun change_active active = (*requires SYNCHRONIZED*)
   136   change workers (AList.update Thread.equal (Thread.self (), active));
   137 
   138 
   139 (* execute *)
   140 
   141 fun execute name (task, group, run) =
   142   let
   143     val _ = trace_active ();
   144     val _ = Multithreading.tracing 3 (fn () => name ^ ": running");
   145     val ok = setmp_thread_data (name, task, group) run ();
   146     val _ = Multithreading.tracing 3 (fn () => name ^ ": finished");
   147     val _ = SYNCHRONIZED "execute" (fn () =>
   148      (change queue (TaskQueue.finish task);
   149       if ok then ()
   150       else if TaskQueue.cancel (! queue) group then ()
   151       else change canceled (cons group);
   152       notify_all ()));
   153   in () end;
   154 
   155 
   156 (* worker threads *)
   157 
   158 fun worker_wait name = (*requires SYNCHRONIZED*)
   159   (change_active false; wait name; change_active true);
   160 
   161 fun worker_next name = (*requires SYNCHRONIZED*)
   162   if ! excessive > 0 then
   163     (dec excessive;
   164      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   165      notify_all ();
   166      NONE)
   167   else
   168     (case change_result queue TaskQueue.dequeue of
   169       NONE => (worker_wait name; worker_next name)
   170     | some => some);
   171 
   172 fun worker_loop name =
   173   (case SYNCHRONIZED name (fn () => worker_next name) of
   174     NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
   175   | SOME work => (execute name work; worker_loop name));
   176 
   177 fun worker_start name = (*requires SYNCHRONIZED*)
   178   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   179 
   180 
   181 (* scheduler *)
   182 
   183 fun scheduler_next () = (*requires SYNCHRONIZED*)
   184   let
   185     (*worker threads*)
   186     val _ =
   187       (case List.partition (Thread.isActive o #1) (! workers) of
   188         (_, []) => ()
   189       | (active, inactive) =>
   190           (workers := active; Multithreading.tracing 0 (fn () =>
   191             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
   192     val _ = trace_active ();
   193 
   194     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   195     val l = length (! workers);
   196     val _ = excessive := l - m;
   197     val _ =
   198       if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
   199       else ();
   200 
   201     (*canceled groups*)
   202     val _ =  change canceled (filter_out (TaskQueue.cancel (! queue)));
   203 
   204     (*shutdown*)
   205     val continue = not (! do_shutdown andalso null (! workers));
   206     val _ = if continue then () else scheduler := NONE;
   207 
   208     val _ = notify_all ();
   209     val _ = wait_timeout "scheduler" (Time.fromSeconds 1);
   210   in continue end;
   211 
   212 fun scheduler_loop () =
   213  (while SYNCHRONIZED "scheduler" scheduler_next do ();
   214   Multithreading.tracing 3 (fn () => "scheduler: exit"));
   215 
   216 fun scheduler_active () = (*requires SYNCHRONIZED*)
   217   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   218 
   219 fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
   220   if not (scheduler_active ()) then
   221     (Multithreading.tracing 3 (fn () => "scheduler: fork");
   222      do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   223   else if ! do_shutdown then error "Scheduler shutdown in progress"
   224   else ());
   225 
   226 
   227 (* future values: fork independent computation *)
   228 
   229 fun future opt_group deps pri (e: unit -> 'a) =
   230   let
   231     val _ = scheduler_check ();
   232 
   233     val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
   234 
   235     val result = ref (NONE: 'a Exn.result option);
   236     val run = Multithreading.with_attributes (Thread.getAttributes ())
   237       (fn _ => fn ok =>
   238         let val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt
   239         in result := SOME res; is_some (Exn.get_result res) end);
   240 
   241     val task = SYNCHRONIZED "future" (fn () =>
   242       change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
   243   in Future {task = task, group = group, result = result} end;
   244 
   245 fun fork_common pri = future (Option.map #3 (thread_data ())) [] pri;
   246 
   247 fun fork e = fork_common true e;
   248 fun fork_background e = fork_common false e;
   249 
   250 
   251 (* join: retrieve results *)
   252 
   253 fun join_results [] = []
   254   | join_results xs =
   255       let
   256         val _ = scheduler_check ();
   257         val _ = Multithreading.self_critical () andalso
   258           error "Cannot join future values within critical section";
   259 
   260         fun join_loop _ [] = ()
   261           | join_loop name tasks =
   262               (case SYNCHRONIZED name (fn () =>
   263                   change_result queue (TaskQueue.dequeue_towards tasks)) of
   264                 NONE => ()
   265               | SOME (work, tasks') => (execute name work; join_loop name tasks'));
   266         val _ =
   267           (case thread_data () of
   268             NONE =>
   269               (*alien thread -- refrain from contending for resources*)
   270               while exists (not o is_finished) xs
   271               do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
   272           | SOME (name, task, _) =>
   273               (*proper task -- actively work towards results*)
   274               let
   275                 val unfinished = xs |> map_filter
   276                   (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
   277                 val _ = SYNCHRONIZED "join" (fn () =>
   278                   (change queue (TaskQueue.depend unfinished task); notify_all ()));
   279                 val _ = join_loop ("join_loop: " ^ name) unfinished;
   280                 val _ =
   281                   while exists (not o is_finished) xs
   282                   do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
   283               in () end);
   284 
   285       in xs |> map (fn Future {result = ref (SOME res), ...} => res) end;
   286 
   287 fun join x = Exn.release (singleton join_results x);
   288 
   289 
   290 (* misc operations *)
   291 
   292 (*focus: collection of high-priority task*)
   293 fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
   294   change queue (TaskQueue.focus tasks));
   295 
   296 (*interrupt: permissive signal, may get ignored*)
   297 fun interrupt_task id = SYNCHRONIZED "interrupt"
   298   (fn () => TaskQueue.interrupt_external (! queue) id);
   299 
   300 (*cancel: present and future group members will be interrupted eventually*)
   301 fun cancel x =
   302  (scheduler_check ();
   303   SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
   304 
   305 
   306 (*global join and shutdown*)
   307 fun shutdown () =
   308   if Multithreading.available then
   309    (scheduler_check ();
   310     SYNCHRONIZED "shutdown" (fn () =>
   311      (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
   312       while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
   313       do_shutdown := true;
   314       notify_all ();
   315       while not (null (! workers)) do wait "shutdown: workers";
   316       while scheduler_active () do wait "shutdown: scheduler still active")))
   317   else ();
   318 
   319 end;