src/Pure/Concurrent/future.ML
changeset 32246 d4f7934e9555
parent 32230 9f6461b1c9cc
child 32247 3e7d1673f96e
--- a/src/Pure/Concurrent/future.ML	Tue Jul 28 08:49:03 2009 +0200
+++ b/src/Pure/Concurrent/future.ML	Tue Jul 28 14:04:33 2009 +0200
@@ -1,7 +1,8 @@
 (*  Title:      Pure/Concurrent/future.ML
     Author:     Makarius
 
-Future values.
+Future values, see also
+http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
 
 Notes:
 
@@ -144,18 +145,6 @@
 end;
 
 
-(* worker activity *)
-
-fun count_active ws =
-  fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
-
-fun change_active active = (*requires SYNCHRONIZED*)
-  change workers (AList.update Thread.equal (Thread.self (), active));
-
-fun overloaded () =
-  count_active (! workers) > Multithreading.max_threads_value ();
-
-
 (* execute future jobs *)
 
 fun future_job group (e: unit -> 'a) =
@@ -186,7 +175,7 @@
     val valid = not (Task_Queue.is_canceled group);
     val ok = setmp_thread_data (name, task, group) (fn () =>
       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
-    val _ = SYNCHRONIZED "execute" (fn () =>
+    val _ = SYNCHRONIZED "finish" (fn () =>
       let
         val maximal = change_result queue (Task_Queue.finish task);
         val _ =
@@ -199,6 +188,15 @@
   in () end;
 
 
+(* worker activity *)
+
+fun count_active () = (*requires SYNCHRONIZED*)
+  fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
+
+fun change_active active = (*requires SYNCHRONIZED*)
+  change workers (AList.update Thread.equal (Thread.self (), active));
+
+
 (* worker threads *)
 
 fun worker_wait cond = (*requires SYNCHRONIZED*)
@@ -212,7 +210,8 @@
      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
      broadcast scheduler_event;
      NONE)
-  else if overloaded () then (worker_wait scheduler_event; worker_next ())
+  else if count_active () > Multithreading.max_threads_value () then
+    (worker_wait scheduler_event; worker_next ())
   else
     (case change_result queue Task_Queue.dequeue of
       NONE => (worker_wait work_available; worker_next ())
@@ -243,7 +242,7 @@
             let
               val {ready, pending, running} = Task_Queue.status (! queue);
               val total = length (! workers);
-              val active = count_active (! workers);
+              val active = count_active ();
             in
               "SCHEDULE: " ^
                 string_of_int ready ^ " ready, " ^
@@ -319,7 +318,7 @@
         SOME group => group
       | NONE => Task_Queue.new_group (worker_group ()));
     val (result, job) = future_job group e;
-    val task = SYNCHRONIZED "future" (fn () =>
+    val task = SYNCHRONIZED "enqueue" (fn () =>
       let
         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
         val _ = if minimal then signal work_available else ();
@@ -366,14 +365,13 @@
 
 fun join_results xs =
   if forall is_finished xs then map get_result xs
+  else if Multithreading.self_critical () then
+    error "Cannot join future values within critical section"
   else uninterruptible (fn _ => fn () =>
-    let
-      val _ = Multithreading.self_critical () andalso
-        error "Cannot join future values within critical section";
-      val _ =
-        if is_worker () then join_work (map task_of xs)
-        else List.app join_wait xs;
-    in map get_result xs end) ();
+     (if is_worker ()
+      then join_work (map task_of xs)
+      else List.app join_wait xs;
+      map get_result xs)) ();
 
 end;
 
@@ -389,7 +387,7 @@
     val group = Task_Queue.new_group (SOME (group_of x));
     val (result, job) = future_job group (fn () => f (join x));
 
-    val extended = SYNCHRONIZED "map_future" (fn () =>
+    val extended = SYNCHRONIZED "extend" (fn () =>
       (case Task_Queue.extend task job (! queue) of
         SOME queue' => (queue := queue'; true)
       | NONE => false));