# HG changeset patch # User wenzelm # Date 1221071281 -7200 # Node ID 6d977729c8fadf19456c0d3cd09d63844c35866e # Parent 9e5f556409c637c4c22089f71d6f78e177bec619 workers: explicit activity flag; SYNCHRONIZED: optional tracing; diff -r 9e5f556409c6 -r 6d977729c8fa src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Wed Sep 10 19:44:29 2008 +0200 +++ b/src/Pure/Concurrent/future.ML Wed Sep 10 20:28:01 2008 +0200 @@ -53,14 +53,17 @@ (* global state *) val queue = ref TaskQueue.empty; -val workers = ref ([]: Thread.thread list); +val workers = ref ([]: (Thread.thread * bool) list); val scheduler = ref (NONE: Thread.thread option); val excessive = ref 0; -val active = ref 0; fun trace_active () = - Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int (! active) ^ " active"); + let + val ws = ! workers; + val m = string_of_int (length ws); + val n = string_of_int (length (filter #2 ws)); + in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end; (* requests *) @@ -79,18 +82,21 @@ val cond = ConditionVar.conditionVar (); in -fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () => +fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () => let + val _ = Multithreading.tracing 4 (fn () => name ^ ": locking"); val _ = Mutex.lock lock; + val _ = Multithreading.tracing 4 (fn () => name ^ ": locked"); val result = Exn.capture (restore_attributes e) (); val _ = Mutex.unlock lock; + val _ = Multithreading.tracing 4 (fn () => name ^ ": unlocked"); in Exn.release result end) (); fun wait name = (*requires SYNCHRONIZED*) let - val _ = Multithreading.tracing 4 (fn () => name ^ " : waiting"); + val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting"); val _ = ConditionVar.wait (cond, lock); - val _ = Multithreading.tracing 4 (fn () => name ^ " : notified"); + val _ = Multithreading.tracing 4 (fn () => name ^ ": notified"); in () end; fun notify_all () = (*requires SYNCHRONIZED*) @@ -108,7 +114,7 @@ val ok = run (); val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); val _ = set_thread_data NONE; - val _ = SYNCHRONIZED (fn () => + val _ = SYNCHRONIZED "execute" (fn () => (change queue (TaskQueue.finish task); if ok then () else if TaskQueue.cancel (! queue) group then () @@ -119,8 +125,8 @@ (* worker threads *) -fun change_active b = (*requires SYNCHRONIZED*) - (change active (fn n => if b then n + 1 else n - 1); trace_active ()); +fun change_active active = (*requires SYNCHRONIZED*) + (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ()); fun worker_wait name = (*requires SYNCHRONIZED*) (change_active false; wait name; change_active true); @@ -128,8 +134,7 @@ fun worker_next name = (*requires SYNCHRONIZED*) if ! excessive > 0 then (dec excessive; - change_active false; - change workers (remove Thread.equal (Thread.self ())); + change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); NONE) else (case change_result queue TaskQueue.dequeue of @@ -137,44 +142,48 @@ | some => some); fun worker_loop name = - (case SYNCHRONIZED (fn () => worker_next name) of + (case SYNCHRONIZED name (fn () => worker_next name) of NONE => () | SOME work => (execute name work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) - (change_active true; - change workers (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts)))); + change workers + (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts), true)); (* scheduler *) -fun scheduler_fork shutdown = SYNCHRONIZED (fn () => +fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () => let val _ = trace_active (); val _ = - (case List.partition Thread.isActive (! workers) of + (case List.partition (Thread.isActive o #1) (! workers) of (_, []) => () | (active, inactive) => (workers := active; Multithreading.tracing 0 (fn () => - "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads"))); + "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); val m = if shutdown then 0 else Multithreading.max_threads_value (); val l = length (! workers); val _ = excessive := l - m; val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1); - in null (! workers) end); + val _ = if shutdown then notify_all () else (); + in shutdown andalso null (! workers) end); fun scheduler_loop (shutdown, canceled) = if scheduler_fork shutdown then () else - let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in + let + val canceled' = SYNCHRONIZED "scheduler" + (fn () => filter_out (TaskQueue.cancel (! queue)) canceled); + in (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of SOME Shutdown => scheduler_loop (true, canceled') | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled') | NONE => scheduler_loop (shutdown, canceled')) end; -fun scheduler_check () = SYNCHRONIZED (fn () => +fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () => if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () else scheduler := SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))); @@ -194,7 +203,7 @@ let val res = if ok then Exn.capture e () else Exn.Exn Interrupt in result := SOME res; is_some (Exn.get_result res) end); - val task = SYNCHRONIZED (fn () => + val task = SYNCHRONIZED "future" (fn () => change_result queue (TaskQueue.enqueue group deps run) before notify_all ()); in Future {task = task, group = group, result = result} end; @@ -225,8 +234,8 @@ val _ = (case thread_data () of - NONE => SYNCHRONIZED passive_join - | SOME (task, _) => SYNCHRONIZED (fn () => + NONE => SYNCHRONIZED "join" passive_join + | SOME (task, _) => SYNCHRONIZED "join" (fn () => (change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); val res = xs |> map (fn Future {result = ref (SOME res), ...} => res); @@ -245,6 +254,6 @@ fun cancel x = (scheduler_check (); cancel_request (group_of x)); (*interrupt: adhoc signal, permissive, may get ignored*) -fun interrupt_task id = SYNCHRONIZED (fn () => TaskQueue.interrupt (! queue) id); +fun interrupt_task id = SYNCHRONIZED "interrupt" (fn () => TaskQueue.interrupt (! queue) id); end;