# HG changeset patch # User wenzelm # Date 1229441118 -3600 # Node ID 99941fd0cb0e70f62a3774279eb59152b604b2f2 # Parent 8f2481aa363d886319fb925e35c40dd191f984d1 renamed structure TaskQueue to Task_Queue; generalized fork_background to fork_pri; reduced tracing; map: inherit task priority; removed unused focus; diff -r 8f2481aa363d -r 99941fd0cb0e src/Pure/Concurrent/future.ML --- a/src/Pure/Concurrent/future.ML Tue Dec 16 12:13:53 2008 +0100 +++ b/src/Pure/Concurrent/future.ML Tue Dec 16 16:25:18 2008 +0100 @@ -1,5 +1,4 @@ (* Title: Pure/Concurrent/future.ML - ID: $Id$ Author: Makarius Future values. @@ -28,8 +27,8 @@ signature FUTURE = sig val enabled: unit -> bool - type task = TaskQueue.task - type group = TaskQueue.group + type task = Task_Queue.task + type group = Task_Queue.group val thread_data: unit -> (string * task) option type 'a future val task_of: 'a future -> task @@ -40,12 +39,11 @@ val fork: (unit -> 'a) -> 'a future val fork_group: group -> (unit -> 'a) -> 'a future val fork_deps: 'b future list -> (unit -> 'a) -> 'a future - val fork_background: (unit -> 'a) -> 'a future + val fork_pri: int -> (unit -> 'a) -> 'a future val join_results: 'a future list -> 'a Exn.result list val join_result: 'a future -> 'a Exn.result val join: 'a future -> 'a val map: ('a -> 'b) -> 'a future -> 'b future - val focus: task list -> unit val interrupt_task: string -> unit val cancel: 'a future -> unit val shutdown: unit -> unit @@ -63,8 +61,8 @@ (* identifiers *) -type task = TaskQueue.task; -type group = TaskQueue.group; +type task = Task_Queue.task; +type group = Task_Queue.group; local val tag = Universal.tag () : (string * task) option Universal.tag in fun thread_data () = the_default NONE (Thread.getLocal tag); @@ -86,8 +84,8 @@ fun is_finished x = is_some (peek x); fun value x = Future - {task = TaskQueue.new_task (), - group = TaskQueue.new_group (), + {task = Task_Queue.new_task 0, + group = Task_Queue.new_group (), result = ref (SOME (Exn.Result x))}; @@ -96,12 +94,12 @@ (* global state *) -val queue = ref TaskQueue.empty; +val queue = ref Task_Queue.empty; val next = ref 0; val workers = ref ([]: (Thread.thread * bool) list); val scheduler = ref (NONE: Thread.thread option); val excessive = ref 0; -val canceled = ref ([]: TaskQueue.group list); +val canceled = ref ([]: Task_Queue.group list); val do_shutdown = ref false; @@ -114,15 +112,11 @@ fun SYNCHRONIZED name = SimpleThread.synchronized name lock; -fun wait name = (*requires SYNCHRONIZED*) - (Multithreading.tracing 3 (fn () => name ^ ": wait ..."); +fun wait () = (*requires SYNCHRONIZED*) ConditionVar.wait (cond, lock); - Multithreading.tracing 3 (fn () => name ^ ": ... continue")); -fun wait_timeout name timeout = (*requires SYNCHRONIZED*) - (Multithreading.tracing 3 (fn () => name ^ ": wait ..."); +fun wait_timeout timeout = (*requires SYNCHRONIZED*) ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); - Multithreading.tracing 3 (fn () => name ^ ": ... continue")); fun notify_all () = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond; @@ -150,9 +144,9 @@ val _ = trace_active (); val ok = setmp_thread_data (name, task) run (); val _ = SYNCHRONIZED "execute" (fn () => - (change queue (TaskQueue.finish task); + (change queue (Task_Queue.finish task); if ok then () - else if TaskQueue.cancel (! queue) group then () + else if Task_Queue.cancel (! queue) group then () else change canceled (cons group); notify_all ())); in () end; @@ -160,23 +154,23 @@ (* worker threads *) -fun worker_wait name = (*requires SYNCHRONIZED*) - (change_active false; wait name; change_active true); +fun worker_wait () = (*requires SYNCHRONIZED*) + (change_active false; wait (); change_active true); -fun worker_next name = (*requires SYNCHRONIZED*) +fun worker_next () = (*requires SYNCHRONIZED*) if ! excessive > 0 then (dec excessive; change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); notify_all (); NONE) else - (case change_result queue TaskQueue.dequeue of - NONE => (worker_wait name; worker_next name) + (case change_result queue Task_Queue.dequeue of + NONE => (worker_wait (); worker_next ()) | some => some); fun worker_loop name = - (case SYNCHRONIZED name (fn () => worker_next name) of - NONE => Multithreading.tracing 3 (fn () => name ^ ": exit") + (case SYNCHRONIZED name worker_next of + NONE => () | SOME work => (execute name work; worker_loop name)); fun worker_start name = (*requires SYNCHRONIZED*) @@ -204,27 +198,25 @@ else (); (*canceled groups*) - val _ = change canceled (filter_out (TaskQueue.cancel (! queue))); + val _ = change canceled (filter_out (Task_Queue.cancel (! queue))); (*shutdown*) val continue = not (! do_shutdown andalso null (! workers)); val _ = if continue then () else scheduler := NONE; val _ = notify_all (); - val _ = wait_timeout "scheduler" (Time.fromSeconds 3); + val _ = wait_timeout (Time.fromSeconds 3); in continue end; fun scheduler_loop () = - (while SYNCHRONIZED "scheduler" scheduler_next do (); - Multithreading.tracing 3 (fn () => "scheduler: exit")); + while SYNCHRONIZED "scheduler" scheduler_next do (); fun scheduler_active () = (*requires SYNCHRONIZED*) (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); fun scheduler_check name = SYNCHRONIZED name (fn () => if not (scheduler_active ()) then - (Multithreading.tracing 3 (fn () => "scheduler: fork"); - do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) + (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop)) else if ! do_shutdown then error "Scheduler shutdown in progress" else ()); @@ -235,7 +227,7 @@ let val _ = scheduler_check "future check"; - val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ()); + val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ()); val result = ref (NONE: 'a Exn.result option); val run = Multithreading.with_attributes (Thread.getAttributes ()) @@ -246,18 +238,18 @@ val res_ok = (case res of Exn.Result _ => true - | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true) + | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true) | _ => false); in res_ok end); val task = SYNCHRONIZED "future" (fn () => - change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ()); + change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ()); in Future {task = task, group = group, result = result} end; -fun fork e = future NONE [] true e; -fun fork_group group e = future (SOME group) [] true e; -fun fork_deps deps e = future NONE (map task_of deps) true e; -fun fork_background e = future NONE [] false e; +fun fork e = future NONE [] 0 e; +fun fork_group group e = future (SOME group) [] 0 e; +fun fork_deps deps e = future NONE (map task_of deps) 0 e; +fun fork_pri pri e = future NONE [] pri e; (* join: retrieve results *) @@ -273,7 +265,7 @@ fun join_loop _ [] = () | join_loop name tasks = (case SYNCHRONIZED name (fn () => - change_result queue (TaskQueue.dequeue_towards tasks)) of + change_result queue (Task_Queue.dequeue_towards tasks)) of NONE => () | SOME (work, tasks') => (execute name work; join_loop name tasks')); val _ = @@ -281,18 +273,18 @@ NONE => (*alien thread -- refrain from contending for resources*) while exists (not o is_finished) xs - do SYNCHRONIZED "join_thread" (fn () => wait "join_thread") + do SYNCHRONIZED "join_thread" (fn () => wait ()) | SOME (name, task) => (*proper task -- actively work towards results*) let val unfinished = xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); val _ = SYNCHRONIZED "join" (fn () => - (change queue (TaskQueue.depend unfinished task); notify_all ())); + (change queue (Task_Queue.depend unfinished task); notify_all ())); val _ = join_loop ("join_loop: " ^ name) unfinished; val _ = while exists (not o is_finished) xs - do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task"); + do SYNCHRONIZED "join_task" (fn () => worker_wait ()); in () end); in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) (); @@ -300,18 +292,16 @@ fun join_result x = singleton join_results x; fun join x = Exn.release (join_result x); -fun map f x = fork_deps [x] (fn () => f (join x)); +fun map f x = + let val task = task_of x + in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end; (* misc operations *) -(*focus: collection of high-priority task*) -fun focus tasks = SYNCHRONIZED "focus" (fn () => - change queue (TaskQueue.focus tasks)); - (*interrupt: permissive signal, may get ignored*) fun interrupt_task id = SYNCHRONIZED "interrupt" - (fn () => TaskQueue.interrupt_external (! queue) id); + (fn () => Task_Queue.interrupt_external (! queue) id); (*cancel: present and future group members will be interrupted eventually*) fun cancel x = @@ -324,12 +314,12 @@ if Multithreading.available then (scheduler_check "shutdown check"; SYNCHRONIZED "shutdown" (fn () => - (while not (scheduler_active ()) do wait "shutdown: scheduler inactive"; - while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join"; + (while not (scheduler_active ()) do wait (); + while not (Task_Queue.is_empty (! queue)) do wait (); do_shutdown := true; notify_all (); - while not (null (! workers)) do wait "shutdown: workers"; - while scheduler_active () do wait "shutdown: scheduler still active"; + while not (null (! workers)) do wait (); + while scheduler_active () do wait (); OS.Process.sleep (Time.fromMilliseconds 300)))) else ();