cancel: check_scheduler;
authorwenzelm
Tue, 09 Sep 2008 23:30:05 +0200
changeset 28186 6a8417f36837
parent 28185 0f20cbce4935
child 28187 4062882c7df3
cancel: check_scheduler; adapted to simplified TaskQueue.cancel; improved join/join_all: actively work towards results, i.e. do not yield unnecessarily; misc tuning;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Tue Sep 09 23:30:00 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Sep 09 23:30:05 2008 +0200
@@ -13,9 +13,10 @@
   val task_of: 'a T -> task
   val group_of: 'a T -> group
   val shutdown_request: unit -> unit
-  val cancel: 'a T -> unit
   val future: bool -> task list -> (unit -> 'a) -> 'a T
   val fork: (unit -> 'a) -> 'a T
+  val cancel: 'a T -> unit
+  val join_all: 'a T list -> 'a list
   val join: 'a T -> 'a
 end;
 
@@ -69,7 +70,6 @@
 
 fun shutdown_request () = Mailbox.send requests Shutdown;
 fun cancel_request group = Mailbox.send requests (Cancel group);
-fun cancel x = cancel_request (group_of x);
 
 
 (* synchronization *)
@@ -101,11 +101,6 @@
 
 (* execute *)
 
-fun cancel_group group = (*requires SYNCHRONIZED*)
-  (case change_result queue (TaskQueue.cancel group) of
-    [] => true
-  | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false));
-
 fun execute name (task, group, run) =
   let
     val _ = set_thread_data (SOME (task, group));
@@ -115,7 +110,9 @@
     val _ = set_thread_data NONE;
     val _ = SYNCHRONIZED (fn () =>
      (change queue (TaskQueue.finish task);
-      if ok then () else if cancel_group group then () else cancel_request group;
+      if ok then ()
+      else if change_result queue (TaskQueue.cancel group) then ()
+      else cancel_request group;
       notify_all ()));
   in () end;
 
@@ -123,7 +120,10 @@
 (* worker threads *)
 
 fun change_active b = (*requires SYNCHRONIZED*)
- (change active (fn n => if b then n + 1 else n - 1); trace_active ());
+  (change active (fn n => if b then n + 1 else n - 1); trace_active ());
+
+fun worker_wait name = (*requires SYNCHRONIZED*)
+  (change_active false; wait name; change_active true);
 
 fun worker_next name = (*requires SYNCHRONIZED*)
   if ! excessive > 0 then
@@ -132,8 +132,8 @@
      change workers (remove Thread.equal (Thread.self ()));
      NONE)
   else
-    (case change_result queue (TaskQueue.dequeue (Thread.self ())) of
-      NONE => (change_active false; wait name; change_active true; worker_next name)
+    (case change_result queue TaskQueue.dequeue of
+      NONE => (worker_wait name; worker_next name)
     | some => some);
 
 fun worker_loop name =
@@ -158,7 +158,8 @@
 
 fun scheduler_loop canceled =
   let
-    val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled);
+    val canceled' = SYNCHRONIZED (fn () =>
+      filter_out (change_result queue o TaskQueue.cancel) canceled);
     val _ = scheduler_fork ();
   in
     (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
@@ -195,14 +196,44 @@
 
 fun fork e = future false [] e;
 
-fun join (Future {result, ...}) =
+fun cancel x = (check_scheduler (); cancel_request (group_of x));
+
+
+(* join *)
+
+fun join_all xs =
   let
     val _ = check_scheduler ();
 
-    fun passive_loop () =
-      (case ! result of
-        NONE => (wait "join"; passive_loop ())
-      | SOME res => res);
-  in Exn.release (SYNCHRONIZED passive_loop) end;
+    fun unfinished () =
+      xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
+
+    (*alien thread -- refrain from contending for resources*)
+    fun passive_join () = (*requires SYNCHRONIZED*)
+      (case unfinished () of [] => ()
+      | _ => (wait "join"; passive_join ()));
+
+    (*proper worker thread -- actively work towards results*)
+    fun active_join () = (*requires SYNCHRONIZED*)
+      (case unfinished () of [] => ()
+      | tasks =>
+          (case change_result queue (TaskQueue.dequeue_towards tasks) of
+            NONE => (worker_wait "join"; active_join ())
+          | SOME work => (execute "join" work; active_join ())));
+
+    val _ =
+      (case thread_data () of
+        NONE => SYNCHRONIZED passive_join
+      | SOME (task, _) => SYNCHRONIZED (fn () =>
+         (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
+
+    val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
+  in
+    (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
+      NONE => map Exn.release res
+    | SOME e => raise e)
+  end;
+
+fun join x = singleton join_all x;
 
 end;