src/Pure/Concurrent/future.ML
changeset 32738 15bb09ca0378
parent 32724 aaeeb0ba2035
child 32814 81897d30b97f
equal deleted inserted replaced
32737:76fa673eee8b 32738:15bb09ca0378
    97 
    97 
    98 (** scheduling **)
    98 (** scheduling **)
    99 
    99 
   100 (* global state *)
   100 (* global state *)
   101 
   101 
   102 val queue = ref Task_Queue.empty;
   102 val queue = Unsynchronized.ref Task_Queue.empty;
       (*task queue, initially Task_Queue.empty; modified in this file through
         change_result with Task_Queue.finish, dequeue, enqueue and dequeue_towards*)
   103 val next = ref 0;
   103 val next = Unsynchronized.ref 0;
       (*integer counter starting at 0; incremented to build the "worker N" names
         passed to worker_start*)
   104 val workers = ref ([]: (Thread.thread * bool) list);
   104 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
       (*list of (thread, flag) pairs; count_active counts the entries whose flag is
         true and change_active rewrites the flag of the calling thread*)
   105 val scheduler = ref (NONE: Thread.thread option);
   105 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
       (*optional thread, initially NONE; its uses are outside this excerpt*)
   106 val excessive = ref 0;
   106 val excessive = Unsynchronized.ref 0;
       (*assigned by the scheduler as length of workers minus its target count;
         while it is positive, worker_next decrements it and retires the caller*)
   107 val canceled = ref ([]: Task_Queue.group list);
   107 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
       (*groups added by do_cancel; the scheduler filters this list with
         Task_Queue.cancel*)
   108 val do_shutdown = ref false;
   108 val do_shutdown = Unsynchronized.ref false;
       (*boolean flag, initially false; its uses are outside this excerpt*)
   109 
   109 
   110 
   110 
   111 (* synchronization *)
   111 (* synchronization *)
   112 
   112 
   113 val scheduler_event = ConditionVar.conditionVar ();
   113 val scheduler_event = ConditionVar.conditionVar ();
       (*condition variable that is broadcast by do_cancel, worker_next and each
         freshly forked worker, and waited on by worker_next via worker_wait and by
         the scheduler delay loop via wait_timeout*)
   160         | Exn.Result _ => true)
   160         | Exn.Result _ => true)
   161       end;
   161       end;
   162   in (result, job) end;
   162   in (result, job) end;
   163 
   163 
   164 fun do_cancel group = (*requires SYNCHRONIZED*)
   164 fun do_cancel group = (*requires SYNCHRONIZED*)
       (*Adds group to the canceled list, with insert comparing entries through
         Task_Queue.eq_group, and then broadcasts scheduler_event. The caller is
         expected to be inside SYNCHRONIZED, as the header comment states.*)
   165  (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
   165  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
       
   166   broadcast scheduler_event);
   166 
   167 
   167 fun execute name (task, group, jobs) =
   168 fun execute name (task, group, jobs) =
   168   let
   169   let
   169     val valid = not (Task_Queue.is_canceled group);
   170     val valid = not (Task_Queue.is_canceled group);
   170     val ok = setmp_thread_data (name, task, group) (fn () =>
   171     val ok = setmp_thread_data (name, task, group) (fn () =>
   171       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   172       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   172     val _ = SYNCHRONIZED "finish" (fn () =>
   173     val _ = SYNCHRONIZED "finish" (fn () =>
   173       let
   174       let
   174         val maximal = change_result queue (Task_Queue.finish task);
   175         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   175         val _ =
   176         val _ =
   176           if ok then ()
   177           if ok then ()
   177           else if Task_Queue.cancel (! queue) group then ()
   178           else if Task_Queue.cancel (! queue) group then ()
   178           else do_cancel group;
   179           else do_cancel group;
   179         val _ = broadcast work_finished;
   180         val _ = broadcast work_finished;
   186 
   187 
   187 fun count_active () = (*requires SYNCHRONIZED*)
   188 fun count_active () = (*requires SYNCHRONIZED*)
       (*Returns the number of entries in the workers list whose boolean flag is
         true, by folding over the current contents of workers starting from 0.*)
   188   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   189   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   189 
   190 
   190 fun change_active active = (*requires SYNCHRONIZED*)
   191 fun change_active active = (*requires SYNCHRONIZED*)
       (*Stores the given flag for the calling thread in the workers list:
         AList.update is applied with key Thread.self and key equality Thread.equal.*)
   191   change workers (AList.update Thread.equal (Thread.self (), active));
   192   Unsynchronized.change workers
       
   193     (AList.update Thread.equal (Thread.self (), active));
   192 
   194 
   193 
   195 
   194 (* worker threads *)
   196 (* worker threads *)
   195 
   197 
   196 fun worker_wait cond = (*requires SYNCHRONIZED*)
   198 fun worker_wait cond = (*requires SYNCHRONIZED*)
       (*Flags the calling thread as inactive, waits on cond, then flags it as
         active again, so count_active does not count a thread while it waits.
         The wait helper is defined outside this excerpt.*)
   197   (change_active false; wait cond; change_active true);
   199   (change_active false; wait cond; change_active true);
   198 
   200 
   199 fun worker_next () = (*requires SYNCHRONIZED*)
   201 fun worker_next () = (*requires SYNCHRONIZED*)
       (*Decides what the calling worker does next. The result NONE makes
         worker_loop return; any other result is the value produced by
         Task_Queue.dequeue and is passed on to execute by worker_loop.*)
   200   if ! excessive > 0 then
   202   if ! excessive > 0 then
       (*surplus workers: decrement the counter, remove the calling thread from
         workers, broadcast scheduler_event and stop with NONE*)
   201     (dec excessive;
   203     (Unsynchronized.dec excessive;
   202      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   204      Unsynchronized.change workers
       
   205       (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   203      broadcast scheduler_event;
   206      broadcast scheduler_event;
   204      NONE)
   207      NONE)
       (*more active workers than the max_threads value: wait on scheduler_event,
         then evaluate all conditions again*)
   205   else if count_active () > Multithreading.max_threads_value () then
   208   else if count_active () > Multithreading.max_threads_value () then
   206     (worker_wait scheduler_event; worker_next ())
   209     (worker_wait scheduler_event; worker_next ())
   207   else
   210   else
       (*otherwise ask the queue for work on behalf of this thread; when nothing
         is returned, wait on work_available and try again*)
   208     (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
   211     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   209       NONE => (worker_wait work_available; worker_next ())
   212       NONE => (worker_wait work_available; worker_next ())
   210     | some => some);
   213     | some => some);
   211 
   214 
   212 fun worker_loop name =
   215 fun worker_loop name =
       (*Main loop of a worker: evaluates worker_next inside SYNCHRONIZED name;
         on NONE the loop ends with unit, on SOME work the work is run through
         execute, which is called outside that SYNCHRONIZED section, and the loop
         repeats.*)
   213   (case SYNCHRONIZED name (fn () => worker_next ()) of
   216   (case SYNCHRONIZED name (fn () => worker_next ()) of
   214     NONE => ()
   217     NONE => ()
   215   | SOME work => (execute name work; worker_loop name));
   218   | SOME work => (execute name work; worker_loop name));
   216 
   219 
   217 fun worker_start name = (*requires SYNCHRONIZED*)
   220 fun worker_start name = (*requires SYNCHRONIZED*)
       (*Forks a thread with SimpleThread.fork whose body broadcasts
         scheduler_event and then runs worker_loop name; the new thread is consed
         onto workers paired with the flag true. The meaning of the false argument
         is fixed by SimpleThread.fork, which is not visible here.*)
   218   change workers (cons (SimpleThread.fork false (fn () =>
   221   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
   219      (broadcast scheduler_event; worker_loop name)), true));
   222      (broadcast scheduler_event; worker_loop name)), true));
   220 
   223 
   221 
   224 
   222 (* scheduler *)
   225 (* scheduler *)
   223 
   226 
   224 val last_status = ref Time.zeroTime;
   227 val last_status = Unsynchronized.ref Time.zeroTime;
       (*mutable time value, initially Time.zeroTime; presumably the time of the
         most recent status report, to be confirmed against scheduler_next*)
   225 val next_status = Time.fromMilliseconds 500;
   228 val next_status = Time.fromMilliseconds 500;
       (*constant of 500 milliseconds; its use is outside this excerpt*)
   226 val next_round = Time.fromMilliseconds 50;
   229 val next_round = Time.fromMilliseconds 50;
       (*constant of 50 milliseconds, passed to wait_timeout in the scheduler
         delay loop*)
   227 
   230 
   228 fun scheduler_next () = (*requires SYNCHRONIZED*)
   231 fun scheduler_next () = (*requires SYNCHRONIZED*)
   229   let
   232   let
   261     val mm = if m = 9999 then 1 else m * 2;
   264     val mm = if m = 9999 then 1 else m * 2;
   262     val l = length (! workers);
   265     val l = length (! workers);
   263     val _ = excessive := l - mm;
   266     val _ = excessive := l - mm;
   264     val _ =
   267     val _ =
   265       if mm > l then
   268       if mm > l then
   266         funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   269         funpow (mm - l) (fn () =>
       
   270           worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
   267       else ();
   271       else ();
   268 
   272 
   269     (*canceled groups*)
   273     (*canceled groups*)
   270     val _ =
   274     val _ =
   271       if null (! canceled) then ()
   275       if null (! canceled) then ()
   272       else
   276       else
   273        (Multithreading.tracing 1 (fn () =>
   277        (Multithreading.tracing 1 (fn () =>
   274           string_of_int (length (! canceled)) ^ " canceled groups");
   278           string_of_int (length (! canceled)) ^ " canceled groups");
   275         change canceled (filter_out (Task_Queue.cancel (! queue)));
   279         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   276         broadcast_work ());
   280         broadcast_work ());
   277 
   281 
   278     (*delay loop*)
   282     (*delay loop*)
   279     val _ = Exn.release (wait_timeout next_round scheduler_event);
   283     val _ = Exn.release (wait_timeout next_round scheduler_event);
   280 
   284 
   315         SOME group => group
   319         SOME group => group
   316       | NONE => Task_Queue.new_group (worker_group ()));
   320       | NONE => Task_Queue.new_group (worker_group ()));
   317     val (result, job) = future_job group e;
   321     val (result, job) = future_job group e;
   318     val task = SYNCHRONIZED "enqueue" (fn () =>
   322     val task = SYNCHRONIZED "enqueue" (fn () =>
   319       let
   323       let
   320         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   324         val (task, minimal) =
       
   325           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   321         val _ = if minimal then signal work_available else ();
   326         val _ = if minimal then signal work_available else ();
   322         val _ = scheduler_check ();
   327         val _ = scheduler_check ();
   323       in task end);
   328       in task end);
   324   in Future {task = task, group = group, result = result} end;
   329   in Future {task = task, group = group, result = result} end;
   325 
   330 
   345   Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
   350   Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
   346 
   351 
   347 fun join_next deps = (*requires SYNCHRONIZED*)
   352 fun join_next deps = (*requires SYNCHRONIZED*)
       (*Looks for work related to deps on behalf of the calling thread. Returns
         NONE when deps is empty or when the queue yields neither work nor
         remaining deps; otherwise returns SOME of the work together with the
         remaining deps.*)
   348   if null deps then NONE
   353   if null deps then NONE
   349   else
   354   else
   350     (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   355     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   351       (NONE, []) => NONE
   356       (NONE, []) => NONE
       (*no work yet but deps remain: wait on work_finished, then retry with the
         remaining deps*)
   352     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   357     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   353     | (SOME work, deps') => SOME (work, deps'));
   358     | (SOME work, deps') => SOME (work, deps'));
   354 
   359 
   355 fun join_work deps =
   360 fun join_work deps =