renamed structure TaskQueue to Task_Queue;
authorwenzelm
Tue, 16 Dec 2008 16:25:18 +0100
changeset 29119 99941fd0cb0e
parent 29118 8f2481aa363d
child 29120 8a904ff43f28
renamed structure TaskQueue to Task_Queue; generalized fork_background to fork_pri; reduced tracing; map: inherit task priority; removed unused focus;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Dec 16 12:13:53 2008 +0100
+++ b/src/Pure/Concurrent/future.ML	Tue Dec 16 16:25:18 2008 +0100
@@ -1,5 +1,4 @@
 (*  Title:      Pure/Concurrent/future.ML
-    ID:         $Id$
     Author:     Makarius
 
 Future values.
@@ -28,8 +27,8 @@
 signature FUTURE =
 sig
   val enabled: unit -> bool
-  type task = TaskQueue.task
-  type group = TaskQueue.group
+  type task = Task_Queue.task
+  type group = Task_Queue.group
   val thread_data: unit -> (string * task) option
   type 'a future
   val task_of: 'a future -> task
@@ -40,12 +39,11 @@
   val fork: (unit -> 'a) -> 'a future
   val fork_group: group -> (unit -> 'a) -> 'a future
   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
-  val fork_background: (unit -> 'a) -> 'a future
+  val fork_pri: int -> (unit -> 'a) -> 'a future
   val join_results: 'a future list -> 'a Exn.result list
   val join_result: 'a future -> 'a Exn.result
   val join: 'a future -> 'a
   val map: ('a -> 'b) -> 'a future -> 'b future
-  val focus: task list -> unit
   val interrupt_task: string -> unit
   val cancel: 'a future -> unit
   val shutdown: unit -> unit
@@ -63,8 +61,8 @@
 
 (* identifiers *)
 
-type task = TaskQueue.task;
-type group = TaskQueue.group;
+type task = Task_Queue.task;
+type group = Task_Queue.group;
 
 local val tag = Universal.tag () : (string * task) option Universal.tag in
   fun thread_data () = the_default NONE (Thread.getLocal tag);
@@ -86,8 +84,8 @@
 fun is_finished x = is_some (peek x);
 
 fun value x = Future
- {task = TaskQueue.new_task (),
-  group = TaskQueue.new_group (),
+ {task = Task_Queue.new_task 0,
+  group = Task_Queue.new_group (),
   result = ref (SOME (Exn.Result x))};
 
 
@@ -96,12 +94,12 @@
 
 (* global state *)
 
-val queue = ref TaskQueue.empty;
+val queue = ref Task_Queue.empty;
 val next = ref 0;
 val workers = ref ([]: (Thread.thread * bool) list);
 val scheduler = ref (NONE: Thread.thread option);
 val excessive = ref 0;
-val canceled = ref ([]: TaskQueue.group list);
+val canceled = ref ([]: Task_Queue.group list);
 val do_shutdown = ref false;
 
 
@@ -114,15 +112,11 @@
 
 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
 
-fun wait name = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait () = (*requires SYNCHRONIZED*)
   ConditionVar.wait (cond, lock);
-  Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
 
-fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait_timeout timeout = (*requires SYNCHRONIZED*)
   ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
-  Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
 
 fun notify_all () = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
@@ -150,9 +144,9 @@
     val _ = trace_active ();
     val ok = setmp_thread_data (name, task) run ();
     val _ = SYNCHRONIZED "execute" (fn () =>
-     (change queue (TaskQueue.finish task);
+     (change queue (Task_Queue.finish task);
       if ok then ()
-      else if TaskQueue.cancel (! queue) group then ()
+      else if Task_Queue.cancel (! queue) group then ()
       else change canceled (cons group);
       notify_all ()));
   in () end;
@@ -160,23 +154,23 @@
 
 (* worker threads *)
 
-fun worker_wait name = (*requires SYNCHRONIZED*)
-  (change_active false; wait name; change_active true);
+fun worker_wait () = (*requires SYNCHRONIZED*)
+  (change_active false; wait (); change_active true);
 
-fun worker_next name = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
     (dec excessive;
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
      notify_all ();
      NONE)
   else
-    (case change_result queue TaskQueue.dequeue of
-      NONE => (worker_wait name; worker_next name)
+    (case change_result queue Task_Queue.dequeue of
+      NONE => (worker_wait (); worker_next ())
     | some => some);
 
 fun worker_loop name =
-  (case SYNCHRONIZED name (fn () => worker_next name) of
-    NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
+  (case SYNCHRONIZED name worker_next of
+    NONE => ()
   | SOME work => (execute name work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
@@ -204,27 +198,25 @@
       else ();
 
     (*canceled groups*)
-    val _ =  change canceled (filter_out (TaskQueue.cancel (! queue)));
+    val _ =  change canceled (filter_out (Task_Queue.cancel (! queue)));
 
     (*shutdown*)
     val continue = not (! do_shutdown andalso null (! workers));
     val _ = if continue then () else scheduler := NONE;
 
     val _ = notify_all ();
-    val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
+    val _ = wait_timeout (Time.fromSeconds 3);
   in continue end;
 
 fun scheduler_loop () =
- (while SYNCHRONIZED "scheduler" scheduler_next do ();
-  Multithreading.tracing 3 (fn () => "scheduler: exit"));
+  while SYNCHRONIZED "scheduler" scheduler_next do ();
 
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
 
 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   if not (scheduler_active ()) then
-    (Multithreading.tracing 3 (fn () => "scheduler: fork");
-     do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   else if ! do_shutdown then error "Scheduler shutdown in progress"
   else ());
 
@@ -235,7 +227,7 @@
   let
     val _ = scheduler_check "future check";
 
-    val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
+    val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
 
     val result = ref (NONE: 'a Exn.result option);
     val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -246,18 +238,18 @@
           val res_ok =
             (case res of
               Exn.Result _ => true
-            | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
+            | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
             | _ => false);
         in res_ok end);
 
     val task = SYNCHRONIZED "future" (fn () =>
-      change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
+      change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork e = future NONE [] true e;
-fun fork_group group e = future (SOME group) [] true e;
-fun fork_deps deps e = future NONE (map task_of deps) true e;
-fun fork_background e = future NONE [] false e;
+fun fork e = future NONE [] 0 e;
+fun fork_group group e = future (SOME group) [] 0 e;
+fun fork_deps deps e = future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = future NONE [] pri e;
 
 
 (* join: retrieve results *)
@@ -273,7 +265,7 @@
         fun join_loop _ [] = ()
           | join_loop name tasks =
               (case SYNCHRONIZED name (fn () =>
-                  change_result queue (TaskQueue.dequeue_towards tasks)) of
+                  change_result queue (Task_Queue.dequeue_towards tasks)) of
                 NONE => ()
               | SOME (work, tasks') => (execute name work; join_loop name tasks'));
         val _ =
@@ -281,18 +273,18 @@
             NONE =>
               (*alien thread -- refrain from contending for resources*)
               while exists (not o is_finished) xs
-              do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
+              do SYNCHRONIZED "join_thread" (fn () => wait ())
           | SOME (name, task) =>
               (*proper task -- actively work towards results*)
               let
                 val unfinished = xs |> map_filter
                   (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
                 val _ = SYNCHRONIZED "join" (fn () =>
-                  (change queue (TaskQueue.depend unfinished task); notify_all ()));
+                  (change queue (Task_Queue.depend unfinished task); notify_all ()));
                 val _ = join_loop ("join_loop: " ^ name) unfinished;
                 val _ =
                   while exists (not o is_finished) xs
-                  do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
+                  do SYNCHRONIZED "join_task" (fn () => worker_wait ());
               in () end);
 
       in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
@@ -300,18 +292,16 @@
 fun join_result x = singleton join_results x;
 fun join x = Exn.release (join_result x);
 
-fun map f x = fork_deps [x] (fn () => f (join x));
+fun map f x =
+  let val task = task_of x
+  in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
 
 
 (* misc operations *)
 
-(*focus: collection of high-priority task*)
-fun focus tasks = SYNCHRONIZED "focus" (fn () =>
-  change queue (TaskQueue.focus tasks));
-
 (*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
-  (fn () => TaskQueue.interrupt_external (! queue) id);
+  (fn () => Task_Queue.interrupt_external (! queue) id);
 
 (*cancel: present and future group members will be interrupted eventually*)
 fun cancel x =
@@ -324,12 +314,12 @@
   if Multithreading.available then
    (scheduler_check "shutdown check";
     SYNCHRONIZED "shutdown" (fn () =>
-     (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
-      while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+     (while not (scheduler_active ()) do wait ();
+      while not (Task_Queue.is_empty (! queue)) do wait ();
       do_shutdown := true;
       notify_all ();
-      while not (null (! workers)) do wait "shutdown: workers";
-      while scheduler_active () do wait "shutdown: scheduler still active";
+      while not (null (! workers)) do wait ();
+      while scheduler_active () do wait ();
       OS.Process.sleep (Time.fromMilliseconds 300))))
   else ();