author | wenzelm |
Mon, 31 Jan 2011 21:54:49 +0100 | |
changeset 41672 | 2f70b1ddd09f |
parent 41670 | 74010c6af0a4 |
child 41673 | 1c191a39549f |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
Author: Makarius |
|
3 |
||
32246 | 4 |
Future values, see also |
5 |
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf |
|
37904 | 6 |
http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf |
28201 | 7 |
|
8 |
Notes: |
|
9 |
||
10 |
* Futures are similar to delayed evaluation, i.e. delay/force is |
|
11 |
generalized to fork/join (and variants). The idea is to model |
|
12 |
parallel value-oriented computations, but *not* communicating |
|
13 |
processes. |
|
14 |
||
15 |
* Futures are grouped; failure of one group member causes the whole |
|
32220 | 16 |
group to be interrupted eventually. Groups are block-structured. |
28201 | 17 |
|
18 |
* Forked futures are evaluated spontaneously by a farm of worker |
|
19 |
threads in the background; join resynchronizes the computation and |
|
20 |
delivers results (values or exceptions). |
|
21 |
||
22 |
* The pool of worker threads is limited, usually in correlation with |
|
23 |
the number of physical cores on the machine. Note that allocation |
|
24 |
of runtime resources is distorted either if workers yield CPU time |
|
25 |
(e.g. via system sleep or wait operations), or if non-worker |
|
26 |
threads contend for significant runtime resources independently. |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
27 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
28 |
* Promised futures are fulfilled by external means. There is no |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
29 |
associated evaluation task, but other futures can depend on them |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
30 |
as usual. |
28156 | 31 |
*) |
32 |
||
33 |
signature FUTURE = |
|
34 |
sig |
|
29119 | 35 |
type task = Task_Queue.task |
36 |
type group = Task_Queue.group |
|
32058 | 37 |
val is_worker: unit -> bool |
32814
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
38 |
val worker_task: unit -> Task_Queue.task option |
32102 | 39 |
val worker_group: unit -> Task_Queue.group option |
37865
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
wenzelm
parents:
37854
diff
changeset
|
40 |
val worker_subgroup: unit -> Task_Queue.group |
41670 | 41 |
val worker_waiting: (unit -> 'a) -> 'a |
28972 | 42 |
type 'a future |
43 |
val task_of: 'a future -> task |
|
44 |
val group_of: 'a future -> group |
|
45 |
val peek: 'a future -> 'a Exn.result option |
|
46 |
val is_finished: 'a future -> bool |
|
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
47 |
val bulk: {group: group option, deps: task list, pri: int} -> (unit -> 'a) list -> 'a future list |
29119 | 48 |
val fork_pri: int -> (unit -> 'a) -> 'a future |
32724 | 49 |
val fork: (unit -> 'a) -> 'a future |
28972 | 50 |
val join_results: 'a future list -> 'a Exn.result list |
51 |
val join_result: 'a future -> 'a Exn.result |
|
52 |
val join: 'a future -> 'a |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
53 |
val value: 'a -> 'a future |
28972 | 54 |
val map: ('a -> 'b) -> 'a future -> 'b future |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
55 |
val promise_group: group -> 'a future |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
56 |
val promise: unit -> 'a future |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
57 |
val fulfill_result: 'a future -> 'a Exn.result -> unit |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
58 |
val fulfill: 'a future -> 'a -> unit |
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
59 |
val interruptible_task: ('a -> 'b) -> 'a -> 'b |
29431 | 60 |
val cancel_group: group -> unit |
28972 | 61 |
val cancel: 'a future -> unit |
28203 | 62 |
val shutdown: unit -> unit |
38236
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
wenzelm
parents:
37904
diff
changeset
|
63 |
val status: (unit -> 'a) -> 'a |
28156 | 64 |
end; |
65 |
||
66 |
structure Future: FUTURE = |
|
67 |
struct |
|
68 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
69 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
70 |
|
28167 | 71 |
(* identifiers *) |
72 |
||
29119 | 73 |
type task = Task_Queue.task; |
74 |
type group = Task_Queue.group; |
|
28167 | 75 |
|
32058 | 76 |
local |
33408 | 77 |
val tag = Universal.tag () : (task * group) option Universal.tag; |
32058 | 78 |
in |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
79 |
fun thread_data () = the_default NONE (Thread.getLocal tag); |
32058 | 80 |
fun setmp_thread_data data f x = |
81 |
Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; |
|
28167 | 82 |
end; |
83 |
||
32058 | 84 |
val is_worker = is_some o thread_data; |
33408 | 85 |
val worker_task = Option.map #1 o thread_data; |
86 |
val worker_group = Option.map #2 o thread_data; |
|
37865
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
wenzelm
parents:
37854
diff
changeset
|
87 |
fun worker_subgroup () = Task_Queue.new_group (worker_group ()); |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
88 |
|
41670 | 89 |
fun worker_waiting e = |
90 |
(case worker_task () of |
|
91 |
NONE => e () |
|
92 |
| SOME task => Task_Queue.waiting task e); |
|
93 |
||
28167 | 94 |
|
95 |
(* datatype future *) |
|
96 |
||
35016 | 97 |
type 'a result = 'a Exn.result Single_Assignment.var; |
98 |
||
28972 | 99 |
datatype 'a future = Future of |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
100 |
{promised: bool, |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
101 |
task: task, |
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
102 |
group: group, |
35016 | 103 |
result: 'a result}; |
28167 | 104 |
|
105 |
fun task_of (Future {task, ...}) = task; |
|
106 |
fun group_of (Future {group, ...}) = group; |
|
32253 | 107 |
fun result_of (Future {result, ...}) = result; |
28167 | 108 |
|
35016 | 109 |
fun peek x = Single_Assignment.peek (result_of x); |
28558 | 110 |
fun is_finished x = is_some (peek x); |
28320 | 111 |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
112 |
fun assign_result group result res = |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
113 |
let |
37854
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
114 |
val _ = Single_Assignment.assign result res |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
115 |
handle exn as Fail _ => |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
116 |
(case Single_Assignment.peek result of |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
117 |
SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) |
37854
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
118 |
| _ => reraise exn); |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
119 |
val ok = |
37854
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
120 |
(case the (Single_Assignment.peek result) of |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
121 |
Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
122 |
| Exn.Result _ => true); |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
123 |
in ok end; |
28997 | 124 |
|
28167 | 125 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
126 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
127 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
128 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
129 |
(* synchronization *) |
28156 | 130 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
131 |
val scheduler_event = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
132 |
val work_available = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
133 |
val work_finished = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
134 |
|
28156 | 135 |
local |
136 |
val lock = Mutex.mutex (); |
|
137 |
in |
|
138 |
||
37216
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
wenzelm
parents:
37182
diff
changeset
|
139 |
fun SYNCHRONIZED name = Simple_Thread.synchronized name lock; |
28156 | 140 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
141 |
fun wait cond = (*requires SYNCHRONIZED*) |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
142 |
Multithreading.sync_wait NONE NONE cond lock; |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
143 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
144 |
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
145 |
Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock; |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
146 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
147 |
fun signal cond = (*requires SYNCHRONIZED*) |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
148 |
ConditionVar.signal cond; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
149 |
|
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
150 |
fun broadcast cond = (*requires SYNCHRONIZED*) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
151 |
ConditionVar.broadcast cond; |
28156 | 152 |
|
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
153 |
fun broadcast_work () = (*requires SYNCHRONIZED*) |
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
154 |
(ConditionVar.broadcast work_available; |
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
155 |
ConditionVar.broadcast work_finished); |
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
156 |
|
28156 | 157 |
end; |
158 |
||
159 |
||
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
160 |
(* global state *) |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
161 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
162 |
val queue = Unsynchronized.ref Task_Queue.empty; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
163 |
val next = Unsynchronized.ref 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
164 |
val scheduler = Unsynchronized.ref (NONE: Thread.thread option); |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
165 |
val canceled = Unsynchronized.ref ([]: Task_Queue.group list); |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
166 |
val do_shutdown = Unsynchronized.ref false; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
167 |
val max_workers = Unsynchronized.ref 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
168 |
val max_active = Unsynchronized.ref 0; |
33411
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
169 |
val worker_trend = Unsynchronized.ref 0; |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
170 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
171 |
datatype worker_state = Working | Waiting | Sleeping; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
172 |
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
173 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
174 |
fun count_workers state = (*requires SYNCHRONIZED*) |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
175 |
fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
176 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
177 |
|
32099 | 178 |
(* execute future jobs *) |
179 |
||
180 |
fun future_job group (e: unit -> 'a) = |
|
181 |
let |
|
35016 | 182 |
val result = Single_Assignment.var "future" : 'a result; |
37046
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelizied Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
wenzelm
parents:
35016
diff
changeset
|
183 |
val pos = Position.thread_data (); |
32107
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
184 |
fun job ok = |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
185 |
let |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
186 |
val res = |
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
wenzelm
parents:
32102
diff
changeset
|
187 |
if ok then |
32230
9f6461b1c9cc
interruptible: Thread.testInterrupt before changing thread attributes;
wenzelm
parents:
32229
diff
changeset
|
188 |
Exn.capture (fn () => |
37046
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelizied Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
wenzelm
parents:
35016
diff
changeset
|
189 |
Multithreading.with_attributes Multithreading.private_interrupts |
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelizied Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
wenzelm
parents:
35016
diff
changeset
|
190 |
(fn _ => Position.setmp_thread_data pos e ())) () |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
191 |
else Exn.interrupt_exn; |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
192 |
in assign_result group result res end; |
32099 | 193 |
in (result, job) end; |
28156 | 194 |
|
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
195 |
fun cancel_now group = (*requires SYNCHRONIZED*) |
34280
16bf3e9786a3
eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
wenzelm
parents:
34279
diff
changeset
|
196 |
Task_Queue.cancel (! queue) group; |
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
197 |
|
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
198 |
fun cancel_later group = (*requires SYNCHRONIZED*) |
32738 | 199 |
(Unsynchronized.change canceled (insert Task_Queue.eq_group group); |
200 |
broadcast scheduler_event); |
|
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
201 |
|
33408 | 202 |
fun execute (task, group, jobs) = |
28167 | 203 |
let |
32102 | 204 |
val valid = not (Task_Queue.is_canceled group); |
41670 | 205 |
val ok = |
206 |
Task_Queue.running task (fn () => |
|
207 |
setmp_thread_data (task, group) (fn () => |
|
208 |
fold (fn job => fn ok => job valid andalso ok) jobs true) ()); |
|
209 |
val _ = Multithreading.tracing 1 (fn () => |
|
210 |
let |
|
211 |
val s = Task_Queue.str_of_task task; |
|
212 |
fun micros time = string_of_int (Time.toNanoseconds time div 1000); |
|
213 |
val (run, wait) = pairself micros (Task_Queue.timing_of_task task); |
|
214 |
in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end); |
|
32246 | 215 |
val _ = SYNCHRONIZED "finish" (fn () => |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
216 |
let |
32738 | 217 |
val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
218 |
val _ = |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
219 |
if ok then () |
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
220 |
else if cancel_now group then () |
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
221 |
else cancel_later group; |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
222 |
val _ = broadcast work_finished; |
33413
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
wenzelm
parents:
33411
diff
changeset
|
223 |
val _ = if maximal then () else signal work_available; |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
224 |
in () end); |
28167 | 225 |
in () end; |
226 |
||
227 |
||
228 |
(* worker threads *) |
|
229 |
||
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
230 |
fun worker_wait active cond = (*requires SYNCHRONIZED*) |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
231 |
let |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
232 |
val state = |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
233 |
(case AList.lookup Thread.equal (! workers) (Thread.self ()) of |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
234 |
SOME state => state |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
235 |
| NONE => raise Fail "Unregistered worker thread"); |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
236 |
val _ = state := (if active then Waiting else Sleeping); |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
237 |
val _ = wait cond; |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
238 |
val _ = state := Working; |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
239 |
in () end; |
28162 | 240 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
241 |
fun worker_next () = (*requires SYNCHRONIZED*) |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
242 |
if length (! workers) > ! max_workers then |
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
243 |
(Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
244 |
signal work_available; |
28167 | 245 |
NONE) |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
246 |
else if count_workers Working > ! max_active then |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
247 |
(worker_wait false work_available; worker_next ()) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
248 |
else |
32738 | 249 |
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
250 |
NONE => (worker_wait false work_available; worker_next ()) |
33413
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
wenzelm
parents:
33411
diff
changeset
|
251 |
| some => (signal work_available; some)); |
28156 | 252 |
|
28167 | 253 |
fun worker_loop name = |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
254 |
(case SYNCHRONIZED name (fn () => worker_next ()) of |
29119 | 255 |
NONE => () |
33408 | 256 |
| SOME work => (execute work; worker_loop name)); |
28156 | 257 |
|
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
258 |
fun worker_start name = (*requires SYNCHRONIZED*) |
37216
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
wenzelm
parents:
37182
diff
changeset
|
259 |
Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
260 |
Unsynchronized.ref Working)); |
28156 | 261 |
|
262 |
||
263 |
(* scheduler *) |
|
264 |
||
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
265 |
val status_ticks = Unsynchronized.ref 0; |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
266 |
|
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
267 |
val last_round = Unsynchronized.ref Time.zeroTime; |
40301 | 268 |
val next_round = seconds 0.05; |
32226 | 269 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
270 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
28156 | 271 |
let |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
272 |
val now = Time.now (); |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
273 |
val tick = Time.<= (Time.+ (! last_round, next_round), now); |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
274 |
val _ = if tick then last_round := now else (); |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
275 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
276 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
277 |
(* queue and worker status *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
278 |
|
32226 | 279 |
val _ = |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
280 |
if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
281 |
val _ = |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
282 |
if tick andalso ! status_ticks = 0 then |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
283 |
Multithreading.tracing 1 (fn () => |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
284 |
let |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
285 |
val {ready, pending, running, passive} = Task_Queue.status (! queue); |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
286 |
val total = length (! workers); |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
287 |
val active = count_workers Working; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
288 |
val waiting = count_workers Waiting; |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
289 |
in |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
290 |
"SCHEDULE " ^ Time.toString now ^ ": " ^ |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
291 |
string_of_int ready ^ " ready, " ^ |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
292 |
string_of_int pending ^ " pending, " ^ |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
293 |
string_of_int running ^ " running, " ^ |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
294 |
string_of_int passive ^ " passive; " ^ |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
295 |
string_of_int total ^ " workers, " ^ |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
296 |
string_of_int active ^ " active, " ^ |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
297 |
string_of_int waiting ^ " waiting " |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
298 |
end) |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
299 |
else (); |
32053 | 300 |
|
28191 | 301 |
val _ = |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
302 |
if forall (Thread.isActive o #1) (! workers) then () |
32095 | 303 |
else |
33409 | 304 |
let |
37682 | 305 |
val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); |
33409 | 306 |
val _ = workers := alive; |
307 |
in |
|
308 |
Multithreading.tracing 0 (fn () => |
|
309 |
"SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads") |
|
310 |
end; |
|
28191 | 311 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
312 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
313 |
(* worker pool adjustments *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
314 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
315 |
val max_active0 = ! max_active; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
316 |
val max_workers0 = ! max_workers; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
317 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
318 |
val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
319 |
val _ = max_active := m; |
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
320 |
|
33411
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
321 |
val mm = |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
322 |
if ! do_shutdown then 0 |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
323 |
else if m = 9999 then 1 |
33413
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
wenzelm
parents:
33411
diff
changeset
|
324 |
else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m); |
33411
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
325 |
val _ = |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
326 |
if tick andalso mm > ! max_workers then |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
327 |
Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1) |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
328 |
else if tick andalso mm < ! max_workers then |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
329 |
Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
330 |
else (); |
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
331 |
val _ = |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
332 |
if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
333 |
max_workers := mm |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
334 |
else if ! worker_trend > 5 andalso ! max_workers < 2 * m then |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
335 |
max_workers := Int.min (mm, 2 * m) |
33411
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
wenzelm
parents:
33410
diff
changeset
|
336 |
else (); |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
337 |
|
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
338 |
val missing = ! max_workers - length (! workers); |
28203 | 339 |
val _ = |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
340 |
if missing > 0 then |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
341 |
funpow missing (fn () => |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
342 |
ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () |
28203 | 343 |
else (); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
344 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
345 |
val _ = |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
346 |
if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
347 |
else signal work_available; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
348 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
349 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
350 |
(* canceled groups *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
351 |
|
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
352 |
val _ = |
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
353 |
if null (! canceled) then () |
32293 | 354 |
else |
355 |
(Multithreading.tracing 1 (fn () => |
|
356 |
string_of_int (length (! canceled)) ^ " canceled groups"); |
|
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
357 |
Unsynchronized.change canceled (filter_out cancel_now); |
32293 | 358 |
broadcast_work ()); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
359 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
360 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
361 |
(* delay loop *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
362 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
363 |
val _ = Exn.release (wait_timeout next_round scheduler_event); |
28167 | 364 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
365 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
366 |
(* shutdown *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
367 |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
368 |
val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else (); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
369 |
val continue = not (! do_shutdown andalso null (! workers)); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
370 |
val _ = if continue then () else scheduler := NONE; |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
371 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
372 |
val _ = broadcast scheduler_event; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
373 |
in continue end |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
374 |
handle exn => |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
375 |
if Exn.is_interrupt exn then |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
376 |
(Multithreading.tracing 1 (fn () => "Interrupt"); |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
377 |
List.app cancel_later (Task_Queue.cancel_all (! queue)); |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
378 |
broadcast_work (); true) |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
379 |
else reraise exn; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
380 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
381 |
fun scheduler_loop () = |
33416
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
382 |
while |
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
383 |
Multithreading.with_attributes |
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
384 |
(Multithreading.sync_interrupts Multithreading.public_interrupts) |
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
385 |
(fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) |
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
386 |
do (); |
28156 | 387 |
|
28203 | 388 |
fun scheduler_active () = (*requires SYNCHRONIZED*) |
389 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); |
|
390 |
||
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
391 |
fun scheduler_check () = (*requires SYNCHRONIZED*) |
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
392 |
(do_shutdown := false; |
32248
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
393 |
if scheduler_active () then () |
37216
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
wenzelm
parents:
37182
diff
changeset
|
394 |
else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); |
28156 | 395 |
|
396 |
||
29366 | 397 |
|
398 |
(** futures **) |
|
28156 | 399 |
|
29366 | 400 |
(* fork *) |
401 |
||
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
402 |
fun bulk {group, deps, pri} es = |
29366 | 403 |
let |
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
404 |
val grp = |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
405 |
(case group of |
37865
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
wenzelm
parents:
37854
diff
changeset
|
406 |
NONE => worker_subgroup () |
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
407 |
| SOME grp => grp); |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
408 |
fun enqueue e (minimal, queue) = |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
409 |
let |
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
410 |
val (result, job) = future_job grp e; |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
411 |
val ((task, minimal'), queue') = Task_Queue.enqueue grp deps pri job queue; |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
412 |
val future = Future {promised = false, task = task, group = grp, result = result}; |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
413 |
in (future, (minimal orelse minimal', queue')) end; |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
414 |
in |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
415 |
SYNCHRONIZED "enqueue" (fn () => |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
416 |
let |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
417 |
val (futures, minimal) = |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
418 |
Unsynchronized.change_result queue (fn q => |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
419 |
let val (futures, (minimal, q')) = fold_map enqueue es (false, q) |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
420 |
in ((futures, minimal), q') end); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
421 |
val _ = if minimal then signal work_available else (); |
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
422 |
val _ = scheduler_check (); |
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
423 |
in futures end) |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
424 |
end; |
28162 | 425 |
|
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
426 |
fun fork_pri pri e = singleton (bulk {group = NONE, deps = [], pri = pri}) e; |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
427 |
fun fork e = fork_pri 0 e; |
28186 | 428 |
|
429 |
||
29366 | 430 |
(* join *) |
431 |
||
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
432 |
local |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
433 |
|
32099 | 434 |
fun get_result x = |
435 |
(case peek x of |
|
37852
a902f158b4fc
eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
wenzelm
parents:
37690
diff
changeset
|
436 |
NONE => Exn.Exn (Fail "Unfinished future") |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
437 |
| SOME res => |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
438 |
if Exn.is_interrupt_exn res then |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
439 |
(case Exn.flatten_list (Task_Queue.group_status (group_of x)) of |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
440 |
[] => res |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
441 |
| exns => Exn.Exn (Exn.EXCEPTIONS exns)) |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
442 |
else res); |
28186 | 443 |
|
32095 | 444 |
fun join_next deps = (*requires SYNCHRONIZED*) |
32224 | 445 |
if null deps then NONE |
446 |
else |
|
32738 | 447 |
(case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of |
32224 | 448 |
(NONE, []) => NONE |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
449 |
| (NONE, deps') => (worker_wait true work_finished; join_next deps') |
32224 | 450 |
| (SOME work, deps') => SOME (work, deps')); |
32095 | 451 |
|
32814
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
452 |
fun execute_work NONE = () |
33408 | 453 |
| execute_work (SOME (work, deps')) = (execute work; join_work deps') |
32814
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
454 |
and join_work deps = |
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
455 |
execute_work (SYNCHRONIZED "join" (fn () => join_next deps)); |
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
456 |
|
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
457 |
fun join_depend task deps = |
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
458 |
execute_work (SYNCHRONIZED "join" (fn () => |
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
459 |
(Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps))); |
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
460 |
|
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
461 |
in |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
462 |
|
29366 | 463 |
fun join_results xs = |
464 |
if forall is_finished xs then map get_result xs |
|
32246 | 465 |
else if Multithreading.self_critical () then |
466 |
error "Cannot join future values within critical section" |
|
32814
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
467 |
else |
41670 | 468 |
worker_waiting (fn () => |
469 |
(case worker_task () of |
|
470 |
SOME task => join_depend task (map task_of xs) |
|
471 |
| NONE => List.app (ignore o Single_Assignment.await o result_of) xs; |
|
472 |
map get_result xs)); |
|
28186 | 473 |
|
29551
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
474 |
end; |
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
475 |
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
476 |
fun join_result x = singleton join_results x; |
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
477 |
fun join x = Exn.release (join_result x); |
28156 | 478 |
|
29366 | 479 |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
480 |
(* fast-path versions -- bypassing full task management *) |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
481 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
482 |
fun value (x: 'a) = |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
483 |
let |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
484 |
val group = Task_Queue.new_group NONE; |
35016 | 485 |
val result = Single_Assignment.var "value" : 'a result; |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
486 |
val _ = assign_result group result (Exn.Result x); |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
487 |
in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end; |
29366 | 488 |
|
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
489 |
fun map_future f x = |
29366 | 490 |
let |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
491 |
val task = task_of x; |
32102 | 492 |
val group = Task_Queue.new_group (SOME (group_of x)); |
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
493 |
val (result, job) = future_job group (fn () => f (join x)); |
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
494 |
|
32246 | 495 |
val extended = SYNCHRONIZED "extend" (fn () => |
29366 | 496 |
(case Task_Queue.extend task job (! queue) of |
497 |
SOME queue' => (queue := queue'; true) |
|
498 |
| NONE => false)); |
|
499 |
in |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
500 |
if extended then Future {promised = false, task = task, group = group, result = result} |
41672
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
501 |
else |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
502 |
singleton (bulk {group = SOME group, deps = [task], pri = Task_Queue.pri_of_task task}) |
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
wenzelm
parents:
41670
diff
changeset
|
503 |
(fn () => f (join x)) |
29366 | 504 |
end; |
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
505 |
|
28191 | 506 |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
507 |
(* promised futures -- fulfilled by external means *) |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
508 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
509 |
fun promise_group group : 'a future = |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
510 |
let |
35016 | 511 |
val result = Single_Assignment.var "promise" : 'a result; |
39243
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
512 |
fun abort () = assign_result group result Exn.interrupt_exn |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
513 |
handle Fail _ => true |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
514 |
| exn => |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
515 |
if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise" |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
516 |
else reraise exn; |
37854
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
517 |
val task = SYNCHRONIZED "enqueue_passive" (fn () => |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
518 |
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort)); |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
519 |
in Future {promised = true, task = task, group = group, result = result} end; |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
520 |
|
37865
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
wenzelm
parents:
37854
diff
changeset
|
521 |
fun promise () = promise_group (worker_subgroup ()); |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
522 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
523 |
fun fulfill_result (Future {promised, task, group, result}) res = |
39243
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
524 |
if not promised then raise Fail "Not a promised future" |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
525 |
else |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
526 |
let |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
527 |
fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
528 |
val _ = |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
529 |
Multithreading.with_attributes Multithreading.no_interrupts (fn _ => |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
530 |
let |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
531 |
val still_passive = |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
532 |
SYNCHRONIZED "fulfill_result" (fn () => |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
533 |
Unsynchronized.change_result queue |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
534 |
(Task_Queue.dequeue_passive (Thread.self ()) task)); |
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
535 |
in if still_passive then execute (task, group, [job]) else () end); |
41670 | 536 |
val _ = worker_waiting (fn () => Single_Assignment.await result); |
39243
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
wenzelm
parents:
39232
diff
changeset
|
537 |
in () end; |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
538 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
539 |
fun fulfill x res = fulfill_result x (Exn.Result res); |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
540 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
541 |
|
29431 | 542 |
(* cancellation *) |
28202
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
wenzelm
parents:
28201
diff
changeset
|
543 |
|
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
544 |
fun interruptible_task f x = |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
545 |
if Multithreading.available then |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
546 |
Multithreading.with_attributes |
32058 | 547 |
(if is_worker () |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
548 |
then Multithreading.private_interrupts |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
549 |
else Multithreading.public_interrupts) |
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
550 |
(fn _ => f x) |
30618
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
551 |
else interruptible f x; |
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
wenzelm
parents:
30612
diff
changeset
|
552 |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
553 |
(*cancel: present and future group members will be interrupted eventually*) |
37854
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
554 |
fun cancel_group group = SYNCHRONIZED "cancel" (fn () => |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
555 |
(if cancel_now group then () else cancel_later group; |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
556 |
signal work_available; scheduler_check ())); |
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
wenzelm
parents:
37852
diff
changeset
|
557 |
|
29431 | 558 |
fun cancel x = cancel_group (group_of x); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
559 |
|
29366 | 560 |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
561 |
(* shutdown *) |
29366 | 562 |
|
28203 | 563 |
fun shutdown () = |
28276 | 564 |
if Multithreading.available then |
565 |
SYNCHRONIZED "shutdown" (fn () => |
|
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
566 |
while scheduler_active () do |
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
567 |
(wait scheduler_event; broadcast_work ())) |
28276 | 568 |
else (); |
28203 | 569 |
|
29366 | 570 |
|
38236
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
wenzelm
parents:
37904
diff
changeset
|
571 |
(* status markup *) |
37690
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
572 |
|
38236
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
wenzelm
parents:
37904
diff
changeset
|
573 |
fun status e = |
37690
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
574 |
let |
40448 | 575 |
val task_props = |
576 |
(case worker_task () of |
|
577 |
NONE => I |
|
578 |
| SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]); |
|
579 |
val _ = Output.status (Markup.markup (task_props Markup.forked) ""); |
|
37690
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
580 |
val x = e (); (*sic -- report "joined" only for success*) |
40448 | 581 |
val _ = Output.status (Markup.markup (task_props Markup.joined) ""); |
37690
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
582 |
in x end; |
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
583 |
|
b16231572c61
general Future.report -- also for Toplevel.async_state;
wenzelm
parents:
37682
diff
changeset
|
584 |
|
29366 | 585 |
(*final declarations of this structure!*) |
586 |
val map = map_future; |
|
587 |
||
28156 | 588 |
end; |
28972 | 589 |
|
590 |
type 'a future = 'a Future.future; |
|
591 |