src/Pure/Concurrent/future.ML
changeset 28192 6d977729c8fa
parent 28191 9e5f556409c6
child 28193 7ed74d0ba607
equal deleted inserted replaced
28191:9e5f556409c6 28192:6d977729c8fa
    51 (** scheduling **)
    51 (** scheduling **)
    52 
    52 
    53 (* global state *)
    53 (* global state *)
    54 
    54 
    55 val queue = ref TaskQueue.empty;
    55 val queue = ref TaskQueue.empty;
    56 val workers = ref ([]: Thread.thread list);
    56 val workers = ref ([]: (Thread.thread * bool) list);
    57 val scheduler = ref (NONE: Thread.thread option);
    57 val scheduler = ref (NONE: Thread.thread option);
    58 
    58 
    59 val excessive = ref 0;
    59 val excessive = ref 0;
    60 val active = ref 0;
       
    61 
    60 
    62 fun trace_active () =
    61 fun trace_active () =
    63   Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int (! active) ^ " active");
    62   let
       
    63     val ws = ! workers;
       
    64     val m = string_of_int (length ws);
       
    65     val n = string_of_int (length (filter #2 ws));
       
    66   in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
    64 
    67 
    65 
    68 
    66 (* requests *)
    69 (* requests *)
    67 
    70 
    68 datatype request = Shutdown | Cancel of group;
    71 datatype request = Shutdown | Cancel of group;
    77 local
    80 local
    78   val lock = Mutex.mutex ();
    81   val lock = Mutex.mutex ();
    79   val cond = ConditionVar.conditionVar ();
    82   val cond = ConditionVar.conditionVar ();
    80 in
    83 in
    81 
    84 
    82 fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () =>
    85 fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () =>
    83   let
    86   let
       
    87     val _ = Multithreading.tracing 4 (fn () => name ^ ": locking");
    84     val _ = Mutex.lock lock;
    88     val _ = Mutex.lock lock;
       
    89     val _ = Multithreading.tracing 4 (fn () => name ^ ": locked");
    85     val result = Exn.capture (restore_attributes e) ();
    90     val result = Exn.capture (restore_attributes e) ();
    86     val _ = Mutex.unlock lock;
    91     val _ = Mutex.unlock lock;
       
    92     val _ = Multithreading.tracing 4 (fn () => name ^ ": unlocked");
    87   in Exn.release result end) ();
    93   in Exn.release result end) ();
    88 
    94 
    89 fun wait name = (*requires SYNCHRONIZED*)
    95 fun wait name = (*requires SYNCHRONIZED*)
    90   let
    96   let
    91     val _ = Multithreading.tracing 4 (fn () => name ^ " : waiting");
    97     val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting");
    92     val _ = ConditionVar.wait (cond, lock);
    98     val _ = ConditionVar.wait (cond, lock);
    93     val _ = Multithreading.tracing 4 (fn () => name ^ " : notified");
    99     val _ = Multithreading.tracing 4 (fn () => name ^ ": notified");
    94   in () end;
   100   in () end;
    95 
   101 
    96 fun notify_all () = (*requires SYNCHRONIZED*)
   102 fun notify_all () = (*requires SYNCHRONIZED*)
    97   ConditionVar.broadcast cond;
   103   ConditionVar.broadcast cond;
    98 
   104 
   106     val _ = set_thread_data (SOME (task, group));
   112     val _ = set_thread_data (SOME (task, group));
   107     val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
   113     val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
   108     val ok = run ();
   114     val ok = run ();
   109     val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
   115     val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
   110     val _ = set_thread_data NONE;
   116     val _ = set_thread_data NONE;
   111     val _ = SYNCHRONIZED (fn () =>
   117     val _ = SYNCHRONIZED "execute" (fn () =>
   112      (change queue (TaskQueue.finish task);
   118      (change queue (TaskQueue.finish task);
   113       if ok then ()
   119       if ok then ()
   114       else if TaskQueue.cancel (! queue) group then ()
   120       else if TaskQueue.cancel (! queue) group then ()
   115       else cancel_request group;
   121       else cancel_request group;
   116       notify_all ()));
   122       notify_all ()));
   117   in () end;
   123   in () end;
   118 
   124 
   119 
   125 
   120 (* worker threads *)
   126 (* worker threads *)
   121 
   127 
   122 fun change_active b = (*requires SYNCHRONIZED*)
   128 fun change_active active = (*requires SYNCHRONIZED*)
   123   (change active (fn n => if b then n + 1 else n - 1); trace_active ());
   129   (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ());
   124 
   130 
   125 fun worker_wait name = (*requires SYNCHRONIZED*)
   131 fun worker_wait name = (*requires SYNCHRONIZED*)
   126   (change_active false; wait name; change_active true);
   132   (change_active false; wait name; change_active true);
   127 
   133 
   128 fun worker_next name = (*requires SYNCHRONIZED*)
   134 fun worker_next name = (*requires SYNCHRONIZED*)
   129   if ! excessive > 0 then
   135   if ! excessive > 0 then
   130     (dec excessive;
   136     (dec excessive;
   131      change_active false;
   137      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   132      change workers (remove Thread.equal (Thread.self ()));
       
   133      NONE)
   138      NONE)
   134   else
   139   else
   135     (case change_result queue TaskQueue.dequeue of
   140     (case change_result queue TaskQueue.dequeue of
   136       NONE => (worker_wait name; worker_next name)
   141       NONE => (worker_wait name; worker_next name)
   137     | some => some);
   142     | some => some);
   138 
   143 
   139 fun worker_loop name =
   144 fun worker_loop name =
   140   (case SYNCHRONIZED (fn () => worker_next name) of
   145   (case SYNCHRONIZED name (fn () => worker_next name) of
   141     NONE => ()
   146     NONE => ()
   142   | SOME work => (execute name work; worker_loop name));
   147   | SOME work => (execute name work; worker_loop name));
   143 
   148 
   144 fun worker_start name = (*requires SYNCHRONIZED*)
   149 fun worker_start name = (*requires SYNCHRONIZED*)
   145  (change_active true;
   150   change workers
   146   change workers (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts))));
   151     (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts), true));
   147 
   152 
   148 
   153 
   149 (* scheduler *)
   154 (* scheduler *)
   150 
   155 
   151 fun scheduler_fork shutdown = SYNCHRONIZED (fn () =>
   156 fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
   152   let
   157   let
   153     val _ = trace_active ();
   158     val _ = trace_active ();
   154     val _ =
   159     val _ =
   155       (case List.partition Thread.isActive (! workers) of
   160       (case List.partition (Thread.isActive o #1) (! workers) of
   156         (_, []) => ()
   161         (_, []) => ()
   157       | (active, inactive) =>
   162       | (active, inactive) =>
   158           (workers := active; Multithreading.tracing 0 (fn () =>
   163           (workers := active; Multithreading.tracing 0 (fn () =>
   159             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads")));
   164             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
   160 
   165 
   161     val m = if shutdown then 0 else Multithreading.max_threads_value ();
   166     val m = if shutdown then 0 else Multithreading.max_threads_value ();
   162     val l = length (! workers);
   167     val l = length (! workers);
   163     val _ = excessive := l - m;
   168     val _ = excessive := l - m;
   164     val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
   169     val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
   165   in null (! workers) end);
   170     val _ = if shutdown then notify_all () else ();
       
   171   in shutdown andalso null (! workers) end);
   166 
   172 
   167 fun scheduler_loop (shutdown, canceled) =
   173 fun scheduler_loop (shutdown, canceled) =
   168   if scheduler_fork shutdown then ()
   174   if scheduler_fork shutdown then ()
   169   else
   175   else
   170     let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in
   176     let
       
   177       val canceled' = SYNCHRONIZED "scheduler"
       
   178         (fn () => filter_out (TaskQueue.cancel (! queue)) canceled);
       
   179     in
   171       (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
   180       (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
   172         SOME Shutdown => scheduler_loop (true, canceled')
   181         SOME Shutdown => scheduler_loop (true, canceled')
   173       | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
   182       | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
   174       | NONE => scheduler_loop (shutdown, canceled'))
   183       | NONE => scheduler_loop (shutdown, canceled'))
   175     end;
   184     end;
   176 
   185 
   177 fun scheduler_check () = SYNCHRONIZED (fn () =>
   186 fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
   178   if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
   187   if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
   179   else scheduler :=
   188   else scheduler :=
   180     SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
   189     SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
   181 
   190 
   182 
   191 
   192     val run = Multithreading.with_attributes (Thread.getAttributes ())
   201     val run = Multithreading.with_attributes (Thread.getAttributes ())
   193       (fn _ => fn ok =>
   202       (fn _ => fn ok =>
   194         let val res = if ok then Exn.capture e () else Exn.Exn Interrupt
   203         let val res = if ok then Exn.capture e () else Exn.Exn Interrupt
   195         in result := SOME res; is_some (Exn.get_result res) end);
   204         in result := SOME res; is_some (Exn.get_result res) end);
   196 
   205 
   197     val task = SYNCHRONIZED (fn () =>
   206     val task = SYNCHRONIZED "future" (fn () =>
   198       change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
   207       change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
   199   in Future {task = task, group = group, result = result} end;
   208   in Future {task = task, group = group, result = result} end;
   200 
   209 
   201 fun fork e = future (Option.map #2 (thread_data ())) [] e;
   210 fun fork e = future (Option.map #2 (thread_data ())) [] e;
   202 
   211 
   223             NONE => (worker_wait "join"; active_join ())
   232             NONE => (worker_wait "join"; active_join ())
   224           | SOME work => (execute "join" work; active_join ())));
   233           | SOME work => (execute "join" work; active_join ())));
   225 
   234 
   226     val _ =
   235     val _ =
   227       (case thread_data () of
   236       (case thread_data () of
   228         NONE => SYNCHRONIZED passive_join
   237         NONE => SYNCHRONIZED "join" passive_join
   229       | SOME (task, _) => SYNCHRONIZED (fn () =>
   238       | SOME (task, _) => SYNCHRONIZED "join" (fn () =>
   230          (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
   239          (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
   231 
   240 
   232     val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
   241     val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
   233   in
   242   in
   234     (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
   243     (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
   243 
   252 
   244 (*cancel: present and future group members will be interrupted eventually*)
   253 (*cancel: present and future group members will be interrupted eventually*)
   245 fun cancel x = (scheduler_check (); cancel_request (group_of x));
   254 fun cancel x = (scheduler_check (); cancel_request (group_of x));
   246 
   255 
   247 (*interrupt: adhoc signal, permissive, may get ignored*)
   256 (*interrupt: adhoc signal, permissive, may get ignored*)
   248 fun interrupt_task id = SYNCHRONIZED (fn () => TaskQueue.interrupt (! queue) id);
   257 fun interrupt_task id = SYNCHRONIZED "interrupt" (fn () => TaskQueue.interrupt (! queue) id);
   249 
   258 
   250 end;
   259 end;