eliminated requests, use global state variables uniformly;
authorwenzelm
Thu, 11 Sep 2008 21:53:53 +0200
changeset 28206 bcd48c6897d4
parent 28205 17a81e481142
child 28207 e2431cc4dc66
eliminated requests, use global state variables uniformly; more robust shutdown;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Thu Sep 11 21:04:09 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Thu Sep 11 21:53:53 2008 +0200
@@ -37,8 +37,8 @@
   val join_results: 'a T list -> 'a Exn.result list
   val join: 'a T -> 'a
   val focus: task list -> unit
+  val interrupt_task: string -> unit
   val cancel: 'a T -> unit
-  val interrupt_task: string -> unit
   val shutdown: unit -> unit
 end;
 
@@ -77,17 +77,9 @@
 val queue = ref TaskQueue.empty;
 val workers = ref ([]: (Thread.thread * bool) list);
 val scheduler = ref (NONE: Thread.thread option);
-
 val excessive = ref 0;
-
-
-(* requests *)
-
-datatype request = Shutdown | Cancel of group;
-val requests = Mailbox.create () : request Mailbox.T;
-
-fun shutdown_request () = Mailbox.send requests Shutdown;
-fun cancel_request group = Mailbox.send requests (Cancel group);
+val canceled = ref ([]: TaskQueue.group list);
+val do_shutdown = ref false;
 
 
 (* synchronization *)
@@ -108,11 +100,10 @@
   in Exn.release result end) ();
 
 fun wait name = (*requires SYNCHRONIZED*)
-  let
-    val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting");
-    val _ = ConditionVar.wait (cond, lock);
-    val _ = Multithreading.tracing 4 (fn () => name ^ ": notified");
-  in () end;
+  ConditionVar.wait (cond, lock);
+
+fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
+  ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
 
 fun notify_all () = (*requires SYNCHRONIZED*)
   ConditionVar.broadcast cond;
@@ -133,7 +124,7 @@
      (change queue (TaskQueue.finish task);
       if ok then ()
       else if TaskQueue.cancel (! queue) group then ()
-      else cancel_request group;
+      else change canceled (cons group);
       notify_all ()));
   in () end;
 
@@ -175,8 +166,9 @@
 
 (* scheduler *)
 
-fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
+fun scheduler_next () = (*requires SYNCHRONIZED*)
   let
+    (*worker threads*)
     val _ =
       (case List.partition (Thread.isActive o #1) (! workers) of
         (_, []) => ()
@@ -184,37 +176,37 @@
           (workers := active; Multithreading.tracing 0 (fn () =>
             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
 
-    val m = if shutdown then 0 else Multithreading.max_threads_value ();
+    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
     val l = length (! workers);
     val _ = excessive := l - m;
     val _ =
       if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ serial_string ())) ()
       else ();
-    val terminate = shutdown andalso null (! workers);
-    val _ = if terminate then scheduler := NONE else ();
-    val _ = notify_all ();
-  in terminate end);
+
+    (*canceled groups*)
+    val _ =  change canceled (filter_out (TaskQueue.cancel (! queue)));
+
+    (*shutdown*)
+    val continue = not (! do_shutdown andalso null (! workers));
+    val _ = if continue then () else scheduler := NONE;
 
-fun scheduler_loop (shutdown, canceled) =
-  if scheduler_fork shutdown then Multithreading.tracing 4 (fn () => "scheduler: exit")
-  else
-    let
-      val canceled' = SYNCHRONIZED "scheduler"
-        (fn () => filter_out (TaskQueue.cancel (! queue)) canceled);
-    in
-      (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
-        SOME Shutdown => scheduler_loop (true, canceled')
-      | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
-      | NONE => scheduler_loop (shutdown, canceled'))
-    end;
+    val _ = notify_all ();
+    val _ = wait_timeout "scheduler" (Time.fromSeconds 1);
+  in continue end;
+
+fun scheduler_loop () =
+ (while SYNCHRONIZED "scheduler" scheduler_next do ();
+  Multithreading.tracing 4 (fn () => "scheduler: exit"));
 
 fun scheduler_active () = (*requires SYNCHRONIZED*)
   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
 
 fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
-  if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
-  else scheduler :=
-    SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
+  if not (scheduler_active ()) then
+   (do_shutdown := false;
+    scheduler := SOME (Thread.fork (scheduler_loop, Multithreading.no_interrupts)))
+  else if ! do_shutdown then error "Scheduler shutdown in progress"
+  else ());
 
 
 (* future values: fork independent computation *)
@@ -242,9 +234,9 @@
 
 fun join_results xs =
   let
+    val _ = scheduler_check ();
     val _ = Multithreading.self_critical () andalso
       error "Cannot join future values within critical section";
-    val _ = scheduler_check ();
 
     fun unfinished () =
       xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
@@ -279,20 +271,23 @@
 fun focus tasks = SYNCHRONIZED "interrupt" (fn () =>
   change queue (TaskQueue.focus tasks));
 
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel x = (scheduler_check (); cancel_request (group_of x));
-
 (*interrupt: permissive signal, may get ignored*)
 fun interrupt_task id = SYNCHRONIZED "interrupt"
   (fn () => TaskQueue.interrupt_external (! queue) id);
 
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel x =
+ (scheduler_check (); SYNCHRONIZED "cancel" (fn () => change canceled (cons (group_of x))));
+
+
 (*global join and shutdown*)
 fun shutdown () =
  (scheduler_check ();
   SYNCHRONIZED "shutdown" (fn () =>
-   (while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
-    shutdown_request ();
+   (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
+    while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+    do_shutdown := true;
     while not (null (! workers)) do wait "shutdown: workers";
-    while scheduler_active () do wait "shutdown: scheduler")));
+    while scheduler_active () do wait "shutdown: scheduler still active")));
 
 end;