| author | wenzelm | 
| Wed, 10 Aug 2011 16:05:14 +0200 | |
| changeset 44113 | 0baa8bbd355a | 
| parent 44111 | 2d16c693d536 | 
| child 44115 | 5d9821493bc1 | 
| permissions | -rw-r--r-- | 
| 28156 | 1  | 
(* Title: Pure/Concurrent/future.ML  | 
2  | 
Author: Makarius  | 
|
3  | 
||
| 32246 | 4  | 
Future values, see also  | 
5  | 
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf  | 
|
| 37904 | 6  | 
http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf  | 
| 28201 | 7  | 
|
8  | 
Notes:  | 
|
9  | 
||
10  | 
* Futures are similar to delayed evaluation, i.e. delay/force is  | 
|
11  | 
generalized to fork/join (and variants). The idea is to model  | 
|
12  | 
parallel value-oriented computations, but *not* communicating  | 
|
13  | 
processes.  | 
|
14  | 
||
15  | 
* Futures are grouped; failure of one group member causes the whole  | 
|
| 32220 | 16  | 
group to be interrupted eventually. Groups are block-structured.  | 
| 28201 | 17  | 
|
18  | 
* Forked futures are evaluated spontaneously by a farm of worker  | 
|
19  | 
threads in the background; join resynchronizes the computation and  | 
|
20  | 
delivers results (values or exceptions).  | 
|
21  | 
||
22  | 
* The pool of worker threads is limited, usually in correlation with  | 
|
23  | 
the number of physical cores on the machine. Note that allocation  | 
|
24  | 
of runtime resources is distorted either if workers yield CPU time  | 
|
25  | 
(e.g. via system sleep or wait operations), or if non-worker  | 
|
26  | 
threads contend for significant runtime resources independently.  | 
|
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
27  | 
|
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
28  | 
* Promised futures are fulfilled by external means. There is no  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
29  | 
associated evaluation task, but other futures can depend on them  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
30  | 
as usual.  | 
| 28156 | 31  | 
*)  | 
32  | 
||
33  | 
signature FUTURE =  | 
|
34  | 
sig  | 
|
| 
32814
 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 
wenzelm 
parents: 
32738 
diff
changeset
 | 
35  | 
val worker_task: unit -> Task_Queue.task option  | 
| 32102 | 36  | 
val worker_group: unit -> Task_Queue.group option  | 
| 
37865
 
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
 
wenzelm 
parents: 
37854 
diff
changeset
 | 
