src/Pure/Concurrent/future.ML
changeset 32738 15bb09ca0378
parent 32724 aaeeb0ba2035
child 32814 81897d30b97f
     1.1 --- a/src/Pure/Concurrent/future.ML	Tue Sep 29 11:48:32 2009 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Tue Sep 29 11:49:22 2009 +0200
     1.3 @@ -99,13 +99,13 @@
     1.4  
     1.5  (* global state *)
     1.6  
     1.7 -val queue = ref Task_Queue.empty;
     1.8 -val next = ref 0;
     1.9 -val workers = ref ([]: (Thread.thread * bool) list);
    1.10 -val scheduler = ref (NONE: Thread.thread option);
    1.11 -val excessive = ref 0;
    1.12 -val canceled = ref ([]: Task_Queue.group list);
    1.13 -val do_shutdown = ref false;
    1.14 +val queue = Unsynchronized.ref Task_Queue.empty;
    1.15 +val next = Unsynchronized.ref 0;
    1.16 +val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
    1.17 +val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
    1.18 +val excessive = Unsynchronized.ref 0;
    1.19 +val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
    1.20 +val do_shutdown = Unsynchronized.ref false;
    1.21  
    1.22  
    1.23  (* synchronization *)
    1.24 @@ -162,7 +162,8 @@
    1.25    in (result, job) end;
    1.26  
    1.27  fun do_cancel group = (*requires SYNCHRONIZED*)
    1.28 - (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
    1.29 + (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
    1.30 +  broadcast scheduler_event);
    1.31  
    1.32  fun execute name (task, group, jobs) =
    1.33    let
    1.34 @@ -171,7 +172,7 @@
    1.35        fold (fn job => fn ok => job valid andalso ok) jobs true) ();
    1.36      val _ = SYNCHRONIZED "finish" (fn () =>
    1.37        let
    1.38 -        val maximal = change_result queue (Task_Queue.finish task);
    1.39 +        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
    1.40          val _ =
    1.41            if ok then ()
    1.42            else if Task_Queue.cancel (! queue) group then ()
    1.43 @@ -188,7 +189,8 @@
    1.44    fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
    1.45  
    1.46  fun change_active active = (*requires SYNCHRONIZED*)
    1.47 -  change workers (AList.update Thread.equal (Thread.self (), active));
    1.48 +  Unsynchronized.change workers
    1.49 +    (AList.update Thread.equal (Thread.self (), active));
    1.50  
    1.51  
    1.52  (* worker threads *)
    1.53 @@ -198,14 +200,15 @@
    1.54  
    1.55  fun worker_next () = (*requires SYNCHRONIZED*)
    1.56    if ! excessive > 0 then
    1.57 -    (dec excessive;
    1.58 -     change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
    1.59 +    (Unsynchronized.dec excessive;
    1.60 +     Unsynchronized.change workers
    1.61 +      (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
    1.62       broadcast scheduler_event;
    1.63       NONE)
    1.64    else if count_active () > Multithreading.max_threads_value () then
    1.65      (worker_wait scheduler_event; worker_next ())
    1.66    else
    1.67 -    (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
    1.68 +    (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
    1.69        NONE => (worker_wait work_available; worker_next ())
    1.70      | some => some);
    1.71  
    1.72 @@ -215,13 +218,13 @@
    1.73    | SOME work => (execute name work; worker_loop name));
    1.74  
    1.75  fun worker_start name = (*requires SYNCHRONIZED*)
    1.76 -  change workers (cons (SimpleThread.fork false (fn () =>
    1.77 +  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
    1.78       (broadcast scheduler_event; worker_loop name)), true));
    1.79  
    1.80  
    1.81  (* scheduler *)
    1.82  
    1.83 -val last_status = ref Time.zeroTime;
    1.84 +val last_status = Unsynchronized.ref Time.zeroTime;
    1.85  val next_status = Time.fromMilliseconds 500;
    1.86  val next_round = Time.fromMilliseconds 50;
    1.87  
    1.88 @@ -263,7 +266,8 @@
    1.89      val _ = excessive := l - mm;
    1.90      val _ =
    1.91        if mm > l then
    1.92 -        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
    1.93 +        funpow (mm - l) (fn () =>
    1.94 +          worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
    1.95        else ();
    1.96  
    1.97      (*canceled groups*)
    1.98 @@ -272,7 +276,7 @@
    1.99        else
   1.100         (Multithreading.tracing 1 (fn () =>
   1.101            string_of_int (length (! canceled)) ^ " canceled groups");
   1.102 -        change canceled (filter_out (Task_Queue.cancel (! queue)));
   1.103 +        Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   1.104          broadcast_work ());
   1.105  
   1.106      (*delay loop*)
   1.107 @@ -317,7 +321,8 @@
   1.108      val (result, job) = future_job group e;
   1.109      val task = SYNCHRONIZED "enqueue" (fn () =>
   1.110        let
   1.111 -        val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   1.112 +        val (task, minimal) =
   1.113 +          Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   1.114          val _ = if minimal then signal work_available else ();
   1.115          val _ = scheduler_check ();
   1.116        in task end);
   1.117 @@ -347,7 +352,7 @@
   1.118  fun join_next deps = (*requires SYNCHRONIZED*)
   1.119    if null deps then NONE
   1.120    else
   1.121 -    (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   1.122 +    (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   1.123        (NONE, []) => NONE
   1.124      | (NONE, deps') => (worker_wait work_finished; join_next deps')
   1.125      | (SOME work, deps') => SOME (work, deps'));