added release_results (formerly in par_list.ML);
authorwenzelm
Sat, 27 Sep 2008 18:18:06 +0200
changeset 28382 a97fa342540d
parent 28381 0b8237df37bd
child 28383 3c5b4f636159
added release_results (formerly in par_list.ML); more informative trace_active; join_results: avoid deadlock via nested SYNCHRONOIZED, more robust active join, less rechecking of tasks;
src/Pure/Concurrent/future.ML
--- a/src/Pure/Concurrent/future.ML	Sat Sep 27 18:18:05 2008 +0200
+++ b/src/Pure/Concurrent/future.ML	Sat Sep 27 18:18:06 2008 +0200
@@ -38,6 +38,7 @@
   val fork_irrelevant: (unit -> 'a) -> 'a T
   val join_results: 'a T list -> 'a Exn.result list
   val join: 'a T -> 'a
+  val release_results: 'a Exn.result list -> 'a list
   val focus: task list -> unit
   val interrupt_task: string -> unit
   val cancel: 'a T -> unit
@@ -115,10 +116,24 @@
 end;
 
 
+(* worker activity *)
+
+fun trace_active () =
+  let
+    val ws = ! workers;
+    val m = string_of_int (length ws);
+    val n = string_of_int (length (filter #2 ws));
+  in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
+
+fun change_active active = (*requires SYNCHRONIZED*)
+  change workers (AList.update Thread.equal (Thread.self (), active));
+
+
 (* execute *)
 
 fun execute name (task, group, run) =
   let
+    val _ = trace_active ();
     val _ = set_thread_data (SOME (task, group));
     val _ = Multithreading.tracing 3 (fn () => name ^ ": running");
     val ok = run ();
@@ -135,15 +150,6 @@
 
 (* worker threads *)
 
-fun change_active active = (*requires SYNCHRONIZED*)
-  let
-    val _ = change workers (AList.update Thread.equal (Thread.self (), active));
-    val ws = ! workers;
-    val m = string_of_int (length ws);
-    val n = string_of_int (length (filter #2 ws));
-  in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
-
-
 fun worker_wait name = (*requires SYNCHRONIZED*)
   (change_active false; wait name; change_active true);
 
@@ -178,6 +184,7 @@
       | (active, inactive) =>
           (workers := active; Multithreading.tracing 0 (fn () =>
             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
+    val _ = trace_active ();
 
     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
     val l = length (! workers);
@@ -242,33 +249,48 @@
         val _ = Multithreading.self_critical () andalso
           error "Cannot join future values within critical section";
 
-        fun unfinished () =
-          xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
-
-        (*alien thread -- refrain from contending for resources*)
-        fun passive_join () = (*requires SYNCHRONIZED*)
-          (case unfinished () of [] => ()
-          | _ => (wait "passive_join"; passive_join ()));
-
-        (*proper worker thread -- actively work towards results*)
-        fun active_join () = (*requires SYNCHRONIZED*)
-          (case unfinished () of [] => ()
-          | tasks =>
-              (case change_result queue (TaskQueue.dequeue_towards tasks) of
-                NONE => (worker_wait "active_join"; active_join ())
-              | SOME work => (execute "active_join" work; active_join ())));
-
+        fun join_loop [] = ()
+          | join_loop tasks =
+              (case SYNCHRONIZED "join_loop" (fn () =>
+                  change_result queue (TaskQueue.dequeue_towards tasks)) of
+                NONE => ()
+              | SOME (work, tasks') => (execute "join_loop" work; join_loop tasks'));
         val _ =
           (case thread_data () of
-            NONE => SYNCHRONIZED "passive_join" passive_join
-          | SOME (task, _) => SYNCHRONIZED "active_join" (fn () =>
-             (change queue (TaskQueue.depend (unfinished ()) task); active_join ())));
+            NONE =>
+              (*alien thread -- refrain from contending for resources*)
+              while exists (not o is_finished) xs
+              do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
+          | SOME (task, _) =>
+              (*proper task -- actively work towards results*)
+              let
+                val unfinished = xs |> map_filter
+                  (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
+                val _ = SYNCHRONIZED "join" (fn () =>
+                  (change queue (TaskQueue.depend unfinished task); notify_all ()));
+                val _ = join_loop unfinished;
+                val _ =
+                  while exists (not o is_finished) xs
+                  do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
+              in () end);
 
       in xs |> map (fn Future {result = ref (SOME res), ...} => res) end;
 
 fun join x = Exn.release (singleton join_results x);
 
 
+(* release results *)
+
+fun proper_exn (Exn.Result _) = NONE
+  | proper_exn (Exn.Exn Interrupt) = NONE
+  | proper_exn (Exn.Exn exn) = SOME exn;
+
+fun release_results results =
+  (case get_first proper_exn results of
+    SOME exn => raise exn
+  | NONE => List.map Exn.release results);
+
+
 (* misc operations *)
 
 (*focus: collection of high-priority task*)