--- a/src/Pure/Concurrent/future.ML Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Jan 05 23:38:10 2010 +0100
@@ -23,6 +23,10 @@
of runtime resources is distorted either if workers yield CPU time
(e.g. via system sleep or wait operations), or if non-worker
threads contend for significant runtime resources independently.
+
+ * Promised futures are fulfilled by external means. There is no
+ associated evaluation task, but other futures can depend on them
+ as usual.
*)
signature FUTURE =
@@ -37,7 +41,6 @@
val group_of: 'a future -> group
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val value: 'a -> 'a future
val fork_group: group -> (unit -> 'a) -> 'a future
val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
@@ -46,7 +49,12 @@
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
val join: 'a future -> 'a
+ val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
+ val promise_group: group -> 'a future
+ val promise: unit -> 'a future
+ val fulfill_result: 'a future -> 'a Exn.result -> unit
+ val fulfill: 'a future -> 'a -> unit
val interruptible_task: ('a -> 'b) -> 'a -> 'b
val cancel_group: group -> unit
val cancel: 'a future -> unit
@@ -75,11 +83,14 @@
val worker_task = Option.map #1 o thread_data;
val worker_group = Option.map #2 o thread_data;
+fun new_group () = Task_Queue.new_group (worker_group ());
+
(* datatype future *)
datatype 'a future = Future of
- {task: task,
+ {promised: bool,
+ task: task,
group: group,
result: 'a Exn.result option Synchronized.var};
@@ -90,10 +101,14 @@
fun peek x = Synchronized.value (result_of x);
fun is_finished x = is_some (peek x);
-fun value x = Future
- {task = Task_Queue.new_task 0,
- group = Task_Queue.new_group NONE,
- result = Synchronized.var "future" (SOME (Exn.Result x))};
+fun assign_result group result res =
+ let
+ val _ = Synchronized.assign result (K (SOME res));
+ val ok =
+ (case res of
+ Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+ | Exn.Result _ => true);
+ in ok end;
@@ -111,6 +126,9 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
+fun raw_wait cond = (*requires SYNCHRONIZED*)
+ ConditionVar.wait (cond, lock);
+
fun wait cond = (*requires SYNCHRONIZED*)
Multithreading.sync_wait NONE NONE cond lock;
@@ -160,12 +178,7 @@
Exn.capture (fn () =>
Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
else Exn.Exn Exn.Interrupt;
- val _ = Synchronized.assign result (K (SOME res));
- in
- (case res of
- Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
- | Exn.Result _ => true)
- end;
+ in assign_result group result res end;
in (result, job) end;
fun do_cancel group = (*requires SYNCHRONIZED*)
@@ -247,7 +260,7 @@
if tick andalso ! status_ticks = 0 then
Multithreading.tracing 1 (fn () =>
let
- val {ready, pending, running} = Task_Queue.status (! queue);
+ val {ready, pending, running, passive} = Task_Queue.status (! queue);
val total = length (! workers);
val active = count_workers Working;
val waiting = count_workers Waiting;
@@ -255,7 +268,8 @@
"SCHEDULE " ^ Time.toString now ^ ": " ^
string_of_int ready ^ " ready, " ^
string_of_int pending ^ " pending, " ^
- string_of_int running ^ " running; " ^
+ string_of_int running ^ " running, " ^
+ string_of_int passive ^ " passive; " ^
string_of_int total ^ " workers, " ^
string_of_int active ^ " active, " ^
string_of_int waiting ^ " waiting "
@@ -329,7 +343,7 @@
(* shutdown *)
- val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
+ val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
@@ -364,8 +378,8 @@
let
val group =
(case opt_group of
- SOME group => group
- | NONE => Task_Queue.new_group (worker_group ()));
+ NONE => new_group ()
+ | SOME group => group);
val (result, job) = future_job group e;
val task = SYNCHRONIZED "enqueue" (fn () =>
let
@@ -374,7 +388,7 @@
val _ = if minimal then signal work_available else ();
val _ = scheduler_check ();
in task end);
- in Future {task = task, group = group, result = result} end;
+ in Future {promised = false, task = task, group = group, result = result} end;
fun fork_group group e = fork_future (SOME group) [] 0 e;
fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
@@ -432,7 +446,14 @@
fun join x = Exn.release (join_result x);
-(* map *)
+(* fast-path versions -- bypassing full task management *)
+
+fun value (x: 'a) =
+ let
+ val group = Task_Queue.new_group NONE;
+ val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
+ val _ = assign_result group result (Exn.Result x);
+ in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
fun map_future f x =
let
@@ -445,11 +466,32 @@
SOME queue' => (queue := queue'; true)
| NONE => false));
in
- if extended then Future {task = task, group = group, result = result}
+ if extended then Future {promised = false, task = task, group = group, result = result}
else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
end;
+(* promised futures -- fulfilled by external means *)
+
+fun promise_group group : 'a future =
+ let
+ val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
+ val task = SYNCHRONIZED "enqueue" (fn () =>
+ Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
+ in Future {promised = true, task = task, group = group, result = result} end;
+
+fun promise () = promise_group (new_group ());
+
+fun fulfill_result (Future {promised, task, group, result}) res =
+ let
+ val _ = promised orelse raise Fail "Not a promised future";
+ fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
+ val _ = execute (task, group, [job]);
+ in () end;
+
+fun fulfill x res = fulfill_result x (Exn.Result res);
+
+
(* cancellation *)
fun interruptible_task f x =
@@ -472,7 +514,7 @@
if Multithreading.available then
SYNCHRONIZED "shutdown" (fn () =>
while scheduler_active () do
- (wait scheduler_event; broadcast_work ()))
+ (raw_wait scheduler_event; broadcast_work ()))
else ();