tuned check_cache;
removed broken self_synchronized, which cannot be used in conjunction with condition variables;
more precise use of SYNCHRONIZED vs. wait;
tuned worker_loop;
--- a/src/Pure/Concurrent/future.ML Sun Sep 07 22:20:15 2008 +0200
+++ b/src/Pure/Concurrent/future.ML Mon Sep 08 00:10:41 2008 +0200
@@ -22,27 +22,16 @@
(* synchronized execution *)
local
- val thread = ref (NONE: Thread.thread option);
val lock = Mutex.mutex ();
val cond = ConditionVar.conditionVar ();
in
-fun self_synchronized () =
- (case ! thread of
- NONE => false
- | SOME t => Thread.equal (t, Thread.self ()));
-
-fun SYNCHRONIZED e =
- if self_synchronized () then e ()
- else
- uninterruptible (fn restore_attributes => fn () =>
- let
- val _ = Mutex.lock lock;
- val _ = thread := SOME (Thread.self ());
- val result = Exn.capture (restore_attributes e) ();
- val _ = thread := NONE;
- val _ = Mutex.unlock lock;
- in Exn.release result end) ();
+fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () =>
+ let
+ val _ = Mutex.lock lock;
+ val result = Exn.capture (restore_attributes e) ();
+ val _ = Mutex.unlock lock;
+ in Exn.release result end) ();
fun wait () = ConditionVar.wait (cond, lock);
fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
@@ -78,12 +67,8 @@
fun check_cache (queue as Queue (tasks, cache)) =
if not (Queue.is_empty cache) then queue
else
- let
- val cache' = fold (fn id =>
- (case IntGraph.get_node tasks id of
- Task task => Queue.enqueue (id, task)
- | Running _ => I)) (IntGraph.minimals tasks) Queue.empty;
- in Queue (tasks, cache') end;
+ let fun ready (id, (Task task, ([], _))) = Queue.enqueue (id, task) | ready _ = I
+ in Queue (tasks, IntGraph.fold ready tasks Queue.empty) end;
val next_task = check_cache #> (fn queue as Queue (tasks, cache) =>
if Queue.is_empty cache then (NONE, queue)
@@ -109,17 +94,6 @@
(* global state *)
-local val active = ref 0 in
-
-fun change_active b = SYNCHRONIZED (fn () =>
- let
- val _ = change active (fn n => if b then n + 1 else n - 1);
- val n = ! active;
- val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks");
- in () end);
-
-end;
-
val tasks = ref empty_queue;
val scheduler = ref (NONE: Thread.thread option);
val workers = ref ([]: Thread.thread list);
@@ -133,33 +107,46 @@
(* worker thread *)
+local val active = ref 0 in
+
+fun change_active b = (*requires SYNCHRONIZED*)
+ let
+ val _ = change active (fn n => if b then n + 1 else n - 1);
+ val n = ! active;
+ val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active");
+ in () end;
+
+end;
+
fun excessive_threads () = false; (* FIXME *)
-fun worker_stop () =
- (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))));
+fun worker_stop () = SYNCHRONIZED (fn () =>
+ (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ()))))));
-fun worker_wait () =
- (change_active false; wait (); change_active true);
+fun worker_wait () = SYNCHRONIZED (fn () =>
+ (change_active false; wait (); change_active true));
fun worker_loop () =
- (case SYNCHRONIZED (fn () => change_result tasks next_task) of
- SOME (id, task) =>
- let
- val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
- val _ = task ();
- val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
- val _ = notify_all ();
- in if excessive_threads () then worker_stop () else worker_loop () end
- | NONE => (worker_wait (); worker_loop ()));
+ if excessive_threads () then worker_stop ()
+ else
+ (case SYNCHRONIZED (fn () => change_result tasks next_task) of
+ NONE => (worker_wait (); worker_loop ())
+ | SOME (id, task) =>
+ let
+ val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ())));
+ val _ = task ();
+ val _ = SYNCHRONIZED (fn () => change tasks (finished_task id));
+ val _ = notify_all ();
+ in worker_loop () end);
-fun worker_start () =
+fun worker_start () = SYNCHRONIZED (fn () =>
(change_active true;
- change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))));
+ change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts)))));
(* scheduler *)
-fun scheduler_loop () =
+fun scheduler_loop () = (*requires SYNCHRONIZED*)
let
val m = Multithreading.max_threads_value ();
val k = m - length (! workers);
@@ -174,7 +161,8 @@
| SOME t => Thread.isActive t);
in
if scheduler_active then ()
- else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts))
+ else scheduler :=
+ SOME (Thread.fork (SYNCHRONIZED o scheduler_loop, Multithreading.no_interrupts))
end);
@@ -187,9 +175,11 @@
val r = ref (NONE: 'a Exn.result option);
val task = Multithreading.with_attributes (Thread.getAttributes ())
(fn _ => fn () => r := SOME (Exn.capture e ()));
+
val id = serial ();
val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task));
val _ = notify_all ();
+
in Future (id, r) end;
fun future e = dependent_future [] e;