--- a/src/Pure/Concurrent/future.ML Tue Dec 16 12:13:53 2008 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Dec 16 16:25:18 2008 +0100
@@ -1,5 +1,4 @@
(* Title: Pure/Concurrent/future.ML
- ID: $Id$
Author: Makarius
Future values.
@@ -28,8 +27,8 @@
signature FUTURE =
sig
val enabled: unit -> bool
- type task = TaskQueue.task
- type group = TaskQueue.group
+ type task = Task_Queue.task
+ type group = Task_Queue.group
val thread_data: unit -> (string * task) option
type 'a future
val task_of: 'a future -> task
@@ -40,12 +39,11 @@
val fork: (unit -> 'a) -> 'a future
val fork_group: group -> (unit -> 'a) -> 'a future
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
- val fork_background: (unit -> 'a) -> 'a future
+ val fork_pri: int -> (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
val join: 'a future -> 'a
val map: ('a -> 'b) -> 'a future -> 'b future
- val focus: task list -> unit
val interrupt_task: string -> unit
val cancel: 'a future -> unit
val shutdown: unit -> unit
@@ -63,8 +61,8 @@
(* identifiers *)
-type task = TaskQueue.task;
-type group = TaskQueue.group;
+type task = Task_Queue.task;
+type group = Task_Queue.group;
local val tag = Universal.tag () : (string * task) option Universal.tag in
fun thread_data () = the_default NONE (Thread.getLocal tag);
@@ -86,8 +84,8 @@
fun is_finished x = is_some (peek x);
fun value x = Future
- {task = TaskQueue.new_task (),
- group = TaskQueue.new_group (),
+ {task = Task_Queue.new_task 0,
+ group = Task_Queue.new_group (),
result = ref (SOME (Exn.Result x))};
@@ -96,12 +94,12 @@
(* global state *)
-val queue = ref TaskQueue.empty;
+val queue = ref Task_Queue.empty;
val next = ref 0;
val workers = ref ([]: (Thread.thread * bool) list);
val scheduler = ref (NONE: Thread.thread option);
val excessive = ref 0;
-val canceled = ref ([]: TaskQueue.group list);
+val canceled = ref ([]: Task_Queue.group list);
val do_shutdown = ref false;
@@ -114,15 +112,11 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
-fun wait name = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait () = (*requires SYNCHRONIZED*)
ConditionVar.wait (cond, lock);
- Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
-fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
- (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
+fun wait_timeout timeout = (*requires SYNCHRONIZED*)
ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
- Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
fun notify_all () = (*requires SYNCHRONIZED*)
ConditionVar.broadcast cond;
@@ -150,9 +144,9 @@
val _ = trace_active ();
val ok = setmp_thread_data (name, task) run ();
val _ = SYNCHRONIZED "execute" (fn () =>
- (change queue (TaskQueue.finish task);
+ (change queue (Task_Queue.finish task);
if ok then ()
- else if TaskQueue.cancel (! queue) group then ()
+ else if Task_Queue.cancel (! queue) group then ()
else change canceled (cons group);
notify_all ()));
in () end;
@@ -160,23 +154,23 @@
(* worker threads *)
-fun worker_wait name = (*requires SYNCHRONIZED*)
- (change_active false; wait name; change_active true);
+fun worker_wait () = (*requires SYNCHRONIZED*)
+ (change_active false; wait (); change_active true);
-fun worker_next name = (*requires SYNCHRONIZED*)
+fun worker_next () = (*requires SYNCHRONIZED*)
if ! excessive > 0 then
(dec excessive;
change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
notify_all ();
NONE)
else
- (case change_result queue TaskQueue.dequeue of
- NONE => (worker_wait name; worker_next name)
+ (case change_result queue Task_Queue.dequeue of
+ NONE => (worker_wait (); worker_next ())
| some => some);
fun worker_loop name =
- (case SYNCHRONIZED name (fn () => worker_next name) of
- NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
+ (case SYNCHRONIZED name worker_next of
+ NONE => ()
| SOME work => (execute name work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
@@ -204,27 +198,25 @@
else ();
(*canceled groups*)
- val _ = change canceled (filter_out (TaskQueue.cancel (! queue)));
+ val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
(*shutdown*)
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
val _ = notify_all ();
- val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
+ val _ = wait_timeout (Time.fromSeconds 3);
in continue end;
fun scheduler_loop () =
- (while SYNCHRONIZED "scheduler" scheduler_next do ();
- Multithreading.tracing 3 (fn () => "scheduler: exit"));
+ while SYNCHRONIZED "scheduler" scheduler_next do ();
fun scheduler_active () = (*requires SYNCHRONIZED*)
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
fun scheduler_check name = SYNCHRONIZED name (fn () =>
if not (scheduler_active ()) then
- (Multithreading.tracing 3 (fn () => "scheduler: fork");
- do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
+ (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
else if ! do_shutdown then error "Scheduler shutdown in progress"
else ());
@@ -235,7 +227,7 @@
let
val _ = scheduler_check "future check";
- val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
+ val group = (case opt_group of SOME group => group | NONE => Task_Queue.new_group ());
val result = ref (NONE: 'a Exn.result option);
val run = Multithreading.with_attributes (Thread.getAttributes ())
@@ -246,18 +238,18 @@
val res_ok =
(case res of
Exn.Result _ => true
- | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
+ | Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
| _ => false);
in res_ok end);
val task = SYNCHRONIZED "future" (fn () =>
- change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
+ change_result queue (Task_Queue.enqueue group deps pri run) before notify_all ());
in Future {task = task, group = group, result = result} end;
-fun fork e = future NONE [] true e;
-fun fork_group group e = future (SOME group) [] true e;
-fun fork_deps deps e = future NONE (map task_of deps) true e;
-fun fork_background e = future NONE [] false e;
+fun fork e = future NONE [] 0 e;
+fun fork_group group e = future (SOME group) [] 0 e;
+fun fork_deps deps e = future NONE (map task_of deps) 0 e;
+fun fork_pri pri e = future NONE [] pri e;
(* join: retrieve results *)
@@ -273,7 +265,7 @@
fun join_loop _ [] = ()
| join_loop name tasks =
(case SYNCHRONIZED name (fn () =>
- change_result queue (TaskQueue.dequeue_towards tasks)) of
+ change_result queue (Task_Queue.dequeue_towards tasks)) of
NONE => ()
| SOME (work, tasks') => (execute name work; join_loop name tasks'));
val _ =
@@ -281,18 +273,18 @@
NONE =>
(*alien thread -- refrain from contending for resources*)
while exists (not o is_finished) xs
- do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
+ do SYNCHRONIZED "join_thread" (fn () => wait ())
| SOME (name, task) =>
(*proper task -- actively work towards results*)
let
val unfinished = xs |> map_filter
(fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
val _ = SYNCHRONIZED "join" (fn () =>
- (change queue (TaskQueue.depend unfinished task); notify_all ()));
+ (change queue (Task_Queue.depend unfinished task); notify_all ()));
val _ = join_loop ("join_loop: " ^ name) unfinished;
val _ =
while exists (not o is_finished) xs
- do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
+ do SYNCHRONIZED "join_task" (fn () => worker_wait ());
in () end);
in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
@@ -300,18 +292,16 @@
fun join_result x = singleton join_results x;
fun join x = Exn.release (join_result x);
-fun map f x = fork_deps [x] (fn () => f (join x));
+fun map f x =
+ let val task = task_of x
+ in future NONE [task] (Task_Queue.pri_of_task task) (fn () => f (join x)) end;
(* misc operations *)
-(*focus: collection of high-priority task*)
-fun focus tasks = SYNCHRONIZED "focus" (fn () =>
- change queue (TaskQueue.focus tasks));
-
(*interrupt: permissive signal, may get ignored*)
fun interrupt_task id = SYNCHRONIZED "interrupt"
- (fn () => TaskQueue.interrupt_external (! queue) id);
+ (fn () => Task_Queue.interrupt_external (! queue) id);
(*cancel: present and future group members will be interrupted eventually*)
fun cancel x =
@@ -324,12 +314,12 @@
if Multithreading.available then
(scheduler_check "shutdown check";
SYNCHRONIZED "shutdown" (fn () =>
- (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
- while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
+ (while not (scheduler_active ()) do wait ();
+ while not (Task_Queue.is_empty (! queue)) do wait ();
do_shutdown := true;
notify_all ();
- while not (null (! workers)) do wait "shutdown: workers";
- while scheduler_active () do wait "shutdown: scheduler still active";
+ while not (null (! workers)) do wait ();
+ while scheduler_active () do wait ();
OS.Process.sleep (Time.fromMilliseconds 300))))
else ();