| author | wenzelm | 
| Tue, 21 Jul 2009 11:30:12 +0200 | |
| changeset 32095 | ad4be204fdfe | 
| parent 32058 | c76fd93b3b99 | 
| child 32096 | cb9adb13f892 | 
| permissions | -rw-r--r-- | 
| 28156 | 1  | 
(* Title: Pure/Concurrent/future.ML  | 
2  | 
Author: Makarius  | 
|
3  | 
||
| 28201 | 4  | 
Future values.  | 
5  | 
||
6  | 
Notes:  | 
|
7  | 
||
8  | 
* Futures are similar to delayed evaluation, i.e. delay/force is  | 
|
9  | 
generalized to fork/join (and variants). The idea is to model  | 
|
10  | 
parallel value-oriented computations, but *not* communicating  | 
|
11  | 
processes.  | 
|
12  | 
||
13  | 
* Futures are grouped; failure of one group member causes the whole  | 
|
14  | 
group to be interrupted eventually.  | 
|
15  | 
||
16  | 
* Forked futures are evaluated spontaneously by a farm of worker  | 
|
17  | 
threads in the background; join resynchronizes the computation and  | 
|
18  | 
delivers results (values or exceptions).  | 
|
19  | 
||
20  | 
* The pool of worker threads is limited, usually in correlation with  | 
|
21  | 
the number of physical cores on the machine. Note that allocation  | 
|
22  | 
of runtime resources is distorted either if workers yield CPU time  | 
|
23  | 
(e.g. via system sleep or wait operations), or if non-worker  | 
|
24  | 
threads contend for significant runtime resources independently.  | 
|
| 28156 | 25  | 
*)  | 
26  | 
||
27  | 
signature FUTURE =
sig
  (*availability and identifiers*)
  val enabled: unit -> bool
  type task = Task_Queue.task
  type group = Task_Queue.group
  val is_worker: unit -> bool

  (*future values and their components*)
  type 'a future
  val task_of: 'a future -> task
  val group_of: 'a future -> group
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool
  val value: 'a -> 'a future

  (*fork variants*)
  val fork: (unit -> 'a) -> 'a future
  val fork_group: group -> (unit -> 'a) -> 'a future
  val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
  val fork_pri: int -> (unit -> 'a) -> 'a future
  val fork_local: int -> (unit -> 'a) -> 'a future

  (*join variants and map*)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val join: 'a future -> 'a
  val map: ('a -> 'b) -> 'a future -> 'b future

  (*interrupts, cancellation, shutdown*)
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val interrupt_task: string -> unit
  val cancel_group: group -> unit
  val cancel: 'a future -> unit
  val shutdown: unit -> unit
end;
54  | 
||
55  | 
structure Future: FUTURE =  | 
|
56  | 
struct  | 
|
57  | 
||
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
58  | 
(** future values **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
59  | 
|
| 28645 | 60  | 
(*true iff multithreading is enabled and the calling thread is not
  inside a critical section*)
fun enabled () =
  if Multithreading.enabled ()
  then not (Multithreading.self_critical ())
  else false;
63  | 
||
64  | 
||
| 28167 | 65  | 
(* identifiers *)  | 
66  | 
||
| 29119 | 67  | 
type task = Task_Queue.task;  | 
68  | 
type group = Task_Queue.group;  | 
|
| 28167 | 69  | 
|
| 32058 | 70  | 
local
  (*thread-local slot: (name, task, group) of the job run by this thread*)
  val worker_tag = Universal.tag () : (string * task * group) option Universal.tag;
in
  (*data of the current thread; NONE if nothing has been stored*)
  fun thread_data () =
    (case Thread.getLocal worker_tag of
      SOME data => data
    | NONE => NONE);

  (*apply f to x with the thread data temporarily set to SOME data*)
  fun setmp_thread_data data f x =
    let val previous = thread_data ()
    in Library.setmp_thread_data worker_tag previous (SOME data) f x end;
end;
77  | 
||
| 32058 | 78  | 
(*the current thread carries future thread data, i.e. it is executing a job*)
fun is_worker () = is_some (thread_data ());
79  | 
||
| 28167 | 80  | 
|
81  | 
(* datatype future *)  | 
|
82  | 
||
| 28972 | 83  | 
(*a future: its task, its group, and a mutable slot that holds the
  result once it is available*)
datatype 'a future =
  Future of
   {task: task,
    group: group,
    result: 'a Exn.result option ref};
87  | 
||
88  | 
(*task component of a future*)
fun task_of (Future fields) = #task fields;
 | 
|
89  | 
(*group component of a future*)
fun group_of (Future fields) = #group fields;
 | 
|
90  | 
||
| 28558 | 91  | 
(*current content of the result slot, without waiting*)
fun peek (Future fields) = ! (#result fields);
 | 
92  | 
(*a future is finished once its result slot is filled*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
|
| 28320 | 93  | 
|
| 28997 | 94  | 
(*already finished future holding x, with a fresh task and a fresh group*)
fun value x =
  let
    val task = Task_Queue.new_task 0;
    val group = Task_Queue.new_group ();
    val result = ref (SOME (Exn.Result x));
  in Future {task = task, group = group, result = result} end;
98  | 
||
| 28167 | 99  | 
|
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
100  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
101  | 
(** scheduling **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
102  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
103  | 
(* global state *)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
104  | 
|
| 29119 | 105  | 
(*the task queue shared by scheduler and workers*)
val queue = ref Task_Queue.empty;

(*counter used by the scheduler to number newly started workers*)
val next = ref 0;

(*known worker threads, each paired with its "active" flag*)
val workers = ref ([]: (Thread.thread * bool) list);

(*the scheduler thread, if any*)
val scheduler = ref (NONE: Thread.thread option);

(*surplus of workers as computed by the scheduler; a worker that sees a
  positive value decrements it and terminates*)
val excessive = ref 0;

(*groups registered by do_cancel; the scheduler retries Task_Queue.cancel on them*)
val canceled = ref ([]: Task_Queue.group list);

(*set by shutdown; reset when a new scheduler thread is forked*)
val do_shutdown = ref false;
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
112  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
113  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
114  | 
(* synchronization *)  | 
| 28156 | 115  | 
|
116  | 
local
  (*single lock and condition variable used for all scheduling state*)
  val lock = Mutex.mutex ();
  val cond = ConditionVar.conditionVar ();
in

(*run e via SimpleThread.synchronized on the global lock*)
fun SYNCHRONIZED name e = SimpleThread.synchronized name lock e;

(*wait on the global condition variable*)
fun wait () = (*requires SYNCHRONIZED*)
  ConditionVar.wait (cond, lock);

(*wait on the global condition variable for at most the given time span;
  the result of waitUntil is discarded*)
fun wait_timeout timeout = (*requires SYNCHRONIZED*)
  let
    val deadline = Time.+ (Time.now (), timeout);
    val _ = ConditionVar.waitUntil (cond, lock, deadline);
  in () end;

(*wake up all threads waiting on the global condition variable*)
fun notify_all () = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;
|
133  | 
||
134  | 
||
| 28382 | 135  | 
(* worker activity *)  | 
136  | 
||
| 32095 | 137  | 
(*number of entries whose "active" flag is set*)
fun count_active ws =
  length (List.filter (fn (_, active) => active) ws);
|
139  | 
||
140  | 
(*trace message (level 1): total and active number of workers*)
fun trace_active () =
  let
    fun message () =
      let val ws = ! workers in
        "SCHEDULE: " ^ string_of_int (length ws) ^ " workers, " ^
          string_of_int (count_active ws) ^ " active"
      end;
  in Multithreading.tracing 1 message end;
|
| 28382 | 146  | 
|
147  | 
(*record the "active" flag of the calling thread in the workers table*)
fun change_active active = (*requires SYNCHRONIZED*)
  let val entry = (Thread.self (), active)
  in workers := AList.update Thread.equal entry (! workers) end;
|
149  | 
||
| 32095 | 150  | 
(*more workers are active than max_threads_value allows*)
fun overloaded () =
  let
    val active = count_active (! workers);
    val limit = Multithreading.max_threads_value ();
  in active > limit end;
|
152  | 
||
| 28382 | 153  | 
|
| 29366 | 154  | 
(* execute jobs *)  | 
| 28156 | 155  | 
|
| 
29341
 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 
wenzelm 
parents: 
29119 
diff
changeset
 | 
156  | 
(*register group in the list of canceled groups (no duplicates)*)
fun do_cancel group = (*requires SYNCHRONIZED*)
  canceled := insert Task_Queue.eq_group group (! canceled);
| 
 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 
wenzelm 
parents: 
29119 
diff
changeset
 | 
158  | 
|
| 29366 | 159  | 
(*Run all jobs of a dequeued task in the current thread, then mark the
  task as finished.  Thread data is set to (name, task, group) while the
  jobs run.  Every job is run, even after an earlier one reported failure.
  If some job failed, the group is canceled via Task_Queue.cancel; when
  that returns false the group is handed to do_cancel instead.*)
fun execute name (task, group, jobs) =
  let
    val _ = trace_active ();
    val valid = Task_Queue.is_valid group;

    fun run_jobs () =
      fold (fn job => fn ok => job valid andalso ok) jobs true;
    val ok = setmp_thread_data (name, task, group) run_jobs ();

    fun finish () =
     (change queue (Task_Queue.finish task);
      if ok orelse Task_Queue.cancel (! queue) group then ()
      else do_cancel group;
      notify_all ());
  in SYNCHRONIZED "execute" finish end;
172  | 
||
173  | 
||
174  | 
(* worker threads *)  | 
|
175  | 
||
| 29119 | 176  | 
(*wait on the global condition, flagged as inactive for the duration*)
fun worker_wait () = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = wait ();
  in change_active true end;
|
| 28162 | 178  | 
|
| 29119 | 179  | 
(*Next piece of work for the calling worker.  Result NONE means the
  worker has been removed from the pool (excessive workers) and should
  stop; otherwise wait until the pool is not overloaded and the queue
  delivers some work.*)
fun worker_next () = (*requires SYNCHRONIZED*)
  let
    fun retry () = (worker_wait (); worker_next ());

    (*take this thread out of the workers table*)
    fun retire () =
      let
        val self = Thread.self ();
        val _ = dec excessive;
        val _ = change workers (filter_out (fn (thread, _) => Thread.equal (thread, self)));
        val _ = notify_all ();
      in NONE end;
  in
    if ! excessive > 0 then retire ()
    else if overloaded () then retry ()
    else
      (case change_result queue Task_Queue.dequeue of
        NONE => retry ()
      | work => work)
  end;
| 28156 | 190  | 
|
| 28167 | 191  | 
(*main loop of a worker: fetch work and execute it until worker_next
  yields NONE*)
fun worker_loop name =
  let
    fun loop () =
      (case SYNCHRONIZED name worker_next of
        SOME work => (execute name work; loop ())
      | NONE => ());
  in loop () end;
| 28156 | 195  | 
|
| 28167 | 196  | 
(*fork a new worker thread and register it as active*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let val thread = SimpleThread.fork false (fn () => worker_loop name)
  in workers := (thread, true) :: ! workers end;
| 28156 | 198  | 
|
199  | 
||
200  | 
(* scheduler *)  | 
|
201  | 
||
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
202  | 
(*One round of the scheduler.  Traces the queue status, disposes dead
  worker threads, adjusts the size of the worker pool, retries pending
  group cancellations, and then waits on the global condition with a
  timeout.  Result: whether the scheduler loop should continue.

  The worker count is taken from the workers table AFTER dead threads
  have been disposed.  Using a snapshot from before the disposal would
  count dead threads as live ones in this round: replacements would not
  be started, "excessive" would be too large, and the shutdown condition
  (no workers left) would be recognized one round late.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    (*queue status*)
    val _ = Multithreading.tracing 1 (fn () =>
      let val {ready, pending, running} = Task_Queue.status (! queue) in
        "SCHEDULE: " ^
          string_of_int ready ^ " ready, " ^
          string_of_int pending ^ " pending, " ^
          string_of_int running ^ " running"
      end);

    (*worker threads: dispose dead ones*)
    val _ =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        (case List.partition (Thread.isActive o #1) (! workers) of
          (_, []) => ()
        | (alive, dead) =>
            (workers := alive; Multithreading.tracing 0 (fn () =>
              "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
    val _ = trace_active ();

    (*pool size: 3/2 of the thread limit, or 0 during shutdown*)
    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val mm = (m * 3) div 2;
    val l = length (! workers);  (*after disposal of dead workers*)
    val _ = excessive := l - mm;
    val _ =
      if mm > l then
        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
      else ();

    (*canceled groups: keep those for which Task_Queue.cancel still yields false*)
    val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));

    (*shutdown: stop once it was requested and no workers are left*)
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else scheduler := NONE;

    (*wait: 1000ms normally, 50ms while canceled groups are pending;
      on Interrupt every group returned by Task_Queue.cancel_all is
      registered as canceled*)
    val _ = notify_all ();
    val _ = interruptible (fn () =>
        wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
  in continue end;
| 
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
246  | 
|
| 
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
247  | 
(*repeat scheduler rounds until scheduler_next yields false*)
fun scheduler_loop () =
  if SYNCHRONIZED "scheduler" scheduler_next then scheduler_loop ()
  else ();
| 28156 | 249  | 
|
| 28203 | 250  | 
(*a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  the_default false (Option.map Thread.isActive (! scheduler));
|
252  | 
||
| 28464 | 253  | 
(*Ensure that a scheduler is running: fork a fresh scheduler thread
  (resetting the shutdown flag) if none is active; fail with an error if
  the active scheduler is in the middle of a shutdown.*)
fun scheduler_check name =
  let
    fun check () =
      if scheduler_active () then
        (if ! do_shutdown then error "Scheduler shutdown in progress" else ())
      else
       (do_shutdown := false;
        scheduler := SOME (SimpleThread.fork false scheduler_loop));
  in SYNCHRONIZED name check end;
| 28156 | 258  | 
|
259  | 
||
| 29366 | 260  | 
|
261  | 
(** futures **)  | 
|
| 28156 | 262  | 
|
| 29366 | 263  | 
(* future job: fill result *)  | 
264  | 
||
265  | 
(*Produce a fresh result slot together with the job that fills it.
  The job runs with restricted interrupts.  Its argument tells whether
  evaluation should actually happen: if not, the result is an Interrupt
  exception.  The job returns true for a regular result; an Interrupt
  result invalidates the group but still counts as true; any other
  exception counts as false.*)
fun future_job group (e: unit -> 'a) =
  let
    val result = ref (NONE: 'a Exn.result option);

    fun run _ ok =
      let
        val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
        val _ = result := SOME res;
      in
        (case res of
          Exn.Exn Exn.Interrupt => (Task_Queue.invalidate_group group; true)
        | Exn.Exn _ => false
        | Exn.Result _ => true)
      end;

    val job =
      Multithreading.with_attributes Multithreading.restricted_interrupts run;
  in (result, job) end;
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
280  | 
|
| 29366 | 281  | 
|
282  | 
(* fork *)  | 
|
283  | 
||
284  | 
(*General fork: enqueue the evaluation of e with the given dependencies
  and priority.  A fresh group is created unless one is given.*)
fun fork_future opt_group deps pri e =
  let
    val _ = scheduler_check "future check";

    val group =
      (case opt_group of
        NONE => Task_Queue.new_group ()
      | SOME given => given);
    val (result, job) = future_job group e;

    fun enqueue () =
      let
        val task = change_result queue (Task_Queue.enqueue group deps pri job);
        val _ = notify_all ();
      in task end;
    val task = SYNCHRONIZED "future" enqueue;
  in Future {task = task, group = group, result = result} end;
 | 
| 28162 | 293  | 
|
| 29366 | 294  | 
(*plain fork: new group, no dependencies, priority 0*)
fun fork e = fork_future NONE [] 0 e;

(*fork within the given group*)
fun fork_group group e = fork_future (SOME group) [] 0 e;

(*fork that depends on the tasks of the given futures*)
fun fork_deps deps e =
  let val tasks = map task_of deps
  in fork_future NONE tasks 0 e end;

(*fork with explicit priority*)
fun fork_pri pri e = fork_future NONE [] pri e;

(*fork with explicit priority, re-using the group found in the thread
  data of the current thread (if any)*)
fun fork_local pri e =
  let
    val opt_group =
      (case thread_data () of
        SOME (_, _, group) => SOME group
      | NONE => NONE);
  in fork_future opt_group [] pri e end;
| 28186 | 299  | 
|
300  | 
||
| 29366 | 301  | 
(* join *)  | 
302  | 
||
| 
29551
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
303  | 
local

(*result of a future; an unfilled slot yields a SYS_ERROR exception result*)
fun get_result x =
  (case peek x of
    SOME res => res
  | NONE => Exn.Exn (SYS_ERROR "unfinished future"));

(*dequeue work towards the given dependencies, waiting while overloaded*)
fun join_next deps = (*requires SYNCHRONIZED*)
  if overloaded () then (worker_wait (); join_next deps)
  else change_result queue (Task_Queue.dequeue_towards deps);

(*execute work towards the given dependencies in the current thread,
  until the queue delivers nothing more*)
fun join_deps deps =
  let
    fun next () = join_next deps;
  in
    (case SYNCHRONIZED "join" next of
      SOME (work, deps') => (execute "join" work; join_deps deps')
    | NONE => ())
  end;

in

(*Join a list of futures, delivering their results in order.  Finished
  futures are returned immediately.  Otherwise, a worker thread first
  helps to execute the dependencies itself; then each future is awaited
  in turn.  Joining is rejected inside a critical section.*)
fun join_results xs =
  if forall is_finished xs then map get_result xs
  else uninterruptible (fn _ => fn () =>
    let
      val _ = scheduler_check "join check";
      val _ =
        if Multithreading.self_critical ()
        then error "Cannot join future values within critical section"
        else ();

      val worker = is_worker ();

      (*wait for a single future; workers are flagged inactive meanwhile*)
      fun join_wait x =
        let
          fun finished_or_wait () =
            if is_finished x then true
            else ((if worker then worker_wait () else wait ()); false);
        in
          if SYNCHRONIZED "join_wait" finished_or_wait then ()
          else join_wait x
        end;

      val _ = if worker then join_deps (map task_of xs) else ();
      val _ = List.app join_wait xs;

    in map get_result xs end) ();

end;
| 
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
338  | 
|
| 
28647
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
339  | 
(*join a single future, delivering its result (value or exception)*)
fun join_result x = x |> singleton join_results;
| 
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
340  | 
(*join a single future and release its result via Exn.release*)
fun join x = x |> join_result |> Exn.release;
| 28156 | 341  | 
|
| 29366 | 342  | 
|
343  | 
(* map *)  | 
|
344  | 
||
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
345  | 
(*Apply f to the eventual value of future x.  First try to extend the
  existing task of x by an additional job (Task_Queue.extend); if that
  is not possible, fork a separate future that depends on that task and
  has the same priority.*)
fun map_future f x =
  let
    val _ = scheduler_check "map_future check";

    fun body () = f (join x);

    val task = task_of x;
    val group = Task_Queue.new_group ();
    val (result, job) = future_job group body;

    fun extend () =
      (case Task_Queue.extend task job (! queue) of
        NONE => false
      | SOME queue' => (queue := queue'; true));
  in
    if SYNCHRONIZED "map_future" extend
    then Future {task = task, group = group, result = result}
    else fork_future NONE [task] (Task_Queue.pri_of_task task) body
  end;
| 
28979
 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 
wenzelm 
parents: 
28972 
diff
changeset
 | 
361  | 
|
| 28191 | 362  | 
|
| 29431 | 363  | 
(* cancellation *)  | 
| 
28202
 
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
 
wenzelm 
parents: 
28201 
diff
changeset
 | 
364  | 
|
| 
30618
 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 
wenzelm 
parents: 
30612 
diff
changeset
 | 
365  | 
(*Apply f to x with interrupts enabled.  With multithreading available,
  a worker thread uses restricted interrupts and any other thread uses
  regular interrupts; otherwise this is plain "interruptible".*)
fun interruptible_task f x =
  if not Multithreading.available then interruptible f x
  else
    let
      val attributes =
        if is_worker ()
        then Multithreading.restricted_interrupts
        else Multithreading.regular_interrupts;
    in Multithreading.with_attributes attributes (fn _ => f) x end;
| 
 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 
wenzelm 
parents: 
30612 
diff
changeset
 | 
373  | 
|
| 
28202
 
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
 
wenzelm 
parents: 
28201 
diff
changeset
 | 
374  | 
(*interrupt: permissive signal, may get ignored*)  | 
| 28197 | 375  | 
fun interrupt_task id =
  let
    (*delegate to the queue, passing the external identifier*)
    fun interrupt () = Task_Queue.interrupt_external (! queue) id;
  in SYNCHRONIZED "interrupt" interrupt end;
| 28191 | 377  | 
|
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
378  | 
(*cancel: present and future group members will be interrupted eventually*)  | 
| 29431 | 379  | 
fun cancel_group group =
  let
    val _ = scheduler_check "cancel check";
    (*register the group as canceled and wake up waiting threads*)
    fun cancel () = (do_cancel group; notify_all ());
  in SYNCHRONIZED "cancel" cancel end;
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
382  | 
|
| 29431 | 383  | 
(*cancel the group of the given future*)
fun cancel x = x |> group_of |> cancel_group;
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
384  | 
|
| 29366 | 385  | 
|
386  | 
(** global join and shutdown **)  | 
|
387  | 
||
| 28203 | 388  | 
(*Global shutdown (no-op without multithreading).  Wait until the
  scheduler is active and the queue is empty, request shutdown, then wait
  until all workers and the scheduler are gone; finally sleep for 300ms
  (still within the synchronized section).*)
fun shutdown () =
  if not Multithreading.available then ()
  else
    let
      (*wait on the global condition as long as the predicate holds*)
      fun wait_while pending = while pending () do wait ();

      fun terminate () =
       (wait_while (not o scheduler_active);
        wait_while (fn () => not (Task_Queue.is_empty (! queue)));
        do_shutdown := true;
        notify_all ();
        wait_while (fn () => not (null (! workers)));
        wait_while scheduler_active;
        OS.Process.sleep (Time.fromMilliseconds 300));

      val _ = scheduler_check "shutdown check";
    in SYNCHRONIZED "shutdown" terminate end;
| 28203 | 400  | 
|
| 29366 | 401  | 
|
402  | 
(*final declarations of this structure!*)  | 
|
403  | 
fun map f x = map_future f x;  (*from here on "map" no longer refers to the list operation*)
|
404  | 
||
| 28156 | 405  | 
end;  | 
| 28972 | 406  | 
|
407  | 
type 'a future = 'a Future.future;  | 
|
408  |