37  | 
val worker_subgroup: unit -> Task_Queue.group  | 
| 28972 | 38  | 
type 'a future  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
39  | 
val task_of: 'a future -> Task_Queue.task  | 
| 28972 | 40  | 
val peek: 'a future -> 'a Exn.result option  | 
41  | 
val is_finished: 'a future -> bool  | 
|
| 44110 | 42  | 
val cancel_group: Task_Queue.group -> unit  | 
43  | 
val cancel: 'a future -> unit  | 
|
| 44113 | 44  | 
type fork_params =  | 
45  | 
   {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
 | 
|
46  | 
pri: int, interrupts: bool}  | 
|
47  | 
val forks: fork_params -> (unit -> 'a) list -> 'a future list  | 
|
| 29119 | 48  | 
val fork_pri: int -> (unit -> 'a) -> 'a future  | 
| 32724 | 49  | 
val fork: (unit -> 'a) -> 'a future  | 
| 28972 | 50  | 
val join_results: 'a future list -> 'a Exn.result list  | 
51  | 
val join_result: 'a future -> 'a Exn.result  | 
|
52  | 
val join: 'a future -> 'a  | 
|
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
53  | 
val value: 'a -> 'a future  | 
| 28972 | 54  | 
  val map: ('a -> 'b) -> 'a future -> 'b future
 | 
| 44113 | 55  | 
val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
56  | 
val promise_group: Task_Queue.group -> 'a future  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
57  | 
val promise: unit -> 'a future  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
58  | 
val fulfill_result: 'a future -> 'a Exn.result -> unit  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
59  | 
val fulfill: 'a future -> 'a -> unit  | 
| 
30618
 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 
wenzelm 
parents: 
30612 
diff
changeset
 | 
60  | 
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
 | 
| 28203 | 61  | 
val shutdown: unit -> unit  | 
| 
38236
 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 
wenzelm 
parents: 
37904 
diff
changeset
 | 
62  | 
val status: (unit -> 'a) -> 'a  | 
| 28156 | 63  | 
end;  | 
64  | 
||
65  | 
structure Future: FUTURE =  | 
|
66  | 
struct  | 
|
67  | 
||
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
68  | 
(** future values **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
69  | 
|
| 28167 | 70  | 
(* identifiers *)  | 
71  | 
||
| 32058 | 72  | 
local  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
73  | 
val tag = Universal.tag () : Task_Queue.task option Universal.tag;  | 
| 32058 | 74  | 
in  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
75  | 
fun worker_task () = the_default NONE (Thread.getLocal tag);  | 
| 44110 | 76  | 
fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;  | 
| 28167 | 77  | 
end;  | 
78  | 
||
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
79  | 
val worker_group = Option.map Task_Queue.group_of_task o worker_task;  | 
| 
37865
 
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
 
wenzelm 
parents: 
37854 
diff
changeset
 | 
80  | 
fun worker_subgroup () = Task_Queue.new_group (worker_group ());  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
81  | 
|
| 41679 | 82  | 
fun worker_joining e =  | 
83  | 
(case worker_task () of  | 
|
84  | 
NONE => e ()  | 
|
85  | 
| SOME task => Task_Queue.joining task e);  | 
|
86  | 
||
| 
41680
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
87  | 
fun worker_waiting deps e =  | 
| 41670 | 88  | 
(case worker_task () of  | 
89  | 
NONE => e ()  | 
|
| 
41680
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
90  | 
| SOME task => Task_Queue.waiting task deps e);  | 
| 41670 | 91  | 
|
| 28167 | 92  | 
|
93  | 
(* datatype future *)  | 
|
94  | 
||
| 35016 | 95  | 
type 'a result = 'a Exn.result Single_Assignment.var;  | 
96  | 
||
| 28972 | 97  | 
datatype 'a future = Future of  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
98  | 
 {promised: bool,
 | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
99  | 
task: Task_Queue.task,  | 
| 35016 | 100  | 
result: 'a result};  | 
| 28167 | 101  | 
|
102  | 
fun task_of (Future {task, ...}) = task;
 | 
|
| 32253 | 103  | 
fun result_of (Future {result, ...}) = result;
 | 
| 28167 | 104  | 
|
| 35016 | 105  | 
fun peek x = Single_Assignment.peek (result_of x);  | 
| 28558 | 106  | 
fun is_finished x = is_some (peek x);  | 
| 28320 | 107  | 
|
| 28167 | 108  | 
|
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
109  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
110  | 
(** scheduling **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
111  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
112  | 
(* synchronization *)  | 
| 28156 | 113  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
114  | 
val scheduler_event = ConditionVar.conditionVar ();  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
115  | 
val work_available = ConditionVar.conditionVar ();  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
116  | 
val work_finished = ConditionVar.conditionVar ();  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
117  | 
|
| 28156 | 118  | 
local  | 
119  | 
val lock = Mutex.mutex ();  | 
|
120  | 
in  | 
|
121  | 
||
| 
37216
 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 
wenzelm 
parents: 
37182 
diff
changeset
 | 
122  | 
fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;  | 
| 28156 | 123  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
124  | 
fun wait cond = (*requires SYNCHRONIZED*)  | 
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
125  | 
Multithreading.sync_wait NONE NONE cond lock;  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
126  | 
|
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
127  | 
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)  | 
| 
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
128  | 
Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;  | 
| 
28166
 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 
wenzelm 
parents: 
28163 
diff
changeset
 | 
129  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
130  | 
fun signal cond = (*requires SYNCHRONIZED*)  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
131  | 
ConditionVar.signal cond;  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
132  | 
|
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
133  | 
fun broadcast cond = (*requires SYNCHRONIZED*)  | 
| 
28166
 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 
wenzelm 
parents: 
28163 
diff
changeset
 | 
134  | 
ConditionVar.broadcast cond;  | 
| 28156 | 135  | 
|
| 
32248
 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 
wenzelm 
parents: 
32247 
diff
changeset
 | 
136  | 
fun broadcast_work () = (*requires SYNCHRONIZED*)  | 
| 
 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 
wenzelm 
parents: 
32247 
diff
changeset
 | 
137  | 
(ConditionVar.broadcast work_available;  | 
| 
32225
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
138  | 
ConditionVar.broadcast work_finished);  | 
| 
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
139  | 
|
| 28156 | 140  | 
end;  | 
141  | 
||
142  | 
||
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
143  | 
(* global state *)  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
144  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
145  | 
val queue = Unsynchronized.ref Task_Queue.empty;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
146  | 
val next = Unsynchronized.ref 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
147  | 
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
148  | 
val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
149  | 
val do_shutdown = Unsynchronized.ref false;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
150  | 
val max_workers = Unsynchronized.ref 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
151  | 
val max_active = Unsynchronized.ref 0;  | 
| 
33411
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
152  | 
val worker_trend = Unsynchronized.ref 0;  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
153  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
154  | 
datatype worker_state = Working | Waiting | Sleeping;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
155  | 
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
156  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
157  | 
fun count_workers state = (*requires SYNCHRONIZED*)  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
158  | 
fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
159  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
160  | 
|
| 44110 | 161  | 
(* cancellation primitives *)  | 
| 32099 | 162  | 
|
| 44110 | 163  | 
fun interruptible_task f x =  | 
164  | 
if Multithreading.available then  | 
|
165  | 
Multithreading.with_attributes  | 
|
166  | 
(if is_some (worker_task ())  | 
|
167  | 
then Multithreading.private_interrupts  | 
|
168  | 
else Multithreading.public_interrupts)  | 
|
169  | 
(fn _ => f x)  | 
|
170  | 
else interruptible f x;  | 
|
| 28156 | 171  | 
|
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
172  | 
fun cancel_now group = (*requires SYNCHRONIZED*)  | 
| 
34280
 
16bf3e9786a3
eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
 
wenzelm 
parents: 
34279 
diff
changeset
 | 
173  | 
Task_Queue.cancel (! queue) group;  | 
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
174  | 
|
| 
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
175  | 
fun cancel_later group = (*requires SYNCHRONIZED*)  | 
| 32738 | 176  | 
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);  | 
177  | 
broadcast scheduler_event);  | 
|
| 
29341
 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 
wenzelm 
parents: 
29119 
diff
changeset
 | 
178  | 
|
| 44110 | 179  | 
|
180  | 
(* worker threads *)  | 
|
181  | 
||
182  | 
fun worker_exec (task, jobs) =  | 
|
| 28167 | 183  | 
let  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
184  | 
val group = Task_Queue.group_of_task task;  | 
| 32102 | 185  | 
val valid = not (Task_Queue.is_canceled group);  | 
| 41670 | 186  | 
val ok =  | 
187  | 
Task_Queue.running task (fn () =>  | 
|
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
188  | 
setmp_worker_task task (fn () =>  | 
| 41670 | 189  | 
fold (fn job => fn ok => job valid andalso ok) jobs true) ());  | 
| 41776 | 190  | 
val _ = Multithreading.tracing 2 (fn () =>  | 
| 41670 | 191  | 
let  | 
| 43951 | 192  | 
val s = Task_Queue.str_of_task_groups task;  | 
| 41670 | 193  | 
fun micros time = string_of_int (Time.toNanoseconds time div 1000);  | 
| 
41680
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
194  | 
val (run, wait, deps) = Task_Queue.timing_of_task task;  | 
| 
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
195  | 
      in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
 | 
| 32246 | 196  | 
val _ = SYNCHRONIZED "finish" (fn () =>  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
197  | 
let  | 
| 32738 | 198  | 
val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);  | 
| 
44111
 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 
wenzelm 
parents: 
44110 
diff
changeset
 | 
199  | 
val _ = Exn.capture Multithreading.interrupted ();  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
200  | 
val _ =  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
201  | 
if ok then ()  | 
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
202  | 
else if cancel_now group then ()  | 
| 
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
203  | 
else cancel_later group;  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
204  | 
val _ = broadcast work_finished;  | 
| 
33413
 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 
wenzelm 
parents: 
33411 
diff
changeset
 | 
205  | 
val _ = if maximal then () else signal work_available;  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
206  | 
in () end);  | 
| 28167 | 207  | 
in () end;  | 
208  | 
||
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
209  | 
fun worker_wait active cond = (*requires SYNCHRONIZED*)  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
210  | 
let  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
211  | 
val state =  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
212  | 
(case AList.lookup Thread.equal (! workers) (Thread.self ()) of  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
213  | 
SOME state => state  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
214  | 
| NONE => raise Fail "Unregistered worker thread");  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
215  | 
val _ = state := (if active then Waiting else Sleeping);  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
216  | 
val _ = wait cond;  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
217  | 
val _ = state := Working;  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
218  | 
in () end;  | 
| 28162 | 219  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
220  | 
fun worker_next () = (*requires SYNCHRONIZED*)  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
221  | 
if length (! workers) > ! max_workers then  | 
| 
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
222  | 
(Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
223  | 
signal work_available;  | 
| 28167 | 224  | 
NONE)  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
225  | 
else if count_workers Working > ! max_active then  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
226  | 
(worker_wait false work_available; worker_next ())  | 
| 
28166
 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 
wenzelm 
parents: 
28163 
diff
changeset
 | 
227  | 
else  | 
| 32738 | 228  | 
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
229  | 
NONE => (worker_wait false work_available; worker_next ())  | 
| 
33413
 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 
wenzelm 
parents: 
33411 
diff
changeset
 | 
230  | 
| some => (signal work_available; some));  | 
| 28156 | 231  | 
|
| 28167 | 232  | 
fun worker_loop name =  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
233  | 
(case SYNCHRONIZED name (fn () => worker_next ()) of  | 
| 29119 | 234  | 
NONE => ()  | 
| 44110 | 235  | 
| SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));  | 
| 28156 | 236  | 
|
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
237  | 
fun worker_start name = (*requires SYNCHRONIZED*)  | 
| 
37216
 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 
wenzelm 
parents: 
37182 
diff
changeset
 | 
238  | 
Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
239  | 
Unsynchronized.ref Working));  | 
| 28156 | 240  | 
|
241  | 
||
242  | 
(* scheduler *)  | 
|
243  | 
||
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
244  | 
val status_ticks = Unsynchronized.ref 0;  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
245  | 
|
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
246  | 
val last_round = Unsynchronized.ref Time.zeroTime;  | 
| 40301 | 247  | 
val next_round = seconds 0.05;  | 
| 32226 | 248  | 
|
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
249  | 
fun scheduler_next () = (*requires SYNCHRONIZED*)  | 
| 28156 | 250  | 
let  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
251  | 
val now = Time.now ();  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
252  | 
val tick = Time.<= (Time.+ (! last_round, next_round), now);  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
253  | 
val _ = if tick then last_round := now else ();  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
254  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
255  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
256  | 
(* queue and worker status *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
257  | 
|
| 32226 | 258  | 
val _ =  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
259  | 
if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
260  | 
val _ =  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
261  | 
if tick andalso ! status_ticks = 0 then  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
262  | 
Multithreading.tracing 1 (fn () =>  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
263  | 
let  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
264  | 
            val {ready, pending, running, passive} = Task_Queue.status (! queue);
 | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
265  | 
val total = length (! workers);  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
266  | 
val active = count_workers Working;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
267  | 
val waiting = count_workers Waiting;  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
268  | 
in  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
269  | 
"SCHEDULE " ^ Time.toString now ^ ": " ^  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
270  | 
string_of_int ready ^ " ready, " ^  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
271  | 
string_of_int pending ^ " pending, " ^  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
272  | 
string_of_int running ^ " running, " ^  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
273  | 
string_of_int passive ^ " passive; " ^  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
274  | 
string_of_int total ^ " workers, " ^  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
275  | 
string_of_int active ^ " active, " ^  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
276  | 
string_of_int waiting ^ " waiting "  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
277  | 
end)  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
278  | 
else ();  | 
| 32053 | 279  | 
|
| 28191 | 280  | 
val _ =  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
281  | 
if forall (Thread.isActive o #1) (! workers) then ()  | 
| 32095 | 282  | 
else  | 
| 33409 | 283  | 
let  | 
| 37682 | 284  | 
val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);  | 
| 33409 | 285  | 
val _ = workers := alive;  | 
286  | 
in  | 
|
287  | 
Multithreading.tracing 0 (fn () =>  | 
|
288  | 
"SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")  | 
|
289  | 
end;  | 
|
| 28191 | 290  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
291  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
292  | 
(* worker pool adjustments *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
293  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
294  | 
val max_active0 = ! max_active;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
295  | 
val max_workers0 = ! max_workers;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
296  | 
|
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
297  | 
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
298  | 
val _ = max_active := m;  | 
| 
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
299  | 
|
| 
33411
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
300  | 
val mm =  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
301  | 
if ! do_shutdown then 0  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
302  | 
else if m = 9999 then 1  | 
| 
33413
 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 
wenzelm 
parents: 
33411 
diff
changeset
 | 
303  | 
else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);  | 
| 
33411
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
304  | 
val _ =  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
305  | 
if tick andalso mm > ! max_workers then  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
306  | 
Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
307  | 
else if tick andalso mm < ! max_workers then  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
308  | 
Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
309  | 
else ();  | 
| 
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
310  | 
val _ =  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
311  | 
if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
312  | 
max_workers := mm  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
313  | 
else if ! worker_trend > 5 andalso ! max_workers < 2 * m then  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
314  | 
max_workers := Int.min (mm, 2 * m)  | 
| 
33411
 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 
wenzelm 
parents: 
33410 
diff
changeset
 | 
315  | 
else ();  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
316  | 
|
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
317  | 
val missing = ! max_workers - length (! workers);  | 
| 28203 | 318  | 
val _ =  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
319  | 
if missing > 0 then  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
320  | 
funpow missing (fn () =>  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
321  | 
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
 | 
| 28203 | 322  | 
else ();  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
323  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
324  | 
val _ =  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
325  | 
if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
326  | 
else signal work_available;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
327  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
328  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
329  | 
(* canceled groups *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
330  | 
|
| 
32225
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
331  | 
val _ =  | 
| 
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
332  | 
if null (! canceled) then ()  | 
| 32293 | 333  | 
else  | 
334  | 
(Multithreading.tracing 1 (fn () =>  | 
|
335  | 
string_of_int (length (! canceled)) ^ " canceled groups");  | 
|
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
336  | 
Unsynchronized.change canceled (filter_out cancel_now);  | 
| 32293 | 337  | 
broadcast_work ());  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
338  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
339  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
340  | 
(* delay loop *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
341  | 
|
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
342  | 
val _ = Exn.release (wait_timeout next_round scheduler_event);  | 
| 28167 | 343  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
344  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
345  | 
(* shutdown *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
346  | 
|
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
347  | 
val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
348  | 
val continue = not (! do_shutdown andalso null (! workers));  | 
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
349  | 
val _ = if continue then () else scheduler := NONE;  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
350  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
351  | 
val _ = broadcast scheduler_event;  | 
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
352  | 
in continue end  | 
| 
39232
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
353  | 
handle exn =>  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
354  | 
if Exn.is_interrupt exn then  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
355  | 
(Multithreading.tracing 1 (fn () => "Interrupt");  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
356  | 
List.app cancel_later (Task_Queue.cancel_all (! queue));  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
357  | 
broadcast_work (); true)  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
358  | 
else reraise exn;  | 
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
359  | 
|
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
360  | 
fun scheduler_loop () =  | 
| 
33416
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
361  | 
while  | 
| 
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
362  | 
Multithreading.with_attributes  | 
| 
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
363  | 
(Multithreading.sync_interrupts Multithreading.public_interrupts)  | 
| 
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
364  | 
(fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))  | 
| 
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
365  | 
do ();  | 
| 28156 | 366  | 
|
| 28203 | 367  | 
fun scheduler_active () = (*requires SYNCHRONIZED*)  | 
368  | 
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);  | 
|
369  | 
||
| 
32228
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
370  | 
fun scheduler_check () = (*requires SYNCHRONIZED*)  | 
| 
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
371  | 
(do_shutdown := false;  | 
| 
32248
 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 
wenzelm 
parents: 
32247 
diff
changeset
 | 
372  | 
if scheduler_active () then ()  | 
| 
37216
 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 
wenzelm 
parents: 
37182 
diff
changeset
 | 
373  | 
else scheduler := SOME (Simple_Thread.fork false scheduler_loop));  | 
| 28156 | 374  | 
|
375  | 
||
| 29366 | 376  | 
|
377  | 
(** futures **)  | 
|
| 28156 | 378  | 
|
| 44110 | 379  | 
(* cancellation *)  | 
380  | 
||
381  | 
(*cancel: present and future group members will be interrupted eventually*)  | 
|
382  | 
fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>  | 
|
383  | 
(if cancel_now group then () else cancel_later group;  | 
|
384  | 
signal work_available; scheduler_check ()));  | 
|
385  | 
||
386  | 
fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));  | 
|
387  | 
||
388  | 
||
389  | 
(* future jobs *)  | 
|
390  | 
||
391  | 
fun assign_result group result res =  | 
|
392  | 
let  | 
|
393  | 
val _ = Single_Assignment.assign result res  | 
|
394  | 
handle exn as Fail _ =>  | 
|
395  | 
(case Single_Assignment.peek result of  | 
|
396  | 
SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)  | 
|
397  | 
| _ => reraise exn);  | 
|
398  | 
val ok =  | 
|
399  | 
(case the (Single_Assignment.peek result) of  | 
|
| 
44111
 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 
wenzelm 
parents: 
44110 
diff
changeset
 | 
400  | 
Exn.Exn exn =>  | 
| 
 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 
wenzelm 
parents: 
44110 
diff
changeset
 | 
401  | 
(SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)  | 
| 44110 | 402  | 
| Exn.Res _ => true);  | 
403  | 
in ok end;  | 
|
404  | 
||
| 44113 | 405  | 
fun future_job group interrupts (e: unit -> 'a) =  | 
| 44110 | 406  | 
let  | 
407  | 
val result = Single_Assignment.var "future" : 'a result;  | 
|
408  | 
val pos = Position.thread_data ();  | 
|
409  | 
fun job ok =  | 
|
410  | 
let  | 
|
411  | 
val res =  | 
|
412  | 
if ok then  | 
|
413  | 
Exn.capture (fn () =>  | 
|
| 44113 | 414  | 
Multithreading.with_attributes  | 
415  | 
(if interrupts  | 
|
416  | 
then Multithreading.private_interrupts else Multithreading.no_interrupts)  | 
|
| 44110 | 417  | 
(fn _ => Position.setmp_thread_data pos e ()) before  | 
418  | 
Multithreading.interrupted ()) ()  | 
|
419  | 
else Exn.interrupt_exn;  | 
|
420  | 
in assign_result group result res end;  | 
|
421  | 
in (result, job) end;  | 
|
422  | 
||
423  | 
||
| 29366 | 424  | 
(* fork *)  | 
425  | 
||
| 44113 | 426  | 
type fork_params =  | 
427  | 
 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
 | 
|
428  | 
pri: int, interrupts: bool};  | 
|
429  | 
||
430  | 
fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
 | 
