--- a/src/Pure/Concurrent/future.ML Wed Feb 02 13:44:40 2011 +0100
+++ b/src/Pure/Concurrent/future.ML Wed Feb 02 15:04:09 2011 +0100
@@ -32,19 +32,16 @@
signature FUTURE =
sig
- type task = Task_Queue.task
- type group = Task_Queue.group
- val is_worker: unit -> bool
val worker_task: unit -> Task_Queue.task option
val worker_group: unit -> Task_Queue.group option
val worker_subgroup: unit -> Task_Queue.group
type 'a future
- val task_of: 'a future -> task
- val group_of: 'a future -> group
+ val task_of: 'a future -> Task_Queue.task
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
- val forks: {name: string, group: group option, deps: task list, pri: int} ->
- (unit -> 'a) list -> 'a future list
+ val forks:
+ {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
+ (unit -> 'a) list -> 'a future list
val fork_pri: int -> (unit -> 'a) -> 'a future
val fork: (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
@@ -52,12 +49,12 @@
val join: 'a future -> 'a
val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
- val promise_group: group -> 'a future
+ val promise_group: Task_Queue.group -> 'a future
val promise: unit -> 'a future
val fulfill_result: 'a future -> 'a Exn.result -> unit
val fulfill: 'a future -> 'a -> unit
val interruptible_task: ('a -> 'b) -> 'a -> 'b
- val cancel_group: group -> unit
+ val cancel_group: Task_Queue.group -> unit
val cancel: 'a future -> unit
val shutdown: unit -> unit
val status: (unit -> 'a) -> 'a
@@ -70,20 +67,15 @@
(* identifiers *)
-type task = Task_Queue.task;
-type group = Task_Queue.group;
-
local
- val tag = Universal.tag () : (task * group) option Universal.tag;
+ val tag = Universal.tag () : Task_Queue.task option Universal.tag;
in
- fun thread_data () = the_default NONE (Thread.getLocal tag);
- fun setmp_thread_data data f x =
- Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
+ fun worker_task () = the_default NONE (Thread.getLocal tag);
+ fun setmp_worker_task data f x =
+ Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
end;
-val is_worker = is_some o thread_data;
-val worker_task = Option.map #1 o thread_data;
-val worker_group = Option.map #2 o thread_data;
+val worker_group = Option.map Task_Queue.group_of_task o worker_task;
fun worker_subgroup () = Task_Queue.new_group (worker_group ());
fun worker_joining e =
@@ -103,12 +95,10 @@
datatype 'a future = Future of
{promised: bool,
- task: task,
- group: group,
+ task: Task_Queue.task,
result: 'a result};
fun task_of (Future {task, ...}) = task;
-fun group_of (Future {group, ...}) = group;
fun result_of (Future {result, ...}) = result;
fun peek x = Single_Assignment.peek (result_of x);
@@ -204,12 +194,13 @@
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);
broadcast scheduler_event);
-fun execute (task, group, jobs) =
+fun execute (task, jobs) =
let
+ val group = Task_Queue.group_of_task task;
val valid = not (Task_Queue.is_canceled group);
val ok =
Task_Queue.running task (fn () =>
- setmp_thread_data (task, group) (fn () =>
+ setmp_worker_task task (fn () =>
fold (fn job => fn ok => job valid andalso ok) jobs true) ());
val _ = Multithreading.tracing 1 (fn () =>
let
@@ -416,7 +407,7 @@
let
val (result, job) = future_job grp e;
val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
- val future = Future {promised = false, task = task, group = grp, result = result};
+ val future = Future {promised = false, task = task, result = result};
in (future, (minimal orelse minimal', queue')) end;
in
SYNCHRONIZED "enqueue" (fn () =>
@@ -443,7 +434,7 @@
NONE => Exn.Exn (Fail "Unfinished future")
| SOME res =>
if Exn.is_interrupt_exn res then
- (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
+ (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
[] => res
| exns => Exn.Exn (Exn.EXCEPTIONS exns))
else res);
@@ -470,7 +461,7 @@
if forall is_finished xs then ()
else if Multithreading.self_critical () then
error "Cannot join future values within critical section"
- else if is_some (thread_data ()) then
+ else if is_some (worker_task ()) then
join_work (Task_Queue.init_deps (map task_of xs))
else List.app (ignore o Single_Assignment.await o result_of) xs;
in map get_result xs end;
@@ -485,15 +476,16 @@
fun value (x: 'a) =
let
- val group = Task_Queue.new_group NONE;
+ val task = Task_Queue.dummy_task ();
+ val group = Task_Queue.group_of_task task;
val result = Single_Assignment.var "value" : 'a result;
val _ = assign_result group result (Exn.Result x);
- in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
+ in Future {promised = false, task = task, result = result} end;
fun map_future f x =
let
val task = task_of x;
- val group = Task_Queue.new_group (SOME (group_of x));
+ val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
val (result, job) = future_job group (fn () => f (join x));
val extended = SYNCHRONIZED "extend" (fn () =>
@@ -501,7 +493,7 @@
SOME queue' => (queue := queue'; true)
| NONE => false));
in
- if extended then Future {promised = false, task = task, group = group, result = result}
+ if extended then Future {promised = false, task = task, result = result}
else
singleton
(forks {name = "Future.map", group = SOME group,
@@ -522,14 +514,15 @@
else reraise exn;
val task = SYNCHRONIZED "enqueue_passive" (fn () =>
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
- in Future {promised = true, task = task, group = group, result = result} end;
+ in Future {promised = true, task = task, result = result} end;
fun promise () = promise_group (worker_subgroup ());
-fun fulfill_result (Future {promised, task, group, result}) res =
+fun fulfill_result (Future {promised, task, result}) res =
if not promised then raise Fail "Not a promised future"
else
let
+ val group = Task_Queue.group_of_task task;
fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
val _ =
Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
@@ -538,7 +531,7 @@
SYNCHRONIZED "fulfill_result" (fn () =>
Unsynchronized.change_result queue
(Task_Queue.dequeue_passive (Thread.self ()) task));
- in if still_passive then execute (task, group, [job]) else () end);
+ in if still_passive then execute (task, [job]) else () end);
val _ =
worker_waiting (Task_Queue.init_deps [task])
(fn () => Single_Assignment.await result);
@@ -552,7 +545,7 @@
fun interruptible_task f x =
if Multithreading.available then
Multithreading.with_attributes
- (if is_worker ()
+ (if is_some (worker_task ())
then Multithreading.private_interrupts
else Multithreading.public_interrupts)
(fn _ => f x)
@@ -563,7 +556,7 @@
(if cancel_now group then () else cancel_later group;
signal work_available; scheduler_check ()));
-fun cancel x = cancel_group (group_of x);
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
(* shutdown *)