--- a/src/Pure/Concurrent/future.ML Tue Jul 28 08:49:03 2009 +0200
+++ b/src/Pure/Concurrent/future.ML Tue Jul 28 14:04:33 2009 +0200
@@ -1,7 +1,8 @@
(* Title: Pure/Concurrent/future.ML
Author: Makarius
-Future values.
+Future values, see also
+http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
Notes:
@@ -144,18 +145,6 @@
end;
-(* worker activity *)
-
-fun count_active ws =
- fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
-
-fun change_active active = (*requires SYNCHRONIZED*)
- change workers (AList.update Thread.equal (Thread.self (), active));
-
-fun overloaded () =
- count_active (! workers) > Multithreading.max_threads_value ();
-
-
(* execute future jobs *)
fun future_job group (e: unit -> 'a) =
@@ -186,7 +175,7 @@
val valid = not (Task_Queue.is_canceled group);
val ok = setmp_thread_data (name, task, group) (fn () =>
fold (fn job => fn ok => job valid andalso ok) jobs true) ();
- val _ = SYNCHRONIZED "execute" (fn () =>
+ val _ = SYNCHRONIZED "finish" (fn () =>
let
val maximal = change_result queue (Task_Queue.finish task);
val _ =
@@ -199,6 +188,15 @@
in () end;
+(* worker activity *)
+
+fun count_active () = (*requires SYNCHRONIZED*)
+ fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
+
+fun change_active active = (*requires SYNCHRONIZED*)
+ change workers (AList.update Thread.equal (Thread.self (), active));
+
+
(* worker threads *)
fun worker_wait cond = (*requires SYNCHRONIZED*)
@@ -212,7 +210,8 @@
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
broadcast scheduler_event;
NONE)
- else if overloaded () then (worker_wait scheduler_event; worker_next ())
+ else if count_active () > Multithreading.max_threads_value () then
+ (worker_wait scheduler_event; worker_next ())
else
(case change_result queue Task_Queue.dequeue of
NONE => (worker_wait work_available; worker_next ())
@@ -243,7 +242,7 @@
let
val {ready, pending, running} = Task_Queue.status (! queue);
val total = length (! workers);
- val active = count_active (! workers);
+ val active = count_active ();
in
"SCHEDULE: " ^
string_of_int ready ^ " ready, " ^
@@ -319,7 +318,7 @@
SOME group => group
| NONE => Task_Queue.new_group (worker_group ()));
val (result, job) = future_job group e;
- val task = SYNCHRONIZED "future" (fn () =>
+ val task = SYNCHRONIZED "enqueue" (fn () =>
let
val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
val _ = if minimal then signal work_available else ();
@@ -366,14 +365,13 @@
fun join_results xs =
if forall is_finished xs then map get_result xs
+ else if Multithreading.self_critical () then
+ error "Cannot join future values within critical section"
else uninterruptible (fn _ => fn () =>
- let
- val _ = Multithreading.self_critical () andalso
- error "Cannot join future values within critical section";
- val _ =
- if is_worker () then join_work (map task_of xs)
- else List.app join_wait xs;
- in map get_result xs end) ();
+ (if is_worker ()
+ then join_work (map task_of xs)
+ else List.app join_wait xs;
+ map get_result xs)) ();
end;
@@ -389,7 +387,7 @@
val group = Task_Queue.new_group (SOME (group_of x));
val (result, job) = future_job group (fn () => f (join x));
- val extended = SYNCHRONIZED "map_future" (fn () =>
+ val extended = SYNCHRONIZED "extend" (fn () =>
(case Task_Queue.extend task job (! queue) of
SOME queue' => (queue := queue'; true)
| NONE => false));