author | wenzelm |
Wed, 15 Jul 2020 16:10:43 +0200 | |
changeset 72038 | 254c324f31fd |
parent 71883 | 44ba78056790 |
child 72078 | b8d0b8659e0a |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
Author: Makarius |
|
3 |
||
57350 | 4 |
Value-oriented parallel execution via futures and promises. |
28156 | 5 |
*) |
6 |
||
7 |
signature FUTURE = |
|
8 |
sig |
|
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
9 |
type task = Task_Queue.task |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
10 |
type group = Task_Queue.group |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
11 |
val new_group: group option -> group |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
12 |
val worker_task: unit -> task option |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
13 |
val worker_group: unit -> group option |
52603 | 14 |
val the_worker_group: unit -> group |
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
15 |
val worker_subgroup: unit -> group |
28972 | 16 |
type 'a future |
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
17 |
val task_of: 'a future -> task |
28972 | 18 |
val peek: 'a future -> 'a Exn.result option |
19 |
val is_finished: 'a future -> bool |
|
50280 | 20 |
val ML_statistics: bool Unsynchronized.ref |
44301 | 21 |
val interruptible_task: ('a -> 'b) -> 'a -> 'b |
47404
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
wenzelm
parents:
45666
diff
changeset
|
22 |
val cancel_group: group -> unit |
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
wenzelm
parents:
45666
diff
changeset
|
23 |
val cancel: 'a future -> unit |
56333
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
wenzelm
parents:
54671
diff
changeset
|
24 |
val error_message: Position.T -> (serial * string) * string option -> unit |
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
25 |
val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result |
44427 | 26 |
type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool} |
27 |
val default_params: params |
|
28 |
val forks: params -> (unit -> 'a) list -> 'a future list |
|
32724 | 29 |
val fork: (unit -> 'a) -> 'a future |
68196 | 30 |
val get_finished: 'a future -> 'a |
28972 | 31 |
val join_results: 'a future list -> 'a Exn.result list |
32 |
val join_result: 'a future -> 'a Exn.result |
|
44330 | 33 |
val joins: 'a future list -> 'a list |
28972 | 34 |
val join: 'a future -> 'a |
67658 | 35 |
val forked_results: {name: string, deps: Task_Queue.task list} -> |
36 |
(unit -> 'a) list -> 'a Exn.result list |
|
54649
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
37 |
val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b |
68130
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
38 |
val enabled: unit -> bool |
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
39 |
val relevant: 'a list -> bool |
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
40 |
val proofs_enabled: int -> bool |
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
41 |
val proofs_enabled_timing: Time.time -> bool |
44294
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
wenzelm
parents:
44268
diff
changeset
|
42 |
val value_result: 'a Exn.result -> 'a future |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
43 |
val value: 'a -> 'a future |
44427 | 44 |
val cond_forks: params -> (unit -> 'a) list -> 'a future list |
28972 | 45 |
val map: ('a -> 'b) -> 'a future -> 'b future |
66166 | 46 |
val promise_name: string -> (unit -> unit) -> 'a future |
44298
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
wenzelm
parents:
44295
diff
changeset
|
47 |
val promise: (unit -> unit) -> 'a future |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
48 |
val fulfill_result: 'a future -> 'a Exn.result -> unit |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
49 |
val fulfill: 'a future -> 'a -> unit |
66378 | 50 |
val snapshot: group list -> task list |
28203 | 51 |
val shutdown: unit -> unit |
28156 | 52 |
end; |
53 |
||
54 |
structure Future: FUTURE = |
|
55 |
struct |
|
56 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
57 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
58 |
|
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
59 |
type task = Task_Queue.task; |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
60 |
type group = Task_Queue.group; |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
61 |
val new_group = Task_Queue.new_group; |
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
62 |
|
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
63 |
|
28167 | 64 |
(* identifiers *) |
65 |
||
32058 | 66 |
(* Per-thread record of the task being executed, kept in a Thread_Data variable
   that is private to this local block.
   worker_task ():              read the variable (task option).
   setmp_worker_task task f x:  apply f to x with the variable set to SOME task
                                via Thread_Data.setmp (presumably restored
                                afterwards -- confirm against Thread_Data). *)
local |
62889 | 67 |
val worker_task_var = Thread_Data.var () : task Thread_Data.var; |
32058 | 68 |
in |
62889 | 69 |
fun worker_task () = Thread_Data.get worker_task_var; |
70 |
fun setmp_worker_task task f x = Thread_Data.setmp worker_task_var (SOME task) f x; |
|
28167 | 71 |
end; |
72 |
||
41683
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
wenzelm
parents:
41681
diff
changeset
|
73 |
(* Group of the task recorded for the current thread, or NONE without such a task. *)
fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ()); |
52603 | 74 |
|
75 |
(* Total variant of worker_group: returns the group of the current worker task,
   and raises Fail "Missing worker thread context" when worker_group () is NONE. *)
fun the_worker_group () = |
|
76 |
(case worker_group () of |
|
77 |
SOME group => group |
|
78 |
| NONE => raise Fail "Missing worker thread context"); |
|
79 |
||
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
80 |
(* New group obtained from new_group, given the current worker group (if any) as argument. *)
fun worker_subgroup () = worker_group () |> new_group; |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
81 |
|
41679 | 82 |
(* Evaluate the thunk e.  Outside a worker task it is called directly; inside
   one it is passed to Task_Queue.joining together with the current task
   (presumably so that the time spent joining is attributed to that task --
   confirm in task_queue.ML). *)
fun worker_joining e = |
83 |
(case worker_task () of |
|
84 |
NONE => e () |
|
85 |
| SOME task => Task_Queue.joining task e); |
|
86 |
||
41680
a4c822915eaa
more informative task timing: some dependency tracking;
wenzelm
parents:
41679
diff
changeset
|
87 |
(* Evaluate the thunk e while waiting on the tasks deps.  Outside a worker task
   e is called directly; inside one it is passed to Task_Queue.waiting together
   with the current task and deps (presumably for timing and dependency
   tracking -- confirm in task_queue.ML). *)
fun worker_waiting deps e = |
41670 | 88 |
(case worker_task () of |
89 |
NONE => e () |
|
41680
a4c822915eaa
more informative task timing: some dependency tracking;
wenzelm
parents:
41679
diff
changeset
|
90 |
| SOME task => Task_Queue.waiting task deps e); |
41670 | 91 |
|
28167 | 92 |
|
93 |
(* datatype future *) |
|
94 |
||
35016 | 95 |
type 'a result = 'a Exn.result Single_Assignment.var; |
96 |
||
66958
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
97 |
datatype 'a future = |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
98 |
Value of 'a Exn.result | |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
99 |
Future of |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
100 |
{promised: bool, |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
101 |
task: task, |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
102 |
result: 'a result}; |
28167 | 103 |
|
66958
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
104 |
(* Task associated with a future: plain values have no task of their own and
   yield Task_Queue.dummy_task; proper futures yield their task field. *)
fun task_of (Value _) = Task_Queue.dummy_task |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
105 |
| task_of (Future {task, ...}) = task; |
28167 | 106 |
|
66958
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
107 |
(* Inspect the result of a future: a plain Value always has its result;
   a proper Future yields whatever Single_Assignment.peek reports for its
   result variable (an option). *)
fun peek (Value res) = SOME res |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
108 |
| peek (Future {result, ...}) = Single_Assignment.peek result; |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
109 |
|
28558 | 110 |
(* A future is finished as soon as peek yields some result (regular or exceptional). *)
fun is_finished fut = (case peek fut of NONE => false | SOME _ => true); |
28320 | 111 |
|
62663 | 112 |
val _ = |
62819
d3ff367a16a0
careful export of type-dependent functions, without losing their special status;
wenzelm
parents:
62663
diff
changeset
|
113 |
ML_system_pp (fn depth => fn pretty => fn x => |
62663 | 114 |
(case peek x of |
68918
3a0db30e5d87
simplified signature (again, see 751bcf0473a7): e.g. relevant for non-Isabelle ML environments;
wenzelm
parents:
68379
diff
changeset
|
115 |
NONE => PolyML.PrettyString "<future>" |
3a0db30e5d87
simplified signature (again, see 751bcf0473a7): e.g. relevant for non-Isabelle ML environments;
wenzelm
parents:
68379
diff
changeset
|
116 |
| SOME (Exn.Exn _) => PolyML.PrettyString "<failed>" |
62663 | 117 |
| SOME (Exn.Res y) => pretty (y, depth))); |
118 |
||
28167 | 119 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
120 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
121 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
122 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
123 |
(* synchronization *) |
28156 | 124 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
125 |
val scheduler_event = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
126 |
val work_available = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
127 |
val work_finished = ConditionVar.conditionVar (); |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
128 |
|
28156 | 129 |
local |
130 |
val lock = Mutex.mutex (); |
|
131 |
in |
|
132 |
||
59054
61b723761dff
load simple_thread.ML later, such that it benefits from redefined print_exception_trace;
wenzelm
parents:
57350
diff
changeset
|
133 |
(* Delegate to Multithreading.synchronized, passing the given name and the
   mutex `lock` that is private to this local block.  Functions below that are
   marked "requires SYNCHRONIZED" are meant to be called from within it. *)
fun SYNCHRONIZED name = Multithreading.synchronized name lock; |
28156 | 134 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
135 |
(* Wait on condition variable cond via Multithreading.sync_wait, using the
   private lock and NONE as time argument (i.e. no deadline is supplied). *)
fun wait cond = (*requires SYNCHRONIZED*) |
64276 | 136 |
Multithreading.sync_wait NONE cond lock; |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
137 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
138 |
(* Like wait, but passes an absolute deadline to Multithreading.sync_wait:
   the current time plus the relative timeout. *)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) |
64276 | 139 |
Multithreading.sync_wait (SOME (Time.now () + timeout)) cond lock; |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
140 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
141 |
(* Thin wrapper around ConditionVar.signal for cond. *)
fun signal cond = (*requires SYNCHRONIZED*) |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
142 |
ConditionVar.signal cond; |
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
143 |
|
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
144 |
(* Thin wrapper around ConditionVar.broadcast for cond. *)
fun broadcast cond = (*requires SYNCHRONIZED*) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
145 |
ConditionVar.broadcast cond; |
28156 | 146 |
|
147 |
end; |
|
148 |
||
149 |
||
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
150 |
(* global state *) |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
151 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
152 |
val queue = Unsynchronized.ref Task_Queue.empty; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
153 |
val next = Unsynchronized.ref 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
154 |
val scheduler = Unsynchronized.ref (NONE: Thread.thread option); |
44300
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
wenzelm
parents:
44299
diff
changeset
|
155 |
val canceled = Unsynchronized.ref ([]: group list); |
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
156 |
val do_shutdown = Unsynchronized.ref false; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
157 |
val max_workers = Unsynchronized.ref 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
158 |
val max_active = Unsynchronized.ref 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
159 |
|
50280 | 160 |
val status_ticks = Unsynchronized.ref 0; |
161 |
val last_round = Unsynchronized.ref Time.zeroTime; |
|
162 |
val next_round = seconds 0.05; |
|
163 |
||
33410
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
164 |
datatype worker_state = Working | Waiting | Sleeping; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
165 |
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
166 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
167 |
(* Number of entries in the workers list whose state reference currently
   holds the given worker_state. *)
fun count_workers state = (*requires SYNCHRONIZED*) |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
168 |
fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
169 |
|
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
wenzelm
parents:
33409
diff
changeset
|
170 |
|
50280 | 171 |
|
172 |
(* status *) |
|
173 |
||
174 |
val ML_statistics = Unsynchronized.ref false; |
|
175 |
||
176 |
fun report_status () = (*requires SYNCHRONIZED*) |
|
177 |
if ! ML_statistics then |
|
178 |
let |
|
72038
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
179 |
val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue); |
68129 | 180 |
val workers_total = length (! workers); |
181 |
val workers_active = count_workers Working; |
|
182 |
val workers_waiting = count_workers Waiting; |
|
72038
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
183 |
val _ = |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
184 |
ML_Statistics.set |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
185 |
{tasks_ready = ready, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
186 |
tasks_pending = pending, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
187 |
tasks_running = running, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
188 |
tasks_passive = passive, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
189 |
tasks_urgent = urgent, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
190 |
workers_total = workers_total, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
191 |
workers_active = workers_active, |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
192 |
workers_waiting = workers_waiting}; |
254c324f31fd
clarified user counters: expose tasks to external monitor;
wenzelm
parents:
71883
diff
changeset
|
193 |
val stats = ML_Statistics.get (); |
56333
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
wenzelm
parents:
54671
diff
changeset
|
194 |
in Output.try_protocol_message (Markup.ML_statistics :: stats) [] end |
50280 | 195 |
else (); |
196 |
||
197 |
||
44110 | 198 |
(* cancellation primitives *) |
32099 | 199 |
|
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
200 |
(* Cancel group immediately: Task_Queue.cancel on the current queue yields the
   list `running` (threads, as they are handed to Isabelle_Thread operations).
   Each of them is interrupted via Isabelle_Thread.interrupt_unsynchronized,
   except the calling thread itself, which is skipped.
   Returns the `running` list unchanged; callers test it with `null`. *)
fun cancel_now group = (*requires SYNCHRONIZED*) |
44341
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
201 |
let |
47404
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
wenzelm
parents:
45666
diff
changeset
|
202 |
val running = Task_Queue.cancel (! queue) group; |
49894
69bfd86cc711
more robust cancel_now: avoid shooting yourself in the foot;
wenzelm
parents:
49009
diff
changeset
|
203 |
val _ = running |> List.app (fn thread => |
71692 | 204 |
if Isabelle_Thread.is_self thread then () |
205 |
else Isabelle_Thread.interrupt_unsynchronized thread); |
|
47404
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
wenzelm
parents:
45666
diff
changeset
|
206 |
in running end; |
44341
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
207 |
|
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
208 |
(* Cancel everything in the current queue: Task_Queue.cancel_all yields a pair
   (groups, threads); every thread in `threads` is interrupted via
   Isabelle_Thread.interrupt_unsynchronized (the calling thread is not
   excluded here, unlike in cancel_now).  Returns `groups`. *)
fun cancel_all () = (*requires SYNCHRONIZED*) |
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
209 |
let |
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
210 |
val (groups, threads) = Task_Queue.cancel_all (! queue); |
71692 | 211 |
val _ = List.app Isabelle_Thread.interrupt_unsynchronized threads; |
44341
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
212 |
in groups end; |
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
213 |
|
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
214 |
(* Defer cancellation of group: insert it into the global `canceled` list
   (using Task_Queue.eq_group as the equality for insert) and broadcast
   scheduler_event. *)
fun cancel_later group = (*requires SYNCHRONIZED*) |
32738 | 215 |
(Unsynchronized.change canceled (insert Task_Queue.eq_group group); |
216 |
broadcast scheduler_event); |
|
29341
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
wenzelm
parents:
29119
diff
changeset
|
217 |
|
44301 | 218 |
(* Apply f to x under Thread_Attributes.with_attributes: private_interrupts is
   selected when the current thread has a worker task, public_interrupts
   otherwise.  After a normal return, Thread_Attributes.expose_interrupt () is
   called and the result of f x is returned (SML `before`); if f x raises, the
   exception propagates and expose_interrupt is not reached. *)
fun interruptible_task f x = |
62923 | 219 |
Thread_Attributes.with_attributes |
62359 | 220 |
(if is_some (worker_task ()) |
62923 | 221 |
then Thread_Attributes.private_interrupts |
222 |
else Thread_Attributes.public_interrupts) |
|
62359 | 223 |
(fn _ => f x) |
62924
ce47945ce4fb
tuned signature -- closer to Exn.Interrupt.expose in Scala;
wenzelm
parents:
62923
diff
changeset
|
224 |
before Thread_Attributes.expose_interrupt (); |
44301 | 225 |
|
226 |
||
44110 | 227 |
(* worker threads *) |
228 |
||
229 |
fun worker_exec (task, jobs) = |
|
28167 | 230 |
let |
41683
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
wenzelm
parents:
41681
diff
changeset
|
231 |
val group = Task_Queue.group_of_task task; |
32102 | 232 |
val valid = not (Task_Queue.is_canceled group); |
41670 | 233 |
val ok = |
234 |
Task_Queue.running task (fn () => |
|
41683
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
wenzelm
parents:
41681
diff
changeset
|
235 |
setmp_worker_task task (fn () => |
41670 | 236 |
fold (fn job => fn ok => job valid andalso ok) jobs true) ()); |
50975 | 237 |
val _ = |
238 |
if ! Multithreading.trace >= 2 then |
|
56333
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
wenzelm
parents:
54671
diff
changeset
|
239 |
Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) [] |
50975 | 240 |
else (); |
32246 | 241 |
val _ = SYNCHRONIZED "finish" (fn () => |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
242 |
let |
32738 | 243 |
val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); |
62924
ce47945ce4fb
tuned signature -- closer to Exn.Interrupt.expose in Scala;
wenzelm
parents:
62923
diff
changeset
|
244 |
val test = Exn.capture Thread_Attributes.expose_interrupt (); |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
245 |
val _ = |
44295
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
wenzelm
parents:
44294
diff
changeset
|
246 |
if ok andalso not (Exn.is_interrupt_exn test) then () |
44299
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
wenzelm
parents:
44298
diff
changeset
|
247 |
else if null (cancel_now group) then () |
34279
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
wenzelm
parents:
34277
diff
changeset
|
248 |
else cancel_later group; |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
249 |
val _ = broadcast work_finished; |
33413
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
wenzelm
parents:
33411
diff
changeset
|
250 |
val _ = if maximal then () else signal work_available; |
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
251 |
in () end); |
28167 | 252 |
in () end; |
253 |
||
59465 | 254 |
(* Wait on cond on behalf of the calling thread.  If the thread is registered
   in `workers`, the wait runs under Unsynchronized.setmp with its state
   reference set to worker_state (presumably restored when the wait returns --
   confirm against Unsynchronized.setmp).  Threads not found in `workers` just
   wait without any state change. *)
