--- a/src/Pure/Concurrent/future.ML Wed Aug 10 15:17:24 2011 +0200
+++ b/src/Pure/Concurrent/future.ML Wed Aug 10 16:05:14 2011 +0200
@@ -41,9 +41,10 @@
val is_finished: 'a future -> bool
val cancel_group: Task_Queue.group -> unit
val cancel: 'a future -> unit
- val forks:
- {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
- (unit -> 'a) list -> 'a future list
+ type fork_params =
+ {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
+ pri: int, interrupts: bool}
+ val forks: fork_params -> (unit -> 'a) list -> 'a future list
val fork_pri: int -> (unit -> 'a) -> 'a future
val fork: (unit -> 'a) -> 'a future
val join_results: 'a future list -> 'a Exn.result list
@@ -51,9 +52,7 @@
val join: 'a future -> 'a
val value: 'a -> 'a future
val map: ('a -> 'b) -> 'a future -> 'b future
- val cond_forks:
- {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
- (unit -> 'a) list -> 'a future list
+ val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
val promise_group: Task_Queue.group -> 'a future
val promise: unit -> 'a future
val fulfill_result: 'a future -> 'a Exn.result -> unit
@@ -403,7 +402,7 @@
| Exn.Res _ => true);
in ok end;
-fun future_job group (e: unit -> 'a) =
+fun future_job group interrupts (e: unit -> 'a) =
let
val result = Single_Assignment.var "future" : 'a result;
val pos = Position.thread_data ();
@@ -412,7 +411,9 @@
val res =
if ok then
Exn.capture (fn () =>
- Multithreading.with_attributes Multithreading.private_interrupts
+ Multithreading.with_attributes
+ (if interrupts
+ then Multithreading.private_interrupts else Multithreading.no_interrupts)
(fn _ => Position.setmp_thread_data pos e ()) before
Multithreading.interrupted ()) ()
else Exn.interrupt_exn;
@@ -422,7 +423,11 @@
(* fork *)
-fun forks {name, group, deps, pri} es =
+type fork_params =
+ {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
+ pri: int, interrupts: bool};
+
+fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
if null es then []
else
let
@@ -432,7 +437,7 @@
| SOME grp => grp);
fun enqueue e queue =
let
- val (result, job) = future_job grp e;
+ val (result, job) = future_job grp interrupts e;
val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
val future = Future {promised = false, task = task, result = result};
in (future, queue') end;
@@ -447,7 +452,9 @@
in futures end)
end;
-fun fork_pri pri e = singleton (forks {name = "", group = NONE, deps = [], pri = pri}) e;
+fun fork_pri pri e =
+ singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
+
fun fork e = fork_pri 0 e;
@@ -513,7 +520,7 @@
let
val task = task_of x;
val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
- val (result, job) = future_job group (fn () => f (join x));
+ val (result, job) = future_job group true (fn () => f (join x));
val extended = SYNCHRONIZED "extend" (fn () =>
(case Task_Queue.extend task job (! queue) of
@@ -523,8 +530,8 @@
if extended then Future {promised = false, task = task, result = result}
else
singleton
- (forks {name = "Future.map", group = SOME group,
- deps = [task], pri = Task_Queue.pri_of_task task})
+ (forks {name = "Future.map", group = SOME group, deps = [task],
+ pri = Task_Queue.pri_of_task task, interrupts = true})
(fn () => f (join x))
end;