shutdown: global join-and-shutdown operation;
authorwenzelm
Thu, 11 Sep 2008 21:04:05 +0200
changeset 28203 88f18387f1c9
parent 28202 23cb9a974630
child 28204 2d93b158ad99
shutdown: global join-and-shutdown operation; reduced trace_active; scheduler_fork: always notify; tracing for thread exit; unique ids for workers;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Thu Sep 11 18:07:58 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Thu Sep 11 21:04:05 2008 +0200
@@ -32,7 +32,6 @@
   type 'a T
   val task_of: 'a T -> task
   val group_of: 'a T -> group
-  val shutdown_request: unit -> unit
   val future: group option -> task list -> (unit -> 'a) -> 'a T
   val fork: (unit -> 'a) -> 'a T
   val join_results: 'a T list -> 'a Exn.result list
@@ -40,6 +39,7 @@
   val focus: task list -> unit
   val cancel: 'a T -> unit
   val interrupt_task: string -> unit
+  val shutdown: unit -> unit
 end;
 
 structure Future: FUTURE =
@@ -80,13 +80,6 @@
 
 val excessive = ref 0;
 
-fun trace_active () =
-  let
-    val ws = ! workers;
-    val m = string_of_int (length ws);
-    val n = string_of_int (length (filter #2 ws));
-  in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
-
 
 (* requests *)
 
@@ -148,7 +141,13 @@
 (* worker threads *)
 
 fun change_active active = (*requires SYNCHRONIZED*)
-  (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ());
+  let
+    val _ = change workers (AList.update Thread.equal (Thread.self (), active));
+    val ws = ! workers;
+    val m = string_of_int (length ws);
+    val n = string_of_int (length (filter #2 ws));
+  in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
+
 
 fun worker_wait name = (*requires SYNCHRONIZED*)
   (change_active false; wait name; change_active true);
@@ -157,6 +156,7 @@
   if ! excessive > 0 then
     (dec excessive;
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
+     notify_all ();
      NONE)
   else
     (case change_result queue TaskQueue.dequeue of
@@ -165,7 +165,7 @@
 
 fun worker_loop name =
   (case SYNCHRONIZED name (fn () => worker_next name) of
-    NONE => ()
+    NONE => Multithreading.tracing 4 (fn () => name ^ ": exit")
   | SOME work => (execute name work; worker_loop name));
 
 fun worker_start name = (*requires SYNCHRONIZED*)
@@ -177,7 +177,6 @@
 
 fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
   let
-    val _ = trace_active ();
     val _ =
       (case List.partition (Thread.isActive o #1) (! workers) of
         (_, []) => ()
@@ -188,12 +187,16 @@
     val m = if shutdown then 0 else Multithreading.max_threads_value ();
     val l = length (! workers);
     val _ = excessive := l - m;
-    val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
-    val _ = if shutdown then notify_all () else ();
-  in shutdown andalso null (! workers) end);
+    val _ =
+      if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
+      else ();
+    val terminate = shutdown andalso null (! workers);
+    val _ = if terminate then scheduler := NONE else ();
+    val _ = notify_all ();
+  in terminate end);
 
 fun scheduler_loop (shutdown, canceled) =
-  if scheduler_fork shutdown then ()
+  if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit")
   else
     let
       val canceled' = SYNCHRONIZED "scheduler"
@@ -205,6 +208,9 @@
       | NONE => scheduler_loop (shutdown, canceled'))
     end;
 
+fun scheduler_active () = (*requires SYNCHRONIZED*)
+  (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
+
 fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
   if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
   else scheduler :=
@@ -246,20 +252,20 @@
     (*alien thread -- refrain from contending for resources*)
     fun passive_join () = (*requires SYNCHRONIZED*)
       (case unfinished () of [] => ()
-      | _ => (wait "join"; passive_join ()));
+      | _ => (wait "passive_join"; passive_join ()));
 
     (*proper worker thread -- actively work towards results*)
     fun active_join () = (*requires SYNCHRONIZED*)
       (case unfinished () of [] => ()
       | tasks =>
           (case change_result queue (TaskQueue.dequeue_towards tasks) of
-            NONE => (worker_wait "join"; active_join ())
-          | SOME work => (execute "join" work; active_join ())));
+            NONE => (worker_wait "active_join"; active_join ())
+          | SOME work => (execute "active_join" work; active_join ())));
 
     val _ =
       (case thread_data () of
-        NONE => SYNCHRONIZED "join" passive_join
-      | SOME (task, _) => SYNCHRONIZED "join" (fn () =>
+        NONE => SYNCHRONIZED "passive_join" passive_join
+      | SOME (task, _) => SYNCHRONIZED "active_join" (fn () =>
          (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
 
   in xs |> map (fn Future {result = ref (SOME res), ...} => res) end;
@@ -280,4 +286,13 @@
 fun interrupt_task id = SYNCHRONIZED "interrupt"
   (fn () => TaskQueue.interrupt_external (! queue) id);
 
+(*global join and shutdown*)
+fun shutdown () =
+ (scheduler_check ();
+  SYNCHRONIZED "shutdown" (fn () =>
+   (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+    shutdown_request ();
+    while not (null (! workers)) do wait "shutdown: workers";
+    while scheduler_active () do wait "shutdown: scheduler")));
+
 end;