--- a/src/Pure/Concurrent/future.ML Wed Aug 10 10:59:37 2011 +0200
+++ b/src/Pure/Concurrent/future.ML Wed Aug 10 14:04:45 2011 +0200
@@ -39,6 +39,8 @@
val task_of: 'a future -> Task_Queue.task
val peek: 'a future -> 'a Exn.result option
val is_finished: 'a future -> bool
+ val cancel_group: Task_Queue.group -> unit
+ val cancel: 'a future -> unit
val forks:
{name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
(unit -> 'a) list -> 'a future list
@@ -57,8 +59,6 @@
val fulfill_result: 'a future -> 'a Exn.result -> unit
val fulfill: 'a future -> 'a -> unit
val interruptible_task: ('a -> 'b) -> 'a -> 'b
- val cancel_group: Task_Queue.group -> unit
- val cancel: 'a future -> unit
val shutdown: unit -> unit
val status: (unit -> 'a) -> 'a
end;
@@ -74,8 +74,7 @@
val tag = Universal.tag () : Task_Queue.task option Universal.tag;
in
fun worker_task () = the_default NONE (Thread.getLocal tag);
- fun setmp_worker_task data f x =
- Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
+ fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
end;
val worker_group = Option.map Task_Queue.group_of_task o worker_task;
@@ -107,19 +106,6 @@
fun peek x = Single_Assignment.peek (result_of x);
fun is_finished x = is_some (peek x);
-fun assign_result group result res =
- let
- val _ = Single_Assignment.assign result res
- handle exn as Fail _ =>
- (case Single_Assignment.peek result of
- SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
- | _ => reraise exn);
- val ok =
- (case the (Single_Assignment.peek result) of
- Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
- | Exn.Res _ => true);
- in ok end;
-
(** scheduling **)
@@ -173,23 +159,16 @@
fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
-(* execute future jobs *)
+(* cancellation primitives *)
-fun future_job group (e: unit -> 'a) =
- let
- val result = Single_Assignment.var "future" : 'a result;
- val pos = Position.thread_data ();
- fun job ok =
- let
- val res =
- if ok then
- Exn.capture (fn () =>
- Multithreading.with_attributes Multithreading.private_interrupts
- (fn _ => Position.setmp_thread_data pos e ()) before
- Multithreading.interrupted ()) ()
- else Exn.interrupt_exn;
- in assign_result group result res end;
- in (result, job) end;
+fun interruptible_task f x =
+ if Multithreading.available then
+ Multithreading.with_attributes
+ (if is_some (worker_task ())
+ then Multithreading.private_interrupts
+ else Multithreading.public_interrupts)
+ (fn _ => f x)
+ else interruptible f x;
fun cancel_now group = (*requires SYNCHRONIZED*)
Task_Queue.cancel (! queue) group;
@@ -198,7 +177,10 @@
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);
broadcast scheduler_event);
-fun execute (task, jobs) =
+
+(* worker threads *)
+
+fun worker_exec (task, jobs) =
let
val group = Task_Queue.group_of_task task;
val valid = not (Task_Queue.is_canceled group);
@@ -224,9 +206,6 @@
in () end);
in () end;
-
-(* worker threads *)
-
fun worker_wait active cond = (*requires SYNCHRONIZED*)
let
val state =
@@ -253,7 +232,7 @@
fun worker_loop name =
(case SYNCHRONIZED name (fn () => worker_next ()) of
NONE => ()
- | SOME work => (Exn.capture Multithreading.interrupted (); execute work; worker_loop name));
+ | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));
fun worker_start name = (*requires SYNCHRONIZED*)
Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
@@ -397,6 +376,48 @@
(** futures **)
+(* cancellation *)
+
+(*cancel: present and future group members will be interrupted eventually*)
+fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
+ (if cancel_now group then () else cancel_later group;
+ signal work_available; scheduler_check ()));
+
+fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
+
+
+(* future jobs *)
+
+fun assign_result group result res =
+ let
+ val _ = Single_Assignment.assign result res
+ handle exn as Fail _ =>
+ (case Single_Assignment.peek result of
+ SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
+ | _ => reraise exn);
+ val ok =
+ (case the (Single_Assignment.peek result) of
+ Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
+ | Exn.Res _ => true);
+ in ok end;
+
+fun future_job group (e: unit -> 'a) =
+ let
+ val result = Single_Assignment.var "future" : 'a result;
+ val pos = Position.thread_data ();
+ fun job ok =
+ let
+ val res =
+ if ok then
+ Exn.capture (fn () =>
+ Multithreading.with_attributes Multithreading.private_interrupts
+ (fn _ => Position.setmp_thread_data pos e ()) before
+ Multithreading.interrupted ()) ()
+ else Exn.interrupt_exn;
+ in assign_result group result res end;
+ in (result, job) end;
+
+
(* fork *)
fun forks {name, group, deps, pri} es =
@@ -452,7 +473,8 @@
| (SOME work, deps') => SOME (work, deps'));
fun execute_work NONE = ()
- | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
+ | execute_work (SOME (work, deps')) =
+ (worker_joining (fn () => worker_exec work); join_work deps')
and join_work deps =
Multithreading.with_attributes Multithreading.no_interrupts
(fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
@@ -475,7 +497,7 @@
fun join x = Exn.release (join_result x);
-(* fast-path versions -- bypassing full task management *)
+(* fast-path versions -- bypassing task queue *)
fun value (x: 'a) =
let
@@ -538,7 +560,7 @@
SYNCHRONIZED "fulfill_result" (fn () =>
Unsynchronized.change_result queue
(Task_Queue.dequeue_passive (Thread.self ()) task));
- in if still_passive then execute (task, [job]) else () end);
+ in if still_passive then worker_exec (task, [job]) else () end);
val _ =
if is_some (Single_Assignment.peek result) then ()
else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
@@ -547,25 +569,6 @@
fun fulfill x res = fulfill_result x (Exn.Res res);
-(* cancellation *)
-
-fun interruptible_task f x =
- if Multithreading.available then
- Multithreading.with_attributes
- (if is_some (worker_task ())
- then Multithreading.private_interrupts
- else Multithreading.public_interrupts)
- (fn _ => f x)
- else interruptible f x;
-
-(*cancel: present and future group members will be interrupted eventually*)
-fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
- (if cancel_now group then () else cancel_later group;
- signal work_available; scheduler_check ()));
-
-fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
-
-
(* shutdown *)
fun shutdown () =