src/Pure/Concurrent/future.ML
changeset 32738 15bb09ca0378
parent 32724 aaeeb0ba2035
child 32814 81897d30b97f
--- a/src/Pure/Concurrent/future.ML	Tue Sep 29 11:48:32 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Sep 29 11:49:22 2009 +0200
@@ -99,13 +99,13 @@
 
 (* global state *)
 
-val queue = ref Task_Queue.empty;
-val next = ref 0;
-val workers = ref ([]: (Thread.thread * bool) list);
-val scheduler = ref (NONE: Thread.thread option);
-val excessive = ref 0;
-val canceled = ref ([]: Task_Queue.group list);
-val do_shutdown = ref false;
+val queue = Unsynchronized.ref Task_Queue.empty;
+val next = Unsynchronized.ref 0;
+val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
+val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
+val excessive = Unsynchronized.ref 0;
+val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
+val do_shutdown = Unsynchronized.ref false;
 
 
 (* synchronization *)
@@ -162,7 +162,8 @@
   in (result, job) end;
 
 fun do_cancel group = (*requires SYNCHRONIZED*)
- (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
+ (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
+  broadcast scheduler_event);
 
 fun execute name (task, group, jobs) =
   let
@@ -171,7 +172,7 @@
       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
     val _ = SYNCHRONIZED "finish" (fn () =>
       let
-        val maximal = change_result queue (Task_Queue.finish task);
+        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
         val _ =
           if ok then ()
           else if Task_Queue.cancel (! queue) group then ()
@@ -188,7 +189,8 @@
   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
 
 fun change_active active = (*requires SYNCHRONIZED*)
-  change workers (AList.update Thread.equal (Thread.self (), active));
+  Unsynchronized.change workers
+    (AList.update Thread.equal (Thread.self (), active));
 
 
 (* worker threads *)
@@ -198,14 +200,15 @@
 
 fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
-    (dec excessive;
-     change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
+    (Unsynchronized.dec excessive;
+     Unsynchronized.change workers
+      (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
      broadcast scheduler_event;
      NONE)
   else if count_active () > Multithreading.max_threads_value () then
     (worker_wait scheduler_event; worker_next ())
   else
-    (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
+    (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
       NONE => (worker_wait work_available; worker_next ())
     | some => some);
 
@@ -215,13 +218,13 @@
   | SOME work => (execute name work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
-  change workers (cons (SimpleThread.fork false (fn () =>
+  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
      (broadcast scheduler_event; worker_loop name)), true));
 
 
 (* scheduler *)
 
-val last_status = ref Time.zeroTime;
+val last_status = Unsynchronized.ref Time.zeroTime;
 val next_status = Time.fromMilliseconds 500;
 val next_round = Time.fromMilliseconds 50;
 
@@ -263,7 +266,8 @@
     val _ = excessive := l - mm;
     val _ =
       if mm > l then
-        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
+        funpow (mm - l) (fn () =>
+          worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
       else ();
 
     (*canceled groups*)
@@ -272,7 +276,7 @@
       else
        (Multithreading.tracing 1 (fn () =>
           string_of_int (length (! canceled)) ^ " canceled groups");
-        change canceled (filter_out (Task_Queue.cancel (! queue)));
+        Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
         broadcast_work ());
 
     (*delay loop*)
@@ -317,7 +321,8 @@
     val (result, job) = future_job group e;
     val task = SYNCHRONIZED "enqueue" (fn () =>
       let
-        val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
+        val (task, minimal) =
+          Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
         val _ = if minimal then signal work_available else ();
         val _ = scheduler_check ();
       in task end);
@@ -347,7 +352,7 @@
 fun join_next deps = (*requires SYNCHRONIZED*)
   if null deps then NONE
   else
-    (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
+    (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
       (NONE, []) => NONE
     | (NONE, deps') => (worker_wait work_finished; join_next deps')
     | (SOME work, deps') => SOME (work, deps'));