added Future.promise/fulfill -- promised futures that are fulfilled by external means;
Future.value: official result assignment -- produces immutable ref;
Future.shutdown: raw_wait keeps raw task attributes, e.g. asynchronous interrupts of toplevel;
Task_Queue: passive tasks track dependencies, but lack any evaluation process;
tuned;
--- a/src/Pure/Concurrent/future.ML Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/future.ML Tue Jan 05 23:38:10 2010 +0100
@@ -23,6 +23,10 @@
of runtime resources is distorted either if workers yield CPU time
(e.g. via system sleep or wait operations), or if non-worker
threads contend for significant runtime resources independently.
+
+ * Promised futures are fulfilled by external means. There is no
+ associated evaluation task, but other futures can depend on them
+ as usual.
*)
signature FUTURE =
@@ -37,7 +41,6 @@
val group_of: 'a future -> group
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val value: 'a -> 'a future
val fork_group: group -> (unit -> 'a) -> 'a future
val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
@@ -46,7 +49,12 @@
val join_results: 'a future list -> 'a Exn.result list
val join_result: 'a future -> 'a Exn.result
val join: 'a future -> 'a
+ val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
+ val promise_group: group -> 'a future
+ val promise: unit -> 'a future
+ val fulfill_result: 'a future -> 'a Exn.result -> unit
+ val fulfill: 'a future -> 'a -> unit
val interruptible_task: ('a -> 'b) -> 'a -> 'b
val cancel_group: group -> unit
val cancel: 'a future -> unit
@@ -75,11 +83,14 @@
val worker_task = Option.map #1 o thread_data;
val worker_group = Option.map #2 o thread_data;
+fun new_group () = Task_Queue.new_group (worker_group ());
+
(* datatype future *)
datatype 'a future = Future of
- {task: task,
+ {promised: bool,
+ task: task,
group: group,
result: 'a Exn.result option Synchronized.var};
@@ -90,10 +101,14 @@
fun peek x = Synchronized.value (result_of x);
fun is_finished x = is_some (peek x);
-fun value x = Future
- {task = Task_Queue.new_task 0,
- group = Task_Queue.new_group NONE,
- result = Synchronized.var "future" (SOME (Exn.Result x))};
+fun assign_result group result res =
+ let
+ val _ = Synchronized.assign result (K (SOME res));
+ val ok =
+ (case res of
+ Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+ | Exn.Result _ => true);
+ in ok end;
@@ -111,6 +126,9 @@
fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
+fun raw_wait cond = (*requires SYNCHRONIZED*)
+ ConditionVar.wait (cond, lock);
+
fun wait cond = (*requires SYNCHRONIZED*)
Multithreading.sync_wait NONE NONE cond lock;
@@ -160,12 +178,7 @@
Exn.capture (fn () =>
Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
else Exn.Exn Exn.Interrupt;
- val _ = Synchronized.assign result (K (SOME res));
- in
- (case res of
- Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
- | Exn.Result _ => true)
- end;
+ in assign_result group result res end;
in (result, job) end;
fun do_cancel group = (*requires SYNCHRONIZED*)
@@ -247,7 +260,7 @@
if tick andalso ! status_ticks = 0 then
Multithreading.tracing 1 (fn () =>
let
- val {ready, pending, running} = Task_Queue.status (! queue);
+ val {ready, pending, running, passive} = Task_Queue.status (! queue);
val total = length (! workers);
val active = count_workers Working;
val waiting = count_workers Waiting;
@@ -255,7 +268,8 @@
"SCHEDULE " ^ Time.toString now ^ ": " ^
string_of_int ready ^ " ready, " ^
string_of_int pending ^ " pending, " ^
- string_of_int running ^ " running; " ^
+ string_of_int running ^ " running, " ^
+ string_of_int passive ^ " passive; " ^
string_of_int total ^ " workers, " ^
string_of_int active ^ " active, " ^
string_of_int waiting ^ " waiting "
@@ -329,7 +343,7 @@
(* shutdown *)
- val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
+ val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
val continue = not (! do_shutdown andalso null (! workers));
val _ = if continue then () else scheduler := NONE;
@@ -364,8 +378,8 @@
let
val group =
(case opt_group of
- SOME group => group
- | NONE => Task_Queue.new_group (worker_group ()));
+ NONE => new_group ()
+ | SOME group => group);
val (result, job) = future_job group e;
val task = SYNCHRONIZED "enqueue" (fn () =>
let
@@ -374,7 +388,7 @@
val _ = if minimal then signal work_available else ();
val _ = scheduler_check ();
in task end);
- in Future {task = task, group = group, result = result} end;
+ in Future {promised = false, task = task, group = group, result = result} end;
fun fork_group group e = fork_future (SOME group) [] 0 e;
fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
@@ -432,7 +446,14 @@
fun join x = Exn.release (join_result x);
-(* map *)
+(* fast-path versions -- bypassing full task management *)
+
+fun value (x: 'a) =
+ let
+ val group = Task_Queue.new_group NONE;
+ val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
+ val _ = assign_result group result (Exn.Result x);
+ in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
fun map_future f x =
let
@@ -445,11 +466,32 @@
SOME queue' => (queue := queue'; true)
| NONE => false));
in
- if extended then Future {task = task, group = group, result = result}
+ if extended then Future {promised = false, task = task, group = group, result = result}
else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
end;
+(* promised futures -- fulfilled by external means *)
+
+fun promise_group group : 'a future =
+ let
+ val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
+ val task = SYNCHRONIZED "enqueue" (fn () =>
+ Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
+ in Future {promised = true, task = task, group = group, result = result} end;
+
+fun promise () = promise_group (new_group ());
+
+fun fulfill_result (Future {promised, task, group, result}) res =
+ let
+ val _ = promised orelse raise Fail "Not a promised future";
+ fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
+ val _ = execute (task, group, [job]);
+ in () end;
+
+fun fulfill x res = fulfill_result x (Exn.Result res);
+
+
(* cancellation *)
fun interruptible_task f x =
@@ -472,7 +514,7 @@
if Multithreading.available then
SYNCHRONIZED "shutdown" (fn () =>
while scheduler_active () do
- (wait scheduler_event; broadcast_work ()))
+ (raw_wait scheduler_event; broadcast_work ()))
else ();
--- a/src/Pure/Concurrent/task_queue.ML Tue Jan 05 18:20:18 2010 +0100
+++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 05 23:38:10 2010 +0100
@@ -7,7 +7,7 @@
signature TASK_QUEUE =
sig
type task
- val new_task: int -> task
+ val dummy_task: task
val pri_of_task: task -> int
val str_of_task: task -> string
type group
@@ -20,10 +20,11 @@
val str_of_group: group -> string
type queue
val empty: queue
- val is_empty: queue -> bool
- val status: queue -> {ready: int, pending: int, running: int}
+ val all_passive: queue -> bool
+ val status: queue -> {ready: int, pending: int, running: int, passive: int}
val cancel: queue -> group -> bool
val cancel_all: queue -> group list
+ val enqueue_passive: group -> queue -> task * queue
val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
val extend: task -> (bool -> bool) -> queue -> queue option
val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
@@ -38,13 +39,14 @@
(* tasks *)
-datatype task = Task of int * serial;
+datatype task = Task of int option * serial;
+val dummy_task = Task (NONE, ~1);
fun new_task pri = Task (pri, serial ());
-fun pri_of_task (Task (pri, _)) = pri;
+fun pri_of_task (Task (pri, _)) = the_default 0 pri;
fun str_of_task (Task (_, i)) = string_of_int i;
-fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
+fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
structure Task_Graph = Graph(type key = task val ord = task_ord);
@@ -91,7 +93,8 @@
datatype job =
Job of (bool -> bool) list |
- Running of Thread.thread;
+ Running of Thread.thread |
+ Passive;
type jobs = (group * job) Task_Graph.T;
@@ -123,20 +126,24 @@
fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
-fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
+
+fun all_passive (Queue {jobs, ...}) =
+ Task_Graph.get_first NONE
+ ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
(* queue status *)
fun status (Queue {jobs, ...}) =
let
- val (x, y, z) =
- Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
+ val (x, y, z, w) =
+ Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
(case job of
- Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
- | Running _ => (x, y, z + 1)))
- jobs (0, 0, 0);
- in {ready = x, pending = y, running = z} end;
+ Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
+ | Running _ => (x, y, z + 1, w)
+ | Passive => (x, y, z, w + 1)))
+ jobs (0, 0, 0, 0);
+ in {ready = x, pending = y, running = z, passive = w} end;
(* cancel -- peers and sub-groups *)
@@ -154,18 +161,27 @@
let
fun cancel_job (group, job) (groups, running) =
(cancel_group group Exn.Interrupt;
- (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
+ (case job of
+ Running t => (insert eq_group group groups, insert Thread.equal t running)
| _ => (groups, running)));
- val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
+ val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
val _ = List.app SimpleThread.interrupt running;
- in groups end;
+ in running_groups end;
(* enqueue *)
+fun enqueue_passive group (Queue {groups, jobs, cache}) =
+ let
+ val task = new_task NONE;
+ val groups' = groups
+ |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
+ val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
+ in (task, make_queue groups' jobs' cache) end;
+
fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
let
- val task = new_task pri;
+ val task = new_task (SOME pri);
val groups' = groups
|> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
val jobs' = jobs