|
| 41674 | 431  | 
if null es then []  | 
432  | 
else  | 
|
433  | 
let  | 
|
434  | 
val grp =  | 
|
435  | 
(case group of  | 
|
436  | 
NONE => worker_subgroup ()  | 
|
437  | 
| SOME grp => grp);  | 
|
| 41708 | 438  | 
fun enqueue e queue =  | 
| 41674 | 439  | 
let  | 
| 44113 | 440  | 
val (result, job) = future_job grp interrupts e;  | 
| 41708 | 441  | 
val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
442  | 
          val future = Future {promised = false, task = task, result = result};
 | 
| 41708 | 443  | 
in (future, queue') end;  | 
| 41674 | 444  | 
in  | 
445  | 
SYNCHRONIZED "enqueue" (fn () =>  | 
|
446  | 
let  | 
|
| 41708 | 447  | 
val (futures, queue') = fold_map enqueue es (! queue);  | 
448  | 
val _ = queue := queue';  | 
|
449  | 
val minimal = forall (not o Task_Queue.known_task queue') deps;  | 
|
| 41674 | 450  | 
val _ = if minimal then signal work_available else ();  | 
451  | 
val _ = scheduler_check ();  | 
|
452  | 
in futures end)  | 
|
453  | 
end;  | 
|
| 28162 | 454  | 
|
| 44113 | 455  | 
fun fork_pri pri e =  | 
456  | 
  singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
 | 
|
457  | 
||
| 
41672
 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 
wenzelm 
parents: 
41670 
diff
changeset
 | 
458  | 
fun fork e = fork_pri 0 e;  | 
| 28186 | 459  | 
|
460  | 
||
| 29366 | 461  | 
(* join *)  | 
462  | 
||
| 
29551
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
463  | 
local  | 
| 
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
464  | 
|
| 32099 | 465  | 
fun get_result x =  | 
466  | 
(case peek x of  | 
|
| 
37852
 
a902f158b4fc
eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
 
wenzelm 
parents: 
37690 
diff
changeset
 | 
467  | 
NONE => Exn.Exn (Fail "Unfinished future")  | 
| 
39232
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
468  | 
| SOME res =>  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
469  | 
if Exn.is_interrupt_exn res then  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
470  | 
(case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of  | 
| 
39232
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
471  | 
[] => res  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
472  | 
| exns => Exn.Exn (Exn.EXCEPTIONS exns))  | 
| 
 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 
wenzelm 
parents: 
38236 
diff
changeset
 | 
473  | 
else res);  | 
| 28186 | 474  | 
|
| 32095 | 475  | 
fun join_next deps = (*requires SYNCHRONIZED*)  | 
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
476  | 
if null deps then NONE  | 
| 32224 | 477  | 
else  | 
| 
41681
 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 
wenzelm 
parents: 
41680 
diff
changeset
 | 
478  | 
(case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of  | 
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
479  | 
(NONE, []) => NONE  | 
| 
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
480  | 
| (NONE, deps') =>  | 
| 
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
481  | 
(worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')  | 
| 32224 | 482  | 
| (SOME work, deps') => SOME (work, deps'));  | 
| 32095 | 483  | 
|
| 
32814
 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 
wenzelm 
parents: 
32738 
diff
changeset
 | 
484  | 
fun execute_work NONE = ()  | 
| 44110 | 485  | 
| execute_work (SOME (work, deps')) =  | 
486  | 
(worker_joining (fn () => worker_exec work); join_work deps')  | 
|
| 
32814
 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 
wenzelm 
parents: 
32738 
diff
changeset
 | 
487  | 
and join_work deps =  | 
| 
43538
 
de5c79682b56
more robust join_results: join_work needs to be uninterruptible, otherwise the task being dequeued by join_next might be never executed/finished!
 
wenzelm 
parents: 
42128 
diff
changeset
 | 
488  | 
Multithreading.with_attributes Multithreading.no_interrupts  | 
| 
 
de5c79682b56
more robust join_results: join_work needs to be uninterruptible, otherwise the task being dequeued by join_next might be never executed/finished!
 
wenzelm 
parents: 
42128 
diff
changeset
 | 
489  | 
(fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));  | 
| 
32814
 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 
wenzelm 
parents: 
32738 
diff
changeset
 | 
490  | 
|
| 
29551
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
491  | 
in  | 
| 
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
492  | 
|
| 29366 | 493  | 
fun join_results xs =  | 
| 41679 | 494  | 
let  | 
495  | 
val _ =  | 
|
496  | 
if forall is_finished xs then ()  | 
|
497  | 
else if Multithreading.self_critical () then  | 
|
498  | 
error "Cannot join future values within critical section"  | 
|
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
499  | 
else if is_some (worker_task ()) then join_work (map task_of xs)  | 
| 
41681
 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 
wenzelm 
parents: 
41680 
diff
changeset
 | 
500  | 
else List.app (ignore o Single_Assignment.await o result_of) xs;  | 
| 41679 | 501  | 
in map get_result xs end;  | 
| 28186 | 502  | 
|
| 
29551
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
503  | 
end;  | 
| 
 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
504  | 
|
| 
28647
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
505  | 
fun join_result x = singleton join_results x;  | 
| 
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
506  | 
fun join x = Exn.release (join_result x);  | 
| 28156 | 507  | 
|
| 29366 | 508  | 
|
| 44110 | 509  | 
(* fast-path versions -- bypassing task queue *)  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
510  | 
|
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
511  | 
fun value (x: 'a) =  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
512  | 
let  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
513  | 
val task = Task_Queue.dummy_task ();  | 
| 
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
514  | 
val group = Task_Queue.group_of_task task;  | 
| 35016 | 515  | 
val result = Single_Assignment.var "value" : 'a result;  | 
| 
43761
 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 
wenzelm 
parents: 
43665 
diff
changeset
 | 
516  | 
val _ = assign_result group result (Exn.Res x);  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
517  | 
  in Future {promised = false, task = task, result = result} end;
 | 
| 29366 | 518  | 
|
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
519  | 
fun map_future f x =  | 
| 29366 | 520  | 
let  | 
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
521  | 
val task = task_of x;  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
522  | 
val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));  | 
| 44113 | 523  | 
val (result, job) = future_job group true (fn () => f (join x));  | 
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
524  | 
|
| 32246 | 525  | 
val extended = SYNCHRONIZED "extend" (fn () =>  | 
| 29366 | 526  | 
(case Task_Queue.extend task job (! queue) of  | 
527  | 
SOME queue' => (queue := queue'; true)  | 
|
528  | 
| NONE => false));  | 
|
529  | 
in  | 
|
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
530  | 
    if extended then Future {promised = false, task = task, result = result}
 | 
| 
41672
 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 
wenzelm 
parents: 
41670 
diff
changeset
 | 
531  | 
else  | 
| 41673 | 532  | 
singleton  | 
| 44113 | 533  | 
        (forks {name = "Future.map", group = SOME group, deps = [task],
 | 
534  | 
pri = Task_Queue.pri_of_task task, interrupts = true})  | 
|
| 
41672
 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 
wenzelm 
parents: 
41670 
diff
changeset
 | 
535  | 
(fn () => f (join x))  | 
| 29366 | 536  | 
end;  | 
| 
28979
 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 
wenzelm 
parents: 
28972 
diff
changeset
 | 
537  | 
|
| 42128 | 538  | 
fun cond_forks args es =  | 
539  | 
if Multithreading.enabled () then forks args es  | 
|
540  | 
else map (fn e => value (e ())) es;  | 
|
541  | 
||
| 28191 | 542  | 
|
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
543  | 
(* promised futures -- fulfilled by external means *)  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
544  | 
|
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
545  | 
fun promise_group group : 'a future =  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
546  | 
let  | 
| 35016 | 547  | 
val result = Single_Assignment.var "promise" : 'a result;  | 
| 
39243
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
548  | 
fun abort () = assign_result group result Exn.interrupt_exn  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
549  | 
handle Fail _ => true  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
550  | 
| exn =>  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
551  | 
if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
552  | 
else reraise exn;  | 
| 
37854
 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 
wenzelm 
parents: 
37852 
diff
changeset
 | 
553  | 
val task = SYNCHRONIZED "enqueue_passive" (fn () =>  | 
| 
 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 
wenzelm 
parents: 
37852 
diff
changeset
 | 
554  | 
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
555  | 
  in Future {promised = true, task = task, result = result} end;
 | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
556  | 
|
| 
37865
 
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
 
wenzelm 
parents: 
37854 
diff
changeset
 | 
557  | 
fun promise () = promise_group (worker_subgroup ());  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
558  | 
|
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
559  | 
fun fulfill_result (Future {promised, task, result}) res =
 | 
| 
39243
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
560  | 
if not promised then raise Fail "Not a promised future"  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
561  | 
else  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
562  | 
let  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
563  | 
val group = Task_Queue.group_of_task task;  | 
| 
39243
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
564  | 
fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
565  | 
val _ =  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
566  | 
Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
567  | 
let  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
568  | 
val still_passive =  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
569  | 
SYNCHRONIZED "fulfill_result" (fn () =>  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
570  | 
Unsynchronized.change_result queue  | 
| 
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
571  | 
(Task_Queue.dequeue_passive (Thread.self ()) task));  | 
| 44110 | 572  | 
in if still_passive then worker_exec (task, [job]) else () end);  | 
| 
41681
 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 
wenzelm 
parents: 
41680 
diff
changeset
 | 
573  | 
val _ =  | 
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
574  | 
if is_some (Single_Assignment.peek result) then ()  | 
| 
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
575  | 
else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));  | 
| 
39243
 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 
wenzelm 
parents: 
39232 
diff
changeset
 | 
576  | 
in () end;  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
577  | 
|
| 
43761
 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 
wenzelm 
parents: 
43665 
diff
changeset
 | 
578  | 
fun fulfill x res = fulfill_result x (Exn.Res res);  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
579  | 
|
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
580  | 
|
| 
32228
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
581  | 
(* shutdown *)  | 
| 29366 | 582  | 
|
| 28203 | 583  | 
fun shutdown () =  | 
| 28276 | 584  | 
if Multithreading.available then  | 
585  | 
SYNCHRONIZED "shutdown" (fn () =>  | 
|
| 
32228
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
586  | 
while scheduler_active () do  | 
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
587  | 
(wait scheduler_event; broadcast_work ()))  | 
| 28276 | 588  | 
else ();  | 
| 28203 | 589  | 
|
| 29366 | 590  | 
|
| 
38236
 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 
wenzelm 
parents: 
37904 
diff
changeset
 | 
591  | 
(* status markup *)  | 
| 
37690
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
592  | 
|
| 
38236
 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 
wenzelm 
parents: 
37904 
diff
changeset
 | 
593  | 
fun status e =  | 
| 
37690
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
594  | 
let  | 
| 40448 | 595  | 
val task_props =  | 
596  | 
(case worker_task () of  | 
|
597  | 
NONE => I  | 
|
598  | 
| SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);  | 
|
| 43665 | 599  | 
val _ = Output.status (Markup.markup_only (task_props Markup.forked));  | 
| 
37690
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
600  | 
val x = e (); (*sic -- report "joined" only for success*)  | 
| 43665 | 601  | 
val _ = Output.status (Markup.markup_only (task_props Markup.joined));  | 
| 
37690
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
602  | 
in x end;  | 
| 
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
603  | 
|
| 
 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 
wenzelm 
parents: 
37682 
diff
changeset
 | 
604  | 
|
| 29366 | 605  | 
(*final declarations of this structure!*)  | 
606  | 
val map = map_future;  | 
|
607  | 
||
| 28156 | 608  | 
end;  | 
| 28972 | 609  | 
|
610  | 
type 'a future = 'a Future.future;  | 
|
611  |