fun worker_wait worker_state cond = (*requires SYNCHRONIZED*) |
52558
271663ddf289
allow worker guest threads, which participate actively in future joins, but are outside thread accounting;
wenzelm
parents:
51990
diff
changeset
|
255 |
(case AList.lookup Thread.equal (! workers) (Thread.self ()) of |
59465 | 256 |
SOME state => Unsynchronized.setmp state worker_state wait cond |
257 |
| NONE => wait cond); |
|
28162 | 258 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
259 |
(* Obtain the next piece of work for the calling worker thread.
   - If there are more registered workers than max_workers, the calling thread
     removes itself from `workers`, signals work_available and returns NONE.
   - Otherwise it tries Task_Queue.dequeue on the queue; urgent_only is true
     when the number of Working workers exceeds max_active.
     * NONE from dequeue: wait on work_available in state Sleeping, then retry
       by calling worker_next again.
     * any other result: signal work_available and return that result as is. *)
fun worker_next () = (*requires SYNCHRONIZED*) |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
260 |
if length (! workers) > ! max_workers then |
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
261 |
(Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
262 |
signal work_available; |
28167 | 263 |
NONE) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
264 |
else |
60610
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
wenzelm
parents:
59468
diff
changeset
|
265 |
let val urgent_only = count_workers Working > ! max_active in |
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
wenzelm
parents:
59468
diff
changeset
|
266 |
(case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ()) urgent_only) of |
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
wenzelm
parents:
59468
diff
changeset
|
267 |
NONE => (worker_wait Sleeping work_available; worker_next ()) |
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
wenzelm
parents:
59468
diff
changeset
|
268 |
| some => (signal work_available; some)) |
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
wenzelm
parents:
59468
diff
changeset
|
269 |
end; |
28156 | 270 |
|
28167 | 271 |
(* Main loop of a worker thread: fetch work via worker_next inside
   SYNCHRONIZED name; terminate on NONE, otherwise run worker_exec on the work
   item (outside the synchronized section) and loop again. *)
