future: allow explicit group;
authorwenzelm
Wed, 10 Sep 2008 19:44:29 +0200
changeset 28191 9e5f556409c6
parent 28190 0a2434cf38c9
child 28192 6d977729c8fa
future: allow explicit group; cancel: invalidate group identifier for all future members; tuned comments; tuned;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Wed Sep 10 19:44:28 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Wed Sep 10 19:44:29 2008 +0200
@@ -13,7 +13,7 @@
   val task_of: 'a T -> task
   val group_of: 'a T -> group
   val shutdown_request: unit -> unit
-  val future: bool -> task list -> (unit -> 'a) -> 'a T
+  val future: group option -> task list -> (unit -> 'a) -> 'a T
   val fork: (unit -> 'a) -> 'a T
   val cancel: 'a T -> unit
   val join_all: 'a T list -> 'a list
@@ -111,7 +111,7 @@
     val _ = SYNCHRONIZED (fn () =>
      (change queue (TaskQueue.finish task);
       if ok then ()
-      else if change_result queue (TaskQueue.cancel group) then ()
+      else if TaskQueue.cancel (! queue) group then ()
       else cancel_request group;
       notify_all ()));
   in () end;
@@ -148,41 +148,45 @@
 
 (* scheduler *)
 
-fun scheduler_fork () = SYNCHRONIZED (fn () =>
+fun scheduler_fork shutdown = SYNCHRONIZED (fn () =>
   let
     val _ = trace_active ();
-    val m = Multithreading.max_threads_value ();
+    val _ =
+      (case List.partition Thread.isActive (! workers) of
+        (_, []) => ()
+      | (active, inactive) =>
+          (workers := active; Multithreading.tracing 0 (fn () =>
+            "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads")));
+
+    val m = if shutdown then 0 else Multithreading.max_threads_value ();
     val l = length (! workers);
     val _ = excessive := l - m;
-  in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end);
+    val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1);
+  in null (! workers) end);
 
-fun scheduler_loop canceled =
-  let
-    val canceled' = SYNCHRONIZED (fn () =>
-      filter_out (change_result queue o TaskQueue.cancel) canceled);
-    val _ = scheduler_fork ();
-  in
-    (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
-      SOME Shutdown => ()   (* FIXME proper worker shutdown *)
-    | SOME (Cancel group) => scheduler_loop (group :: canceled')
-    | NONE => scheduler_loop canceled')
-  end;
+fun scheduler_loop (shutdown, canceled) =
+  if scheduler_fork shutdown then ()
+  else
+    let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in
+      (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
+        SOME Shutdown => scheduler_loop (true, canceled')
+      | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled')
+      | NONE => scheduler_loop (shutdown, canceled'))
+    end;
 
-fun check_scheduler () = SYNCHRONIZED (fn () =>
+fun scheduler_check () = SYNCHRONIZED (fn () =>
   if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then ()
-  else scheduler := SOME (Thread.fork (fn () => scheduler_loop [], Multithreading.no_interrupts)));
+  else scheduler :=
+    SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts)));
 
 
-(* future values *)
+(* future values: fork independent computation *)
 
-fun future new_group deps (e: unit -> 'a) =
+fun future opt_group deps (e: unit -> 'a) =
   let
-    val _ = check_scheduler ();
+    val _ = scheduler_check ();
 
-    val group =
-      (case (new_group, thread_data ()) of
-        (false, SOME (_, group)) => group
-      | _ => TaskQueue.new_group ());
+    val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
 
     val result = ref (NONE: 'a Exn.result option);
     val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -194,16 +198,14 @@
       change_result queue (TaskQueue.enqueue group deps run) before notify_all ());
   in Future {task = task, group = group, result = result} end;
 
-fun fork e = future false [] e;
-
-fun cancel x = (check_scheduler (); cancel_request (group_of x));
+fun fork e = future (Option.map #2 (thread_data ())) [] e;
 
 
-(* join *)
+(* join: retrieve results *)
 
 fun join_all xs =
   let
-    val _ = check_scheduler ();
+    val _ = scheduler_check ();
 
     fun unfinished () =
       xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
@@ -236,4 +238,13 @@
 
 fun join x = singleton join_all x;
 
+
+(* termination *)
+
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel x = (scheduler_check (); cancel_request (group_of x));
+
+(*interrupt: adhoc signal, permissive, may get ignored*)
+fun interrupt_task id = SYNCHRONIZED (fn () => TaskQueue.interrupt (! queue) id);
+
 end;