# HG changeset patch # User wenzelm # Date 1220825441 -7200 # Node ID 55772e4e95e092ea032fe011f8a7b8998975dd2d # Parent 7718587e510e4be57887f5bc57fdadd1ef479c3c tuned check_cache; removed broken self_synchronized, which cannot be used in conjunction with condition variables; more precise use of SYNCHRONIZED vs. wait; tuned worker_loop; diff -r 7718587e510e -r 55772e4e95e0 src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Sun Sep 07 22:20:15 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Mon Sep 08 00:10:41 2008 +0200 @@ -22,27 +22,16 @@ (* synchronized execution *) local - val thread = ref (NONE: Thread.thread option); val lock = Mutex.mutex (); val cond = ConditionVar.conditionVar (); in -fun self_synchronized () = - (case ! thread of - NONE => false - | SOME t => Thread.equal (t, Thread.self ())); - -fun SYNCHRONIZED e = - if self_synchronized () then e () - else - uninterruptible (fn restore_attributes => fn () => - let - val _ = Mutex.lock lock; - val _ = thread := SOME (Thread.self ()); - val result = Exn.capture (restore_attributes e) (); - val _ = thread := NONE; - val _ = Mutex.unlock lock; - in Exn.release result end) (); +fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () => + let + val _ = Mutex.lock lock; + val result = Exn.capture (restore_attributes e) (); + val _ = Mutex.unlock lock; + in Exn.release result end) (); fun wait () = ConditionVar.wait (cond, lock); fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); @@ -78,12 +67,8 @@ fun check_cache (queue as Queue (tasks, cache)) = if not (Queue.is_empty cache) then queue else - let - val cache' = fold (fn id => - (case IntGraph.get_node tasks id of - Task task => Queue.enqueue (id, task) - | Running _ => I)) (IntGraph.minimals tasks) Queue.empty; - in Queue (tasks, cache') end; + let fun ready (id, (Task task, ([], _))) = Queue.enqueue (id, task) | ready _ = I + in Queue (tasks, IntGraph.fold ready tasks Queue.empty) end; val next_task = check_cache #> (fn queue as Queue (tasks, cache) => if Queue.is_empty cache then (NONE, queue) @@ -109,17 +94,6 @@ (* global state *) -local val active = ref 0 in - -fun change_active b = SYNCHRONIZED (fn () => - let - val _ = change active (fn n => if b then n + 1 else n - 1); - val n = ! active; - val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks"); - in () end); - -end; - val tasks = ref empty_queue; val scheduler = ref (NONE: Thread.thread option); val workers = ref ([]: Thread.thread list); @@ -133,33 +107,46 @@ (* worker thread *) +local val active = ref 0 in + +fun change_active b = (*requires SYNCHRONIZED*) + let + val _ = change active (fn n => if b then n + 1 else n - 1); + val n = ! active; + val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active"); + in () end; + +end; + fun excessive_threads () = false; (* FIXME *) -fun worker_stop () = - (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ()))))); +fun worker_stop () = SYNCHRONIZED (fn () => + (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ())))))); -fun worker_wait () = - (change_active false; wait (); change_active true); +fun worker_wait () = SYNCHRONIZED (fn () => + (change_active false; wait (); change_active true)); fun worker_loop () = - (case SYNCHRONIZED (fn () => change_result tasks next_task) of - SOME (id, task) => - let - val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ()))); - val _ = task (); - val _ = SYNCHRONIZED (fn () => change tasks (finished_task id)); - val _ = notify_all (); - in if excessive_threads () then worker_stop () else worker_loop () end - | NONE => (worker_wait (); worker_loop ())); + if excessive_threads () then worker_stop () + else + (case SYNCHRONIZED (fn () => change_result tasks next_task) of + NONE => (worker_wait (); worker_loop ()) + | SOME (id, task) => + let + val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ()))); + val _ = task (); + val _ = SYNCHRONIZED (fn () => change tasks (finished_task id)); + val _ = notify_all (); + in worker_loop () end); -fun worker_start () = +fun worker_start () = SYNCHRONIZED (fn () => (change_active true; - change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts)))); + change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts))))); (* scheduler *) -fun scheduler_loop () = +fun scheduler_loop () = (*requires SYNCHRONIZED*) let val m = Multithreading.max_threads_value (); val k = m - length (! workers); @@ -174,7 +161,8 @@ | SOME t => Thread.isActive t); in if scheduler_active then () - else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts)) + else scheduler := + SOME (Thread.fork (SYNCHRONIZED o scheduler_loop, Multithreading.no_interrupts)) end); @@ -187,9 +175,11 @@ val r = ref (NONE: 'a Exn.result option); val task = Multithreading.with_attributes (Thread.getAttributes ()) (fn _ => fn () => r := SOME (Exn.capture e ())); + val id = serial (); val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task)); val _ = notify_all (); + in Future (id, r) end; fun future e = dependent_future [] e;