fun worker_loop name = |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
272 |
(case SYNCHRONIZED name (fn () => worker_next ()) of |
29119 | 273 |
NONE => () |
44295
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
wenzelm
parents:
44294
diff
changeset
|
274 |
| SOME work => (worker_exec work; worker_loop name)); |
28156 | 275 |
|
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
276 |
(* Start one more worker: fork a thread via Isabelle_Thread.fork (thread name
   fixed to "worker", interrupts = false) that runs worker_loop name, and
   register it at the head of `workers` with initial state Working.
   Note that the `name` argument is passed to worker_loop only; it is not the
   thread name.  A Fail raised anywhere in the let-expression is not
   propagated: its message is emitted via Multithreading.tracing with the
   prefix "SCHEDULER: ". *)
fun worker_start name = (*requires SYNCHRONIZED*) |
59468
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
wenzelm
parents:
59467
diff
changeset
|
277 |
let |
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
wenzelm
parents:
59467
diff
changeset
|
278 |
val worker = |
71883 | 279 |
Isabelle_Thread.fork |
280 |
{name = "worker", stack_limit = Isabelle_Thread.stack_limit (), interrupts = false} |
|
59468
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
wenzelm
parents:
59467
diff
changeset
|
281 |
(fn () => worker_loop name); |
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
wenzelm
parents:
59467
diff
changeset
|
282 |
in Unsynchronized.change workers (cons (worker, Unsynchronized.ref Working)) end |
59338
2ea1bf517842
discontinued worker_trend: prefer constant number of active + reserve threads;
wenzelm
parents:
59330
diff
changeset
|
283 |
handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg); |
59330
cb3a4caf206d
permissive worker_start: failure to fork thread is deferred to later attempt to provide missing threads, without crashing scheduler;
wenzelm
parents:
59180
diff
changeset
|
284 |
|
28156 | 285 |
|
286 |
(* scheduler *) |
|
287 |
||
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
288 |
fun scheduler_next () = (*requires SYNCHRONIZED*) |
28156 | 289 |
let |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
290 |
val now = Time.now (); |
62826 | 291 |
val tick = ! last_round + next_round <= now; |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
292 |
val _ = if tick then last_round := now else (); |
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
293 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
294 |
|
50280 | 295 |
(* runtime status *) |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
296 |
|
32226 | 297 |
val _ = |
51046 | 298 |
if tick then Unsynchronized.change status_ticks (fn i => i + 1) else (); |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
299 |
val _ = |
51046 | 300 |
if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0 |
301 |
then report_status () else (); |
|
32053 | 302 |
|
28191 | 303 |
val _ = |
59338
2ea1bf517842
discontinued worker_trend: prefer constant number of active + reserve threads;
wenzelm
parents:
59330
diff
changeset
|
304 |
if not tick orelse forall (Thread.isActive o #1) (! workers) then () |
32095 | 305 |
else |
33409 | 306 |
let |
37682 | 307 |
val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); |
33409 | 308 |
val _ = workers := alive; |
309 |
in |
|
310 |
Multithreading.tracing 0 (fn () => |
|
51279 | 311 |
"SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads") |
33409 | 312 |
end; |
28191 | 313 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
314 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
315 |
(* worker pool adjustments *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
316 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
317 |
val max_active0 = ! max_active; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
318 |
val max_workers0 = ! max_workers; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
319 |
|
59340
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
wenzelm
parents:
59338
diff
changeset
|
320 |
val m = |
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
wenzelm
parents:
59338
diff
changeset
|
321 |
if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0 |
62925 | 322 |
else Multithreading.max_threads (); |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
323 |
val _ = max_active := m; |
59338
2ea1bf517842
discontinued worker_trend: prefer constant number of active + reserve threads;
wenzelm
parents:
59330
diff
changeset
|
324 |
val _ = max_workers := 2 * m; |
33406
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
wenzelm
parents:
33061
diff
changeset
|
325 |
|
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
326 |
val missing = ! max_workers - length (! workers); |
28203 | 327 |
val _ = |
33407
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
wenzelm
parents:
33406
diff
changeset
|
328 |
if missing > 0 then |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
329 |
funpow missing (fn () => |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
330 |
ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) () |
28203 | 331 |
else (); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
332 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
333 |
val _ = |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
334 |
if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
335 |
else signal work_available; |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
336 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
337 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
338 |
(* canceled groups *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
339 |
|
32225
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
340 |
val _ = |
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
wenzelm
parents:
32224
diff
changeset
|
341 |
if null (! canceled) then () |
32293 | 342 |
else |
343 |
(Multithreading.tracing 1 (fn () => |
|
344 |
string_of_int (length (! canceled)) ^ " canceled groups"); |
|
44299
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
wenzelm
parents:
44298
diff
changeset
|
345 |
Unsynchronized.change canceled (filter_out (null o cancel_now)); |
51281
c05f7e1dd602
signal work_available should be sufficient to initiate daisy-chained workers, and lead to separate broadcast work_finished eventually -- NB: broadcasting all worker threads tends to burn parallel CPU cycles;
wenzelm
parents:
51280
diff
changeset
|
346 |
signal work_available); |
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
347 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
348 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
349 |
(* delay loop *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
350 |
|
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
351 |
val _ = Exn.release (wait_timeout next_round scheduler_event); |
28167 | 352 |
|
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
353 |
|
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
354 |
(* shutdown *) |
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
355 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
356 |
val continue = not (! do_shutdown andalso null (! workers)); |
50429
f8cd5e53653b
final report_status within SYNCHRONIZED part of scheduler loop: required for sanity of data;
wenzelm
parents:
50280
diff
changeset
|
357 |
val _ = if continue then () else (report_status (); scheduler := NONE); |
33415
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
wenzelm
parents:
33413
diff
changeset
|
358 |
|
32219
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
wenzelm
parents:
32186
diff
changeset
|
359 |
val _ = broadcast scheduler_event; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
360 |
in continue end |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
361 |
handle exn => |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
362 |
if Exn.is_interrupt exn then |
51279 | 363 |
(Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt"); |
44341
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
wenzelm
parents:
44330
diff
changeset
|
364 |
List.app cancel_later (cancel_all ()); |
51281
c05f7e1dd602
signal work_available should be sufficient to initiate daisy-chained workers, and lead to separate broadcast work_finished eventually -- NB: broadcasting all worker threads tends to burn parallel CPU cycles;
wenzelm
parents:
51280
diff
changeset
|
365 |
signal work_available; true) |
62505 | 366 |
else Exn.reraise exn; |
32295
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
wenzelm
parents:
32293
diff
changeset
|
367 |
|
28206
bcd48c6897d4
eliminated requests, use global state variables uniformly;
wenzelm
parents:
28203
diff
changeset
|
368 |
fun scheduler_loop () = |
44173
aaaa13e297dc
immediate fork of initial workers -- avoid 5 ticks (250ms) for adaptive scheme (a07558eb5029);
wenzelm
parents:
44115
diff
changeset
|
369 |
(while |
62923 | 370 |
Thread_Attributes.with_attributes |
371 |
(Thread_Attributes.sync_interrupts Thread_Attributes.public_interrupts) |
|
33416
13d00799fe49
scheduler: clarified interrupt attributes and handling;
wenzelm
parents:
33415
diff
changeset
|
372 |
(fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) |
50429
f8cd5e53653b
final report_status within SYNCHRONIZED part of scheduler loop: required for sanity of data;
wenzelm
parents:
50280
diff
changeset
|
373 |
do (); last_round := Time.zeroTime); |
28156 | 374 |
|
28203 | 375 |
(*Tell whether a scheduler thread is currently registered and still running:
  false when ! scheduler is NONE, otherwise Thread.isActive of the stored thread.*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
376 |
(case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
|
377 |
||
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
378 |
fun scheduler_check () = (*requires SYNCHRONIZED*) |
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
379 |
(do_shutdown := false; |
32248
0241916a5f06
more precise treatment of scheduler_event: continuous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
wenzelm
parents:
32247
diff
changeset
|
380 |
if scheduler_active () then () |
59468
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
wenzelm
parents:
59467
diff
changeset
|
381 |
else |
60764 | 382 |
scheduler := |
71692 | 383 |
SOME (Isabelle_Thread.fork {name = "scheduler", stack_limit = NONE, interrupts = false} |
60764 | 384 |
scheduler_loop)); |
28156 | 385 |
|
44301 | 386 |
|
387 |
||
388 |
(** futures **) |
|
389 |
||
390 |
(* cancel *) |
|
391 |
||
49906 | 392 |
(*Cancel a task group; the caller must already hold the SYNCHRONIZED lock.
  Steps, in order:
    1. cancel_now group; if it returns a non-empty list, the group is also
       handed to cancel_later;
    2. work_available is signalled unconditionally;
    3. scheduler_check () is invoked (it resets do_shutdown and forks a
       scheduler thread if none is active).*)
fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
44299
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
wenzelm
parents:
44298
diff
changeset
|
393 |
let
47421
9624408d8827
always signal after cancel_group: passive tasks may have become active;
wenzelm
parents:
47404
diff
changeset
|
394 |
val _ = if null (cancel_now group) then () else cancel_later group;
9624408d8827
always signal after cancel_group: passive tasks may have become active;
wenzelm
parents:
47404
diff
changeset
|
395 |
val _ = signal work_available;
9624408d8827
always signal after cancel_group: passive tasks may have become active;
wenzelm
parents:
47404
diff
changeset
|
396 |
val _ = scheduler_check ();
49906 | 397 |
in () end;
398 |
||
399 |
(*Synchronized entry point for group cancellation: runs
  cancel_group_unsynchronized on the group inside SYNCHRONIZED "cancel_group".*)
fun cancel_group group =

400 |
SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
|
44299
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
wenzelm
parents:
44298
diff
changeset
|
401 |
|
44301 | 402 |
(*Cancel the whole task group that the task of future x belongs to.*)
fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;
29366 | 403 |
|
28156 | 404 |
|
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
405 |
(* results *) |
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
406 |
|
56333
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
wenzelm
parents:
54671
diff
changeset
|
407 |
fun error_message pos ((serial, msg), exec_id) = |
50916
fd902b616b48
proper runtime position (cf. fe4714886d92 and Toplevel.error_msg) -- to make error messages actually appear in the document;
wenzelm
parents:
50914
diff
changeset
|
408 |
Position.setmp_thread_data pos (fn () => |
50931
a7484a7b6c8a
clarified Future.error_msg: slightly more robust id check, actually suppress displaced messages;
wenzelm
parents:
50916
diff
changeset
|
409 |
let val id = Position.get_id pos in |
a7484a7b6c8a
clarified Future.error_msg: slightly more robust id check, actually suppress displaced messages;
wenzelm
parents:
50916
diff
changeset
|
410 |
if is_none id orelse is_none exec_id orelse id = exec_id |
54387 | 411 |
then Output.error_message' (serial, msg) else () |
50931
a7484a7b6c8a
clarified Future.error_msg: slightly more robust id check, actually suppress displaced messages;
wenzelm
parents:
50916
diff
changeset
|
412 |
end) (); |
44110 | 413 |
|
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
414 |
fun identify_result pos res = |
61077
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
415 |
res |> Exn.map_exn (fn exn => |
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
416 |
let val exec_id = |
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
417 |
(case Position.get_id pos of |
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
418 |
NONE => [] |
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
419 |
| SOME id => [(Markup.exec_idN, id)]) |
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
wenzelm
parents:
60911
diff
changeset
|
420 |
in Par_Exn.identify exec_id exn end); |
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
421 |
|
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
422 |
fun assign_result group result res = |
44110 | 423 |
let |
424 |
val _ = Single_Assignment.assign result res |
|
425 |
handle exn as Fail _ => |
|
426 |
(case Single_Assignment.peek result of |
|
62505 | 427 |
SOME (Exn.Exn e) => Exn.reraise (if Exn.is_interrupt e then e else exn) |
428 |
| _ => Exn.reraise exn); |
|
44110 | 429 |
val ok = |
430 |
(case the (Single_Assignment.peek result) of |
|
44111
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
wenzelm
parents:
44110
diff
changeset
|
431 |
Exn.Exn exn => |
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
wenzelm
parents:
44110
diff
changeset
|
432 |
(SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false) |
44110 | 433 |
| Exn.Res _ => true); |
434 |
in ok end; |
|
435 |
||
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
436 |
|
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
437 |
(* future jobs *) |
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
438 |
|
54649
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
439 |
fun future_job group atts (e: unit -> 'a) = |
44110 | 440 |
let |
441 |
val result = Single_Assignment.var "future" : 'a result; |
|
442 |
val pos = Position.thread_data (); |
|
62889 | 443 |
val context = Context.get_generic_context (); |
44110 | 444 |
fun job ok = |
445 |
let |
|
446 |
val res = |
|
447 |
if ok then |
|
448 |
Exn.capture (fn () => |
|
62923 | 449 |
Thread_Attributes.with_attributes atts (fn _ => |
62889 | 450 |
(Position.setmp_thread_data pos o Context.setmp_generic_context context) e ())) () |
44110 | 451 |
else Exn.interrupt_exn; |
50914
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
wenzelm
parents:
50911
diff
changeset
|
452 |
in assign_result group result (identify_result pos res) end; |
44110 | 453 |
in (result, job) end; |
454 |
||
455 |
||
29366 | 456 |
(* fork *) |
457 |
||
44427 | 458 |
(*Parameters accepted by forks:
    name        task name, passed on to Task_Queue.enqueue
    group       explicit task group; NONE makes forks use worker_subgroup ()
    deps        tasks passed to Task_Queue.enqueue as dependencies
    pri         priority passed to Task_Queue.enqueue
    interrupts  true selects Thread_Attributes.private_interrupts for the job,
                false selects Thread_Attributes.no_interrupts*)
type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
459 |
(*Default parameters: empty name, no explicit group, no dependencies,
  priority 0, interrupts enabled.*)
val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
|
44113 | 460 |
|
44427 | 461 |
fun forks ({name, group, deps, pri, interrupts}: params) es = |
41674 | 462 |
if null es then [] |
463 |
else |
|
464 |
let |
|
465 |
val grp = |
|
466 |
(case group of |
|
467 |
NONE => worker_subgroup () |
|
468 |
| SOME grp => grp); |
|
41708 | 469 |
fun enqueue e queue = |
41674 | 470 |
let |
54649
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
471 |
val atts = |
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
472 |
if interrupts |
62923 | 473 |
then Thread_Attributes.private_interrupts |
474 |
else Thread_Attributes.no_interrupts; |
|
54649
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
475 |
val (result, job) = future_job grp atts e; |
41708 | 476 |
val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; |
41683
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
wenzelm
parents:
41681
diff
changeset
|
477 |
val future = Future {promised = false, task = task, result = result}; |
41708 | 478 |
in (future, queue') end; |
41674 | 479 |
in |
480 |
SYNCHRONIZED "enqueue" (fn () => |
|
481 |
let |
|
41708 | 482 |
val (futures, queue') = fold_map enqueue es (! queue); |
483 |
val _ = queue := queue'; |
|
484 |
val minimal = forall (not o Task_Queue.known_task queue') deps; |
|
41674 | 485 |
val _ = if minimal then signal work_available else (); |
486 |
val _ = scheduler_check (); |
|
487 |
in futures end) |
|
488 |
end; |
|
28162 | 489 |
|
50983 | 490 |
(*Fork a single future for expression e.  This is forks applied to a
  one-element list (via singleton), with name "fork", no explicit group,
  no dependencies, priority 0 and interrupts enabled.*)
fun fork e =
491 |
(singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
|
28186 | 492 |
|
493 |
||
29366 | 494 |
(* join *) |
495 |
||
32099 | 496 |
fun get_result x = |
497 |
(case peek x of |
|
37852
a902f158b4fc
eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
wenzelm
parents:
37690
diff
changeset
|
498 |
NONE => Exn.Exn (Fail "Unfinished future") |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
499 |
| SOME res => |
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
500 |
if Exn.is_interrupt_exn res then |
44247 | 501 |
(case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of |
53190
5d92649a310e
simplified Goal.forked_proofs: status is determined via group instead of dummy future (see also Pure/PIDE/execution.ML);
wenzelm
parents:
53189
diff
changeset
|
502 |
[] => res |
5d92649a310e
simplified Goal.forked_proofs: status is determined via group instead of dummy future (see also Pure/PIDE/execution.ML);
wenzelm
parents:
53189
diff
changeset
|
503 |
| exns => Exn.Exn (Par_Exn.make exns)) |
39232
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
wenzelm
parents:
38236
diff
changeset
|
504 |
else res); |
28186 | 505 |
|
68196 | 506 |
(*Result of future x as a plain value: get_result followed by Exn.release,
  so a stored exception (including the Fail "Unfinished future" produced by
  get_result for an unfinished future) is raised.*)
fun get_finished x = x |> get_result |> Exn.release;
507 |
||
49935
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
wenzelm
parents:
49910
diff
changeset
|
508 |
local |
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
wenzelm
parents:
49910
diff
changeset
|
509 |
|
59465 | 510 |
fun join_next atts deps = (*requires SYNCHRONIZED*) |
41695
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
wenzelm
parents:
41683
diff
changeset
|
511 |
if null deps then NONE |
32224 | 512 |
else |
41681
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
wenzelm
parents:
41680
diff
changeset
|
513 |
(case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of |
41695
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
wenzelm
parents:
41683
diff
changeset
|
514 |
(NONE, []) => NONE |
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
wenzelm
parents:
41683
diff
changeset
|
515 |
| (NONE, deps') => |
59465 | 516 |
(worker_waiting deps' (fn () => |
62923 | 517 |
Thread_Attributes.with_attributes atts (fn _ => |
59465 | 518 |
Exn.release (worker_wait Waiting work_finished))); |
519 |
join_next atts deps') |
|
32224 | 520 |
| (SOME work, deps') => SOME (work, deps')); |
32095 | 521 |
|
59465 | 522 |
fun join_loop atts deps = |
523 |
(case SYNCHRONIZED "join" (fn () => join_next atts deps) of |
|
524 |
NONE => () |
|
525 |
| SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps')); |
|
32814
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
wenzelm
parents:
32738
diff
changeset
|
526 |
|
29551
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
527 |
in |
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
528 |
|
29366 | 529 |
fun join_results xs = |
41679 | 530 |
let |
531 |
val _ = |
|
532 |
if forall is_finished xs then () |
|
59465 | 533 |
else if is_some (worker_task ()) then |
62923 | 534 |
Thread_Attributes.with_attributes Thread_Attributes.no_interrupts |
59465 | 535 |
(fn orig_atts => join_loop orig_atts (map task_of xs)) |
66958
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
536 |
else |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
537 |
xs |> List.app |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
538 |
(fn Value _ => () |
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
539 |
| Future {result, ...} => ignore (Single_Assignment.await result)); |
41679 | 540 |
in map get_result xs end; |
28186 | 541 |
|
29551
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
542 |
end; |
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
wenzelm
parents:
29431
diff
changeset
|
543 |
|
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
544 |
(*Join a single future and return its Exn.result: join_results lifted to
  one element via singleton.*)
fun join_result x = x |> singleton join_results;
44330 | 545 |
(*Join a list of futures and pass all results through Par_Exn.release_all.*)
fun joins xs = xs |> join_results |> Par_Exn.release_all;
28647
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
wenzelm
parents:
28645
diff
changeset
|
546 |
(*Join a single future and release its result via Exn.release.*)
fun join x = x |> join_result |> Exn.release;
28156 | 547 |
|
29366 | 548 |
|
67658 | 549 |
(* forked results: nested parallel evaluation *) |
550 |
||
551 |
fun forked_results {name, deps} es = |
|
552 |
Thread_Attributes.uninterruptible (fn restore_attributes => fn () => |
|
553 |
let |
|
554 |
val (group, pri) = |
|
555 |
(case worker_task () of |
|
556 |
SOME task => |
|
557 |
(new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task) |
|
558 |
| NONE => (new_group NONE, 0)); |
|
559 |
val futures = |
|
560 |
forks {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true} es; |
|
561 |
in |
|
562 |
restore_attributes join_results futures |
|
563 |
handle exn => (if Exn.is_interrupt exn then cancel_group group else (); Exn.reraise exn) |
|
564 |
end) (); |
|
565 |
||
566 |
||
54649
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
567 |
(* task context for running thread *) |
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
568 |
|
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
569 |
(*Evaluate f x on the current thread as an enrolled task of the given group.

  With interrupts disabled (Thread_Attributes.no_interrupts):
    1. build a future job for f x, parameterized by the original attributes;
    2. enroll the running thread in the task queue under name/group;
    3. execute the job via worker_exec.
  Afterwards the result variable is inspected: a missing result is reported as
  Fail, otherwise the result is released via Exn.release.*)
fun task_context name group f x =
  let
    fun enroll () =
      Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group);

    fun run orig_atts =
      let
        val (result, job) = future_job group orig_atts (fn () => f x);
        val task = SYNCHRONIZED "enroll" enroll;
        val _ = worker_exec (task, [job]);
      in
        (case Single_Assignment.peek result of
          SOME res => Exn.release res
        | NONE => raise Fail "Missing task context result")
      end;
  in Thread_Attributes.with_attributes Thread_Attributes.no_interrupts run end;
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
582 |
|
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
wenzelm
parents:
54637
diff
changeset
|
583 |
|
68130
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
584 |
(* scheduling parameters *) |
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
585 |
|
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
586 |
(*Decide whether forking is currently enabled.

  Requires more than one thread (Multithreading.max_threads).  The job limit is
  max_threads * option "parallel_limit": a limit <= 0 means no limit, otherwise
  the total number of jobs in the queue must be strictly below it.*)
fun enabled () =
  let
    val max_threads = Multithreading.max_threads ();

    fun below_limit limit =
      limit <= 0 orelse
        SYNCHRONIZED "enabled" (fn () => Task_Queue.total_jobs (! queue) < limit);
  in
    if max_threads > 1
    then below_limit (max_threads * Options.default_int "parallel_limit")
    else false
  end;
|
68130
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
592 |
|
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
593 |
(*A list is relevant for forking only if it has at least two elements and
  forking is enabled; enabled () is not consulted for shorter lists.*)
fun relevant (_ :: _ :: _) = enabled ()
  | relevant _ = false;
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
594 |
|
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
595 |
(*Parallel proofs at level n: the parallel_proofs setting must be at least n,
  the caller must be running as a worker task, and forking must be enabled.
  The conditions are checked in this order, stopping at the first failure.*)
fun proofs_enabled n =
  let val level = ! Multithreading.parallel_proofs in
    if level >= n andalso is_some (worker_task ())
    then enabled ()
    else false
  end;
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
597 |
|
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
598 |
(*Like proofs_enabled 1, with the additional condition that the time t (in
  seconds, as real) reaches option "parallel_subproofs_threshold".*)
fun proofs_enabled_timing t =
  if proofs_enabled 1
  then Time.toReal t >= Options.default_real "parallel_subproofs_threshold"
  else false;
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
600 |
|
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
wenzelm
parents:
68129
diff
changeset
|
601 |
|
51325
bcd6b1aa4db5
more uniform Future.map: always internalize failure;
wenzelm
parents:
51283
diff
changeset
|
602 |
(* fast-path operations -- bypass task queue if possible *) |
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
603 |
|
44294
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
wenzelm
parents:
44268
diff
changeset
|
604 |
(*Wrap a given result as a non-promised future: the result variable is assigned
  right away, using the static Task_Queue.dummy_task and its group.  The result
  is passed through identify_result with the current thread position.*)
fun value_result (res: 'a Exn.result) =
  let
    val dummy = Task_Queue.dummy_task;
    val dummy_group = Task_Queue.group_of_task dummy;
    val cell = Single_Assignment.var "value" : 'a result;
    val identified = identify_result (Position.thread_data ()) res;
    val _ = assign_result dummy_group cell identified;
  in Future {promised = false, task = dummy, result = cell} end;
29366 | 611 |
|
44294
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
wenzelm
parents:
44268
diff
changeset
|
612 |
(*Future from a plain value: value_result applied to the regular result Exn.Res x.*)
fun value x = (value_result o Exn.Res) x;
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
wenzelm
parents:
44268
diff
changeset
|
613 |
|
44330 | 614 |
(*Conditional fork: use forks if forking is enabled; otherwise evaluate each
  expression immediately on the current thread (capturing its outcome via
  Exn.interruptible_capture) and wrap it with value_result.*)
fun cond_forks args es =
  let
    fun eval_now e = value_result (Exn.interruptible_capture e ());
  in
    if enabled () then forks args es
    else map eval_now es
  end;
617 |
||
29384
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
wenzelm
parents:
29366
diff
changeset
|
618 |
(*Map f over the result of future x.

  - x already finished: f (join x) is evaluated immediately; its outcome
    (including failure) is captured and wrapped by value_result.
  - otherwise: a job for f (join x) is built in the group of x's task, and
    Task_Queue.extend is attempted on that task within the synchronized queue.
      + extension succeeded: the new future shares the task of x;
      + extension failed: fall back to cond_forks with a dependency on the task
        of x, inheriting its group and priority.*)
fun map_future f x =
  let
    fun compute () = f (join x);

    fun deferred () =
      let
        val task = task_of x;
        val group = Task_Queue.group_of_task task;
        val (result, job) =
          future_job group Thread_Attributes.private_interrupts compute;

        (*must run within SYNCHRONIZED: reads and updates the shared queue*)
        fun try_extend () =
          (case Task_Queue.extend task job (! queue) of
            NONE => false
          | SOME queue' => (queue := queue'; true));
      in
        if SYNCHRONIZED "extend" try_extend
        then Future {promised = false, task = task, result = result}
        else
          (singleton o cond_forks)
            {name = "map_future", group = SOME group, deps = [task],
              pri = Task_Queue.pri_of_task task, interrupts = true}
            compute
      end;
  in
    if is_finished x
    then value_result (Exn.interruptible_capture compute ())
    else deferred ()
  end;
28979
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
wenzelm
parents:
28972
diff
changeset
|
639 |
|
28191 | 640 |
|
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
641 |
(* promised futures -- fulfilled by external means *) |
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
642 |
|
66166 | 643 |
(*Create a promised future with the given task name, registered as a passive
  job in the task queue (within a subgroup obtained from worker_subgroup).

  The passive job, when run, tries to assign an interrupt result to the promise
  and then calls abort (), all with interrupts disabled:
    - Fail from the assignment is tolerated (yields true);
    - an interrupt raised by the assignment is turned into
      Fail "Concurrent attempt to fulfill promise";
    - any other exception is reraised.
  The outcome of the assignment is only released after abort () has been called.*)
fun promise_name name abort : 'a future =
  let
    val group = worker_subgroup ();
    val result = Single_Assignment.var "promise" : 'a result;

    fun recover (Fail _) = true
      | recover exn =
          if Exn.is_interrupt exn
          then raise Fail "Concurrent attempt to fulfill promise"
          else Exn.reraise exn;

    fun assign () =
      assign_result group result Exn.interrupt_exn
        handle exn => recover exn;

    fun job () =
      Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn _ =>
        let
          val outcome = Exn.capture assign ();
          val () = abort ();
        in Exn.release outcome end);

    fun enqueue () =
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group name job);
    val task = SYNCHRONIZED "enqueue_passive" enqueue;
  in Future {promised = true, task = task, result = result} end;
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
659 |
|
66166 | 660 |
(*Promised future under the default task name "passive".*)
fun promise abort =
  let val default_name = "passive"
  in promise_name default_name abort end;
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
661 |
|
66958
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
wenzelm
parents:
66378
diff
changeset
|
662 |
(*Fulfill a promised future with the given result; any non-promised future is
  rejected with Fail "Not a promised future".

  With interrupts disabled, the passive job of the promise is dequeued from the
  task queue, with three possible outcomes:
    - SOME true: the assignment job is executed via worker_exec;
    - SOME false: nothing is done here;
    - NONE: the result is assigned directly -- the given result if the group is
      not canceled, an interrupt result otherwise.
  Finally, unless the result variable is already assigned, the caller waits for
  it (registered via worker_waiting on the task).*)
fun fulfill_result (Future {promised = true, task, result}) res =
      let
        val group = Task_Queue.group_of_task task;
        val pos = Position.thread_data ();

        fun job ok =
          let val outcome = if ok then identify_result pos res else Exn.interrupt_exn
          in assign_result group result outcome end;

        fun dequeue () =
          Unsynchronized.change_result queue
            (Task_Queue.dequeue_passive (Thread.self ()) task);

        fun run_passive _ =
          (case SYNCHRONIZED "fulfill_result" dequeue of
            NONE => ignore (job (not (Task_Queue.is_canceled group)))
          | SOME false => ()
          | SOME true => worker_exec (task, [job]));

        val _ =
          Thread_Attributes.with_attributes Thread_Attributes.no_interrupts run_passive;

        fun await_result () = ignore (Single_Assignment.await result);
      in
        (case Single_Assignment.peek result of
          SOME _ => ()
        | NONE => worker_waiting [task] await_result)
      end
  | fulfill_result _ _ = raise Fail "Not a promised future";
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
686 |
|
43761
e72ba84ae58f
tuned signature -- corresponding to Scala version;
wenzelm
parents:
43665
diff
changeset
|
687 |
(*Fulfill a promised future with a plain value, wrapped as regular result Exn.Res.*)
fun fulfill promised_future v =
  let val res = Exn.Res v
  in fulfill_result promised_future res end;
34277
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
688 |
|
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
wenzelm
parents:
33416
diff
changeset
|
689 |
|
66378 | 690 |
(* snapshot: current tasks of groups *) |
54369 | 691 |
|
68379 | 692 |
(*Tasks of the given groups, read from the queue under synchronization.  The
  empty group list yields the empty result without touching the queue.*)
fun snapshot groups =
  (case groups of
    [] => []
  | _ => SYNCHRONIZED "snapshot" (fn () => Task_Queue.group_tasks (! queue) groups));
|
54369 | 696 |
|
697 |
||
32228
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
wenzelm
parents:
32227
diff
changeset
|
698 |
(* shutdown *) |
29366 | 699 |
|
28203 | 700 |
(*Shut down the scheduler and wait until it is no longer active.

  Must not be called from a worker task: this raises Fail.  Otherwise, within
  the synchronized section, the shutdown flag is set and the caller waits on
  scheduler_event, repeating both steps for as long as the scheduler is active.*)
fun shutdown () =
  (case worker_task () of
    SOME _ => raise Fail "Cannot shutdown while running as worker thread"
  | NONE =>
      let
        fun wait_loop () =
          if scheduler_active () then
           (do_shutdown := true;
            Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
            wait scheduler_event;
            wait_loop ())
          else ();
      in SYNCHRONIZED "shutdown" wait_loop end);
28203 | 709 |
|
29366 | 710 |
|
711 |
(*final declarations of this structure!*) |
|
712 |
(*Public name for map_future.  This shadows the list operation "map" from here
  on, hence it must remain among the last declarations of the structure.*)
fun map f fut = map_future f fut;
|
713 |
||
28156 | 714 |
end; |
28972 | 715 |
|
716 |
(*Top-level abbreviation: makes the future type available without the
  "Future." qualifier.*)
type 'a future = 'a Future.future;