| author | wenzelm | 
| Tue, 28 Jan 2025 13:42:40 +0100 | |
| changeset 82007 | 04c704a6b193 | 
| parent 80809 | 4a64fc4d1cde | 
| permissions | -rw-r--r-- | 
| 28156 | 1  | 
(* Title: Pure/Concurrent/future.ML  | 
2  | 
Author: Makarius  | 
|
3  | 
||
| 57350 | 4  | 
Value-oriented parallel execution via futures and promises.  | 
| 28156 | 5  | 
*)  | 
6  | 
||
7  | 
signature FUTURE =  | 
|
8  | 
sig  | 
|
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
9  | 
type task = Task_Queue.task  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
10  | 
type group = Task_Queue.group  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
11  | 
val new_group: group option -> group  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
12  | 
val worker_task: unit -> task option  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
13  | 
val worker_group: unit -> group option  | 
| 52603 | 14  | 
val the_worker_group: unit -> group  | 
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
15  | 
val worker_subgroup: unit -> group  | 
| 28972 | 16  | 
type 'a future  | 
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
17  | 
val task_of: 'a future -> task  | 
| 28972 | 18  | 
val peek: 'a future -> 'a Exn.result option  | 
19  | 
val is_finished: 'a future -> bool  | 
|
| 44301 | 20  | 
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
 | 
| 
47404
 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 
wenzelm 
parents: 
45666 
diff
changeset
 | 
21  | 
val cancel_group: group -> unit  | 
| 
 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 
wenzelm 
parents: 
45666 
diff
changeset
 | 
22  | 
val cancel: 'a future -> unit  | 
| 
56333
 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 
wenzelm 
parents: 
54671 
diff
changeset
 | 
23  | 
val error_message: Position.T -> (serial * string) * string option -> unit  | 
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
24  | 
val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result  | 
| 44427 | 25  | 
  type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
 | 
26  | 
val default_params: params  | 
|
27  | 
val forks: params -> (unit -> 'a) list -> 'a future list  | 
|
| 32724 | 28  | 
val fork: (unit -> 'a) -> 'a future  | 
| 72085 | 29  | 
val get_result: 'a future -> 'a Exn.result  | 
| 28972 | 30  | 
val join_results: 'a future list -> 'a Exn.result list  | 
31  | 
val join_result: 'a future -> 'a Exn.result  | 
|
| 44330 | 32  | 
val joins: 'a future list -> 'a list  | 
| 28972 | 33  | 
val join: 'a future -> 'a  | 
| 67658 | 34  | 
  val forked_results: {name: string, deps: Task_Queue.task list} ->
 | 
35  | 
(unit -> 'a) list -> 'a Exn.result list  | 
|
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
36  | 
  val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
 | 
| 
68130
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
37  | 
val enabled: unit -> bool  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
38  | 
val relevant: 'a list -> bool  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
39  | 
val proofs_enabled: int -> bool  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
40  | 
val proofs_enabled_timing: Time.time -> bool  | 
| 
44294
 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 
wenzelm 
parents: 
44268 
diff
changeset
 | 
41  | 
val value_result: 'a Exn.result -> 'a future  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
42  | 
val value: 'a -> 'a future  | 
| 44427 | 43  | 
val cond_forks: params -> (unit -> 'a) list -> 'a future list  | 
| 28972 | 44  | 
  val map: ('a -> 'b) -> 'a future -> 'b future
 | 
| 66166 | 45  | 
val promise_name: string -> (unit -> unit) -> 'a future  | 
| 
44298
 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 
wenzelm 
parents: 
44295 
diff
changeset
 | 
46  | 
val promise: (unit -> unit) -> 'a future  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
47  | 
val fulfill_result: 'a future -> 'a Exn.result -> unit  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
48  | 
val fulfill: 'a future -> 'a -> unit  | 
| 66378 | 49  | 
val snapshot: group list -> task list  | 
| 28203 | 50  | 
val shutdown: unit -> unit  | 
| 28156 | 51  | 
end;  | 
52  | 
||
53  | 
structure Future: FUTURE =  | 
|
54  | 
struct  | 
|
55  | 
||
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
56  | 
(** future values **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
57  | 
|
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
58  | 
type task = Task_Queue.task;  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
59  | 
type group = Task_Queue.group;  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
60  | 
val new_group = Task_Queue.new_group;  | 
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
61  | 
|
| 
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
62  | 
|
| 28167 | 63  | 
(* identifiers *)  | 
64  | 
||
| 32058 | 65  | 
(*Thread data variable of type task Thread_Data.var, private to this local block.
  worker_task reads it via Thread_Data.get; setmp_worker_task applies f to x via
  Thread_Data.setmp with the variable given as SOME task (presumably only for the
  duration of that call -- confirm in Thread_Data).*)
local  | 
| 62889 | 66  | 
val worker_task_var = Thread_Data.var () : task Thread_Data.var;  | 
| 32058 | 67  | 
in  | 
| 62889 | 68  | 
fun worker_task () = Thread_Data.get worker_task_var;  | 
69  | 
fun setmp_worker_task task f x = Thread_Data.setmp worker_task_var (SOME task) f x;  | 
|
| 28167 | 70  | 
end;  | 
71  | 
||
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
72  | 
(*group of the task returned by worker_task (), via Task_Queue.group_of_task; NONE when there is no such task*)
val worker_group = Option.map Task_Queue.group_of_task o worker_task;  | 
| 52603 | 73  | 
|
74  | 
(*Strict variant of worker_group: returns the group when worker_group () is SOME,
  otherwise raises Fail "Missing worker thread context".*)
fun the_worker_group () =  | 
|
75  | 
(case worker_group () of  | 
|
76  | 
SOME group => group  | 
|
77  | 
| NONE => raise Fail "Missing worker thread context");  | 
|
78  | 
||
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
79  | 
(*result of new_group applied to the current worker_group () option -- presumably a fresh
  subgroup of the worker's group, as the name suggests; confirm in Task_Queue.new_group*)
fun worker_subgroup () = new_group (worker_group ());  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
80  | 
|
| 41679 | 81  | 
(*Evaluate the thunk e: directly as e () when worker_task () is NONE, otherwise by
  handing the current task and e to Task_Queue.joining.*)
fun worker_joining e =  | 
82  | 
(case worker_task () of  | 
|
83  | 
NONE => e ()  | 
|
84  | 
| SOME task => Task_Queue.joining task e);  | 
|
85  | 
||
| 
41680
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
86  | 
fun worker_waiting deps e =  | 
| 41670 | 87  | 
(case worker_task () of  | 
88  | 
NONE => e ()  | 
|
| 
41680
 
a4c822915eaa
more informative task timing: some dependency tracking;
 
wenzelm 
parents: 
41679 
diff
changeset
 | 
89  | 
| SOME task => Task_Queue.waiting task deps e);  | 
| 41670 | 90  | 
|
| 28167 | 91  | 
|
92  | 
(* datatype future *)  | 
|
93  | 
||
| 35016 | 94  | 
(*abbreviation: a Single_Assignment.var whose content is an 'a Exn.result*)
type 'a result = 'a Exn.result Single_Assignment.var;  | 
95  | 
||
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
96  | 
datatype 'a future =  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
97  | 
Value of 'a Exn.result |  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
98  | 
Future of  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
99  | 
   {promised: bool,
 | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
100  | 
task: task,  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
101  | 
result: 'a result};  | 
| 28167 | 102  | 
|
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
103  | 
fun task_of (Value _) = Task_Queue.dummy_task  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
104  | 
  | task_of (Future {task, ...}) = task;
 | 
| 28167 | 105  | 
|
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
106  | 
fun peek (Value res) = SOME res  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
107  | 
  | peek (Future {result, ...}) = Single_Assignment.peek result;
 | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
108  | 
|
| 28558 | 109  | 
(*true iff peek x yields SOME result, i.e. a result is already available for x*)
fun is_finished x = is_some (peek x);  | 
| 28320 | 110  | 
|
| 62663 | 111  | 
val _ =  | 
| 
62819
 
d3ff367a16a0
careful export of type-dependent functions, without losing their special status;
 
wenzelm 
parents: 
62663 
diff
changeset
 | 
112  | 
ML_system_pp (fn depth => fn pretty => fn x =>  | 
| 62663 | 113  | 
(case peek x of  | 
| 
80809
 
4a64fc4d1cde
clarified signature: type ML_Pretty.pretty coincides with PolyML.pretty;
 
wenzelm 
parents: 
78787 
diff
changeset
 | 
114  | 
NONE => ML_Pretty.str "<future>"  | 
| 
 
4a64fc4d1cde
clarified signature: type ML_Pretty.pretty coincides with PolyML.pretty;
 
wenzelm 
parents: 
78787 
diff
changeset
 | 
115  | 
| SOME (Exn.Exn _) => ML_Pretty.str "<failed>"  | 
| 62663 | 116  | 
| SOME (Exn.Res y) => pretty (y, depth)));  | 
117  | 
||
| 28167 | 118  | 
|
| 
28177
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
119  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
120  | 
(** scheduling **)  | 
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
121  | 
|
| 
 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 
wenzelm 
parents: 
28170 
diff
changeset
 | 
122  | 
(* synchronization *)  | 
| 28156 | 123  | 
|
| 
78650
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
124  | 
val scheduler_event = Thread.ConditionVar.conditionVar ();  | 
| 
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
125  | 
val work_available = Thread.ConditionVar.conditionVar ();  | 
| 
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
126  | 
val work_finished = Thread.ConditionVar.conditionVar ();  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
127  | 
|
| 28156 | 128  | 
local  | 
| 
78650
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
129  | 
val lock = Thread.Mutex.mutex ();  | 
| 28156 | 130  | 
in  | 
131  | 
||
| 
59054
 
61b723761dff
load simple_thread.ML later, such that it benefits from redefined print_exception_trace;
 
wenzelm 
parents: 
57350 
diff
changeset
 | 
132  | 
fun SYNCHRONIZED name = Multithreading.synchronized name lock;  | 
| 28156 | 133  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
134  | 
fun wait cond = (*requires SYNCHRONIZED*)  | 
| 64276 | 135  | 
Multithreading.sync_wait NONE cond lock;  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
136  | 
|
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
137  | 
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)  | 
| 64276 | 138  | 
Multithreading.sync_wait (SOME (Time.now () + timeout)) cond lock;  | 
| 
28166
 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 
wenzelm 
parents: 
28163 
diff
changeset
 | 
139  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
140  | 
fun signal cond = (*requires SYNCHRONIZED*)  | 
| 
78650
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
141  | 
Thread.ConditionVar.signal cond;  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
142  | 
|
| 
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
143  | 
fun broadcast cond = (*requires SYNCHRONIZED*)  | 
| 
78650
 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 
wenzelm 
parents: 
78648 
diff
changeset
 | 
144  | 
Thread.ConditionVar.broadcast cond;  | 
| 28156 | 145  | 
|
146  | 
end;  | 
|
147  | 
||
148  | 
||
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
149  | 
(* global state *)  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
150  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
151  | 
val queue = Unsynchronized.ref Task_Queue.empty;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
152  | 
val next = Unsynchronized.ref 0;  | 
| 78648 | 153  | 
val scheduler = Unsynchronized.ref (NONE: Isabelle_Thread.T option);  | 
| 
44300
 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 
wenzelm 
parents: 
44299 
diff
changeset
 | 
154  | 
val canceled = Unsynchronized.ref ([]: group list);  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
155  | 
val do_shutdown = Unsynchronized.ref false;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
156  | 
val max_workers = Unsynchronized.ref 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
157  | 
val max_active = Unsynchronized.ref 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
158  | 
|
| 50280 | 159  | 
val status_ticks = Unsynchronized.ref 0;  | 
160  | 
val last_round = Unsynchronized.ref Time.zeroTime;  | 
|
161  | 
val next_round = seconds 0.05;  | 
|
162  | 
||
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
163  | 
datatype worker_state = Working | Waiting | Sleeping;  | 
| 78648 | 164  | 
val workers = Unsynchronized.ref ([]: (Isabelle_Thread.T * worker_state Unsynchronized.ref) list);  | 
| 
33410
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
165  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
166  | 
fun count_workers state = (*requires SYNCHRONIZED*)  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
167  | 
fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;  | 
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
168  | 
|
| 
 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 
wenzelm 
parents: 
33409 
diff
changeset
 | 
169  | 
|
| 
72112
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
170  | 
(* ML statistics *)  | 
| 50280 | 171  | 
|
| 
72112
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
172  | 
fun ML_statistics () = (*requires SYNCHRONIZED*)  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
173  | 
let  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
174  | 
    val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue);
 | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
175  | 
val workers_total = length (! workers);  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
176  | 
val workers_active = count_workers Working;  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
177  | 
val workers_waiting = count_workers Waiting;  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
178  | 
in  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
179  | 
ML_Statistics.set  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
180  | 
     {tasks_ready = ready,
 | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
181  | 
tasks_pending = pending,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
182  | 
tasks_running = running,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
183  | 
tasks_passive = passive,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
184  | 
tasks_urgent = urgent,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
185  | 
workers_total = workers_total,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
186  | 
workers_active = workers_active,  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
187  | 
workers_waiting = workers_waiting}  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
188  | 
end;  | 
| 50280 | 189  | 
|
190  | 
||
| 44110 | 191  | 
(* cancellation primitives *)  | 
| 32099 | 192  | 
|
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
193  | 
fun cancel_now group = (*requires SYNCHRONIZED*)  | 
| 
44341
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
194  | 
let  | 
| 
47404
 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 
wenzelm 
parents: 
45666 
diff
changeset
 | 
195  | 
val running = Task_Queue.cancel (! queue) group;  | 
| 
49894
 
69bfd86cc711
more robust cancel_now: avoid shooting yourself in the foot;
 
wenzelm 
parents: 
49009 
diff
changeset
 | 
196  | 
val _ = running |> List.app (fn thread =>  | 
| 71692 | 197  | 
if Isabelle_Thread.is_self thread then ()  | 
| 78787 | 198  | 
else Isabelle_Thread.interrupt_thread thread);  | 
| 
47404
 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 
wenzelm 
parents: 
45666 
diff
changeset
 | 
199  | 
in running end;  | 
| 
44341
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
200  | 
|
| 
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
201  | 
fun cancel_all () = (*requires SYNCHRONIZED*)  | 
| 
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
202  | 
let  | 
| 
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
203  | 
val (groups, threads) = Task_Queue.cancel_all (! queue);  | 
| 78787 | 204  | 
val _ = List.app Isabelle_Thread.interrupt_thread threads;  | 
| 
44341
 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 
wenzelm 
parents: 
44330 
diff
changeset
 | 
205  | 
in groups end;  | 
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
206  | 
|
| 
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
207  | 
fun cancel_later group = (*requires SYNCHRONIZED*)  | 
| 32738 | 208  | 
(Unsynchronized.change canceled (insert Task_Queue.eq_group group);  | 
209  | 
broadcast scheduler_event);  | 
|
| 
29341
 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 
wenzelm 
parents: 
29119 
diff
changeset
 | 
210  | 
|
| 44301 | 211  | 
(*Apply f to x under Thread_Attributes.with_attributes. The attributes are
  Thread_Attributes.private_interrupts when worker_task () is SOME (called from within
  a worker task) and Thread_Attributes.public_interrupts otherwise. Due to "before",
  Isabelle_Thread.expose_interrupt () is run after the application has returned, and
  the value of the application is the overall result.*)
fun interruptible_task f x =  | 
| 62923 | 212  | 
Thread_Attributes.with_attributes  | 
| 62359 | 213  | 
(if is_some (worker_task ())  | 
| 62923 | 214  | 
then Thread_Attributes.private_interrupts  | 
215  | 
else Thread_Attributes.public_interrupts)  | 
|
| 62359 | 216  | 
(fn _ => f x)  | 
| 78713 | 217  | 
before Isabelle_Thread.expose_interrupt ();  | 
| 44301 | 218  | 
|
219  | 
||
| 44110 | 220  | 
(* worker threads *)  | 
221  | 
||
222  | 
fun worker_exec (task, jobs) =  | 
|
| 28167 | 223  | 
let  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
224  | 
val group = Task_Queue.group_of_task task;  | 
| 32102 | 225  | 
val valid = not (Task_Queue.is_canceled group);  | 
| 41670 | 226  | 
val ok =  | 
227  | 
Task_Queue.running task (fn () =>  | 
|
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
228  | 
setmp_worker_task task (fn () =>  | 
| 41670 | 229  | 
fold (fn job => fn ok => job valid andalso ok) jobs true) ());  | 
| 50975 | 230  | 
val _ =  | 
231  | 
if ! Multithreading.trace >= 2 then  | 
|
| 
56333
 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 
wenzelm 
parents: 
54671 
diff
changeset
 | 
232  | 
Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) []  | 
| 50975 | 233  | 
else ();  | 
| 32246 | 234  | 
val _ = SYNCHRONIZED "finish" (fn () =>  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
235  | 
let  | 
| 32738 | 236  | 
val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);  | 
| 78714 | 237  | 
val test = Isabelle_Thread.expose_interrupt_result ();  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
238  | 
val _ =  | 
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
239  | 
if ok andalso not (Exn.is_interrupt_proper_exn test) then ()  | 
| 
44299
 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 
wenzelm 
parents: 
44298 
diff
changeset
 | 
240  | 
else if null (cancel_now group) then ()  | 
| 
34279
 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 
wenzelm 
parents: 
34277 
diff
changeset
 | 
241  | 
else cancel_later group;  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
242  | 
val _ = broadcast work_finished;  | 
| 
33413
 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 
wenzelm 
parents: 
33411 
diff
changeset
 | 
243  | 
val _ = if maximal then () else signal work_available;  | 
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
244  | 
in () end);  | 
| 28167 | 245  | 
in () end;  | 
246  | 
||
| 59465 | 247  | 
(*Wait on condition variable cond on behalf of the calling thread. The thread is looked
  up in the workers list via Isabelle_Thread.self (): if it has an entry, wait cond is
  run through Unsynchronized.setmp with its state reference set to worker_state
  (presumably restored afterwards -- confirm in Unsynchronized.setmp); if it has no
  entry, wait cond is called directly.*)
fun worker_wait worker_state cond = (*requires SYNCHRONIZED*)  | 
| 78648 | 248  | 
(case AList.lookup Isabelle_Thread.equal (! workers) (Isabelle_Thread.self ()) of  | 
| 59465 | 249  | 
SOME state => Unsynchronized.setmp state worker_state wait cond  | 
250  | 
| NONE => wait cond);  | 
|
| 28162 | 251  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
252  | 
fun worker_next () = (*requires SYNCHRONIZED*)  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
253  | 
if length (! workers) > ! max_workers then  | 
| 78648 | 254  | 
(Unsynchronized.change workers (AList.delete Isabelle_Thread.equal (Isabelle_Thread.self ()));  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
255  | 
signal work_available;  | 
| 28167 | 256  | 
NONE)  | 
| 
28166
 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 
wenzelm 
parents: 
28163 
diff
changeset
 | 
257  | 
else  | 
| 
60610
 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 
wenzelm 
parents: 
59468 
diff
changeset
 | 
258  | 
let val urgent_only = count_workers Working > ! max_active in  | 
| 78648 | 259  | 
(case Unsynchronized.change_result queue  | 
260  | 
(Task_Queue.dequeue (Isabelle_Thread.self ()) urgent_only) of  | 
|
| 
60610
 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 
wenzelm 
parents: 
59468 
diff
changeset
 | 
261  | 
NONE => (worker_wait Sleeping work_available; worker_next ())  | 
| 
 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 
wenzelm 
parents: 
59468 
diff
changeset
 | 
262  | 
| some => (signal work_available; some))  | 
| 
 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 
wenzelm 
parents: 
59468 
diff
changeset
 | 
263  | 
end;  | 
| 28156 | 264  | 
|
| 28167 | 265  | 
fun worker_loop name =  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
266  | 
(case SYNCHRONIZED name (fn () => worker_next ()) of  | 
| 29119 | 267  | 
NONE => ()  | 
| 
44295
 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 
wenzelm 
parents: 
44294 
diff
changeset
 | 
268  | 
| SOME work => (worker_exec work; worker_loop name));  | 
| 28156 | 269  | 
|
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
270  | 
fun worker_start name = (*requires SYNCHRONIZED*)  | 
| 
59468
 
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
 
wenzelm 
parents: 
59467 
diff
changeset
 | 
271  | 
let  | 
| 78753 | 272  | 
val worker = Isabelle_Thread.fork (Isabelle_Thread.params "worker") (fn () => worker_loop name);  | 
| 
59468
 
fe6651760643
explicit threads_stack_limit (for recent Poly/ML SVN versions), which leads to soft interrupt instead of exhaustion of virtual memory, which is particularly relevant for the bigger address space of x86_64;
 
wenzelm 
parents: 
59467 
diff
changeset
 | 
273  | 
in Unsynchronized.change workers (cons (worker, Unsynchronized.ref Working)) end  | 
| 
59338
 
2ea1bf517842
discontinued worker_trend: prefer constant number of active + reserve threads;
 
wenzelm 
parents: 
59330 
diff
changeset
 | 
274  | 
handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg);  | 
| 
59330
 
cb3a4caf206d
permissive worker_start: failure to fork thread is deferred to later attempt to provide missing threads, without crashing scheduler;
 
wenzelm 
parents: 
59180 
diff
changeset
 | 
275  | 
|
| 28156 | 276  | 
|
277  | 
(* scheduler *)  | 
|
278  | 
||
| 
72078
 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 
wenzelm 
parents: 
72038 
diff
changeset
 | 
279  | 
fun scheduler_end () = (*requires SYNCHRONIZED*)  | 
| 
72112
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
280  | 
(ML_statistics (); scheduler := NONE);  | 
| 
72078
 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 
wenzelm 
parents: 
72038 
diff
changeset
 | 
281  | 
|
| 78759 | 282  | 
fun scheduler_next0 () = (*requires SYNCHRONIZED*)  | 
| 28156 | 283  | 
let  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
284  | 
val now = Time.now ();  | 
| 62826 | 285  | 
val tick = ! last_round + next_round <= now;  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
286  | 
val _ = if tick then last_round := now else ();  | 
| 
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
287  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
288  | 
|
| 50280 | 289  | 
(* runtime status *)  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
290  | 
|
| 32226 | 291  | 
val _ =  | 
| 51046 | 292  | 
if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
293  | 
val _ =  | 
| 
72112
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
294  | 
if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 5) = 0  | 
| 
 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 
wenzelm 
parents: 
72086 
diff
changeset
 | 
295  | 
then ML_statistics () else ();  | 
| 32053 | 296  | 
|
| 28191 | 297  | 
val _ =  | 
| 78648 | 298  | 
if not tick orelse forall (Isabelle_Thread.is_active o #1) (! workers) then ()  | 
| 32095 | 299  | 
else  | 
| 33409 | 300  | 
let  | 
| 78648 | 301  | 
val (alive, dead) = List.partition (Isabelle_Thread.is_active o #1) (! workers);  | 
| 33409 | 302  | 
val _ = workers := alive;  | 
303  | 
in  | 
|
304  | 
Multithreading.tracing 0 (fn () =>  | 
|
| 51279 | 305  | 
"SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")  | 
| 33409 | 306  | 
end;  | 
| 28191 | 307  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
308  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
309  | 
(* worker pool adjustments *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
310  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
311  | 
val max_active0 = ! max_active;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
312  | 
val max_workers0 = ! max_workers;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
313  | 
|
| 
72086
 
41e1e2395a67
avoid exhaustion of worker threads, notably due to complex interaction of future/promise/lazy in Proofterm.make_thm_node;
 
wenzelm 
parents: 
72085 
diff
changeset
 | 
314  | 
val workers_waiting = count_workers Waiting;  | 
| 
 
41e1e2395a67
avoid exhaustion of worker threads, notably due to complex interaction of future/promise/lazy in Proofterm.make_thm_node;
 
wenzelm 
parents: 
72085 
diff
changeset
 | 
315  | 
|
| 
59340
 
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
 
wenzelm 
parents: 
59338 
diff
changeset
 | 
316  | 
val m =  | 
| 
 
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
 
wenzelm 
parents: 
59338 
diff
changeset
 | 
317  | 
if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0  | 
| 62925 | 318  | 
else Multithreading.max_threads ();  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
319  | 
val _ = max_active := m;  | 
| 
72086
 
41e1e2395a67
avoid exhaustion of worker threads, notably due to complex interaction of future/promise/lazy in Proofterm.make_thm_node;
 
wenzelm 
parents: 
72085 
diff
changeset
 | 
320  | 
val _ = max_workers :=  | 
| 
 
41e1e2395a67
avoid exhaustion of worker threads, notably due to complex interaction of future/promise/lazy in Proofterm.make_thm_node;
 
wenzelm 
parents: 
72085 
diff
changeset
 | 
321  | 
Int.max (2 * m, if workers_waiting > 0 then workers_waiting + 1 else 0);  | 
| 
33406
 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 
wenzelm 
parents: 
33061 
diff
changeset
 | 
322  | 
|
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
323  | 
val missing = ! max_workers - length (! workers);  | 
| 28203 | 324  | 
val _ =  | 
| 
33407
 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 
wenzelm 
parents: 
33406 
diff
changeset
 | 
325  | 
if missing > 0 then  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
326  | 
funpow missing (fn () =>  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
327  | 
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
 | 
| 28203 | 328  | 
else ();  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
329  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
330  | 
val _ =  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
331  | 
if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
332  | 
else signal work_available;  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
333  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
334  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
335  | 
(* canceled groups *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
336  | 
|
| 
32225
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
337  | 
val _ =  | 
| 
 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 
wenzelm 
parents: 
32224 
diff
changeset
 | 
338  | 
if null (! canceled) then ()  | 
| 32293 | 339  | 
else  | 
340  | 
(Multithreading.tracing 1 (fn () =>  | 
|
341  | 
string_of_int (length (! canceled)) ^ " canceled groups");  | 
|
| 
44299
 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 
wenzelm 
parents: 
44298 
diff
changeset
 | 
342  | 
Unsynchronized.change canceled (filter_out (null o cancel_now));  | 
| 
51281
 
c05f7e1dd602
signal work_available should be sufficient to initiate daisy-chained workers, and lead to separate broadcast work_finished eventually -- NB: broadcasting all worker threads tends to burn parallel CPU cycles;
 
wenzelm 
parents: 
51280 
diff
changeset
 | 
343  | 
signal work_available);  | 
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
344  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
345  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
346  | 
(* delay loop *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
347  | 
|
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
348  | 
val _ = Exn.release (wait_timeout next_round scheduler_event);  | 
| 28167 | 349  | 
|
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
350  | 
|
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
351  | 
(* shutdown *)  | 
| 
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
352  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
353  | 
val continue = not (! do_shutdown andalso null (! workers));  | 
| 
72078
 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 
wenzelm 
parents: 
72038 
diff
changeset
 | 
354  | 
val _ = if continue then () else scheduler_end ();  | 
| 
33415
 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 
wenzelm 
parents: 
33413 
diff
changeset
 | 
355  | 
|
| 
32219
 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 
wenzelm 
parents: 
32186 
diff
changeset
 | 
356  | 
val _ = broadcast scheduler_event;  | 
| 78759 | 357  | 
in continue end;  | 
358  | 
||
359  | 
fun scheduler_next () =  | 
|
360  | 
(case Exn.capture_body scheduler_next0 of  | 
|
361  | 
Exn.Res res => res  | 
|
362  | 
| Exn.Exn exn =>  | 
|
363  | 
(Multithreading.tracing 1 (fn () => "SCHEDULER: " ^ General.exnMessage exn);  | 
|
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
364  | 
if Exn.is_interrupt_proper exn then  | 
| 78759 | 365  | 
(List.app cancel_later (cancel_all ());  | 
366  | 
signal work_available; true)  | 
|
367  | 
else  | 
|
368  | 
(scheduler_end ();  | 
|
369  | 
Exn.reraise exn)));  | 
|
| 
32295
 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 
wenzelm 
parents: 
32293 
diff
changeset
 | 
370  | 
|
| 
28206
 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 
wenzelm 
parents: 
28203 
diff
changeset
 | 
371  | 
fun scheduler_loop () =  | 
| 
44173
 
aaaa13e297dc
immediate fork of initial workers -- avoid 5 ticks (250ms) for adaptive scheme (a07558eb5029);
 
wenzelm 
parents: 
44115 
diff
changeset
 | 
372  | 
(while  | 
| 62923 | 373  | 
Thread_Attributes.with_attributes  | 
374  | 
(Thread_Attributes.sync_interrupts Thread_Attributes.public_interrupts)  | 
|
| 
33416
 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 
wenzelm 
parents: 
33415 
diff
changeset
 | 
375  | 
(fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))  | 
| 
50429
 
f8cd5e53653b
final report_status within SYNCHRONIZED part of scheduler loop: required for sanity of data;
 
wenzelm 
parents: 
50280 
diff
changeset
 | 
376  | 
do (); last_round := Time.zeroTime);  | 
| 28156 | 377  | 
|
| 28203 | 378  | 
(*True iff the scheduler reference holds a thread and Isabelle_Thread.is_active
  reports that thread as active; false when no scheduler thread is recorded.*)
fun scheduler_active () = (*requires SYNCHRONIZED*)  | 
| 78648 | 379  | 
(case ! scheduler of NONE => false | SOME thread => Isabelle_Thread.is_active thread);  | 
| 28203 | 380  | 
|
| 
32228
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
381  | 
fun scheduler_check () = (*requires SYNCHRONIZED*)  | 
| 
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
382  | 
(do_shutdown := false;  | 
| 
32248
 
0241916a5f06
more precise treatment of scheduler_event: continuous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 
wenzelm 
parents: 
32247 
diff
changeset
 | 
383  | 
if scheduler_active () then ()  | 
| 78753 | 384  | 
else scheduler := SOME (Isabelle_Thread.fork (Isabelle_Thread.params "scheduler") scheduler_loop));  | 
| 28156 | 385  | 
|
| 44301 | 386  | 
|
387  | 
||
388  | 
(** futures **)  | 
|
389  | 
||
390  | 
(* cancel *)  | 
|
391  | 
||
| 49906 | 392  | 
fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)  | 
| 
44299
 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 
wenzelm 
parents: 
44298 
diff
changeset
 | 
393  | 
let  | 
| 
47421
 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 
wenzelm 
parents: 
47404 
diff
changeset
 | 
394  | 
val _ = if null (cancel_now group) then () else cancel_later group;  | 
| 
 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 
wenzelm 
parents: 
47404 
diff
changeset
 | 
395  | 
val _ = signal work_available;  | 
| 
 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 
wenzelm 
parents: 
47404 
diff
changeset
 | 
396  | 
val _ = scheduler_check ();  | 
| 49906 | 397  | 
in () end;  | 
398  | 
||
399  | 
(*Synchronized entry point for group cancellation: runs
  cancel_group_unsynchronized on the given group inside a SYNCHRONIZED section
  named "cancel_group".*)
fun cancel_group group =  | 
|
400  | 
SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);  | 
|
| 
44299
 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 
wenzelm 
parents: 
44298 
diff
changeset
 | 
401  | 
|
| 44301 | 402  | 
(*Cancel the task group to which the task of future x belongs (via cancel_group).*)
fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));  | 
| 29366 | 403  | 
|
| 28156 | 404  | 
|
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
405  | 
(* results *)  | 
| 
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
406  | 
|
| 
56333
 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 
wenzelm 
parents: 
54671 
diff
changeset
 | 
407  | 
fun error_message pos ((serial, msg), exec_id) =  | 
| 
50916
 
fd902b616b48
proper runtime position (cf. fe4714886d92 and Toplevel.error_msg) -- to make error messages actually appear in the document;
 
wenzelm 
parents: 
50914 
diff
changeset
 | 
408  | 
Position.setmp_thread_data pos (fn () =>  | 
| 74166 | 409  | 
let val id = Position.id_of pos in  | 
| 
50931
 
a7484a7b6c8a
clarified Future.error_msg: slightly more robust id check, actually suppress displaced messages;
 
wenzelm 
parents: 
50916 
diff
changeset
 | 
410  | 
if is_none id orelse is_none exec_id orelse id = exec_id  | 
| 54387 | 411  | 
then Output.error_message' (serial, msg) else ()  | 
| 
50931
 
a7484a7b6c8a
clarified Future.error_msg: slightly more robust id check, actually suppress displaced messages;
 
wenzelm 
parents: 
50916 
diff
changeset
 | 
412  | 
end) ();  | 
| 44110 | 413  | 
|
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
414  | 
fun identify_result pos res =  | 
| 
61077
 
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
 
wenzelm 
parents: 
60911 
diff
changeset
 | 
415  | 
res |> Exn.map_exn (fn exn =>  | 
| 
 
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
 
wenzelm 
parents: 
60911 
diff
changeset
 | 
416  | 
let val exec_id =  | 
| 74166 | 417  | 
(case Position.id_of pos of  | 
| 
61077
 
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
 
wenzelm 
parents: 
60911 
diff
changeset
 | 
418  | 
NONE => []  | 
| 
 
06cca32aa519
thread context for exceptions from forks, e.g. relevant when printing errors;
 
wenzelm 
parents: 
60911 
diff
changeset
 | 
419  | 
| SOME id => [(Markup.exec_idN, id)])  | 
| 78679 | 420  | 
in Exn_Properties.identify exec_id exn end);  | 
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
421  | 
|
| 
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
422  | 
fun assign_result group result res =  | 
| 44110 | 423  | 
let  | 
| 78759 | 424  | 
val _ =  | 
425  | 
(case Exn.capture_body (fn () => Single_Assignment.assign result res) of  | 
|
426  | 
Exn.Exn (exn as Fail _) =>  | 
|
427  | 
(case Single_Assignment.peek result of  | 
|
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
428  | 
SOME (Exn.Exn e) => Exn.reraise (if Exn.is_interrupt_proper e then e else exn)  | 
| 78759 | 429  | 
| _ => Exn.reraise exn)  | 
430  | 
| _ => ());  | 
|
| 44110 | 431  | 
val ok =  | 
432  | 
(case the (Single_Assignment.peek result) of  | 
|
| 
44111
 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 
wenzelm 
parents: 
44110 
diff
changeset
 | 
433  | 
Exn.Exn exn =>  | 
| 
 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 
wenzelm 
parents: 
44110 
diff
changeset
 | 
434  | 
(SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)  | 
| 44110 | 435  | 
| Exn.Res _ => true);  | 
436  | 
in ok end;  | 
|
437  | 
||
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
438  | 
|
| 
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
439  | 
(* future jobs *)  | 
| 
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
440  | 
|
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
441  | 
fun future_job group atts (e: unit -> 'a) =  | 
| 44110 | 442  | 
let  | 
443  | 
val result = Single_Assignment.var "future" : 'a result;  | 
|
444  | 
val pos = Position.thread_data ();  | 
|
| 62889 | 445  | 
val context = Context.get_generic_context ();  | 
| 44110 | 446  | 
fun job ok =  | 
447  | 
let  | 
|
448  | 
val res =  | 
|
449  | 
if ok then  | 
|
| 78757 | 450  | 
Exn.capture_body (fn () =>  | 
| 62923 | 451  | 
Thread_Attributes.with_attributes atts (fn _ =>  | 
| 78757 | 452  | 
(Position.setmp_thread_data pos o Context.setmp_generic_context context) e ()))  | 
| 78681 | 453  | 
else Isabelle_Thread.interrupt_exn;  | 
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
454  | 
in assign_result group result (identify_result pos res) end;  | 
| 44110 | 455  | 
in (result, job) end;  | 
456  | 
||
457  | 
||
| 29366 | 458  | 
(* fork *)  | 
459  | 
||
| 44427 | 460  | 
(*Parameters accepted by forks: task name, optional task group (forks uses
  worker_subgroup () when this is NONE), dependency tasks, priority, and
  whether the job runs with private_interrupts (true) or no_interrupts (false).*)
type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
 | 
461  | 
(*Default fork parameters: empty name, no explicit group, no dependencies,
  priority 0, interrupts enabled.*)
val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
 | 
|
| 44113 | 462  | 
|
| 44427 | 463  | 
fun forks ({name, group, deps, pri, interrupts}: params) es =
 | 
| 41674 | 464  | 
if null es then []  | 
465  | 
else  | 
|
466  | 
let  | 
|
467  | 
val grp =  | 
|
468  | 
(case group of  | 
|
469  | 
NONE => worker_subgroup ()  | 
|
470  | 
| SOME grp => grp);  | 
|
| 41708 | 471  | 
fun enqueue e queue =  | 
| 41674 | 472  | 
let  | 
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
473  | 
val atts =  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
474  | 
if interrupts  | 
| 62923 | 475  | 
then Thread_Attributes.private_interrupts  | 
476  | 
else Thread_Attributes.no_interrupts;  | 
|
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
477  | 
val (result, job) = future_job grp atts e;  | 
| 41708 | 478  | 
val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
479  | 
          val future = Future {promised = false, task = task, result = result};
 | 
| 41708 | 480  | 
in (future, queue') end;  | 
| 41674 | 481  | 
in  | 
482  | 
SYNCHRONIZED "enqueue" (fn () =>  | 
|
483  | 
let  | 
|
| 41708 | 484  | 
val (futures, queue') = fold_map enqueue es (! queue);  | 
485  | 
val _ = queue := queue';  | 
|
486  | 
val minimal = forall (not o Task_Queue.known_task queue') deps;  | 
|
| 41674 | 487  | 
val _ = if minimal then signal work_available else ();  | 
488  | 
val _ = scheduler_check ();  | 
|
489  | 
in futures end)  | 
|
490  | 
end;  | 
|
| 28162 | 491  | 
|
| 50983 | 492  | 
(*Fork a single expression e as a future by applying forks to a singleton:
  name "fork", no explicit group, no dependencies, priority 0, interrupts
  enabled.*)
fun fork e =  | 
493  | 
  (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
 | 
|
| 28186 | 494  | 
|
495  | 
||
| 29366 | 496  | 
(* join *)  | 
497  | 
||
| 32099 | 498  | 
fun get_result x =  | 
499  | 
(case peek x of  | 
|
| 
37852
 
a902f158b4fc
eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
 
wenzelm 
parents: 
37690 
diff
changeset
 | 
500  | 
NONE => Exn.Exn (Fail "Unfinished future")  | 
| 78760 | 501  | 
| SOME result =>  | 
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
502  | 
if Exn.is_interrupt_proper_exn result then  | 
| 44247 | 503  | 
(case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of  | 
| 78760 | 504  | 
[] => result  | 
| 
53190
 
5d92649a310e
simplified Goal.forked_proofs: status is determined via group instead of dummy future (see also Pure/PIDE/execution.ML);
 
wenzelm 
parents: 
53189 
diff
changeset
 | 
505  | 
| exns => Exn.Exn (Par_Exn.make exns))  | 
| 78760 | 506  | 
else result);  | 
| 28186 | 507  | 
|
| 
49935
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
508  | 
local  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
509  | 
|
| 59465 | 510  | 
fun join_next atts deps = (*requires SYNCHRONIZED*)  | 
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
511  | 
if null deps then NONE  | 
| 32224 | 512  | 
else  | 
| 78648 | 513  | 
(case Unsynchronized.change_result queue  | 
514  | 
(Task_Queue.dequeue_deps (Isabelle_Thread.self ()) deps) of  | 
|
| 
41695
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
515  | 
(NONE, []) => NONE  | 
| 
 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 
wenzelm 
parents: 
41683 
diff
changeset
 | 
516  | 
| (NONE, deps') =>  | 
| 59465 | 517  | 
(worker_waiting deps' (fn () =>  | 
| 62923 | 518  | 
Thread_Attributes.with_attributes atts (fn _ =>  | 
| 59465 | 519  | 
Exn.release (worker_wait Waiting work_finished)));  | 
520  | 
join_next atts deps')  | 
|
| 32224 | 521  | 
| (SOME work, deps') => SOME (work, deps'));  | 
| 32095 | 522  | 
|
| 59465 | 523  | 
(*Repeatedly ask join_next (inside a SYNCHRONIZED section named "join") for
  work related to deps.  Each work item returned is run by worker_exec inside
  worker_joining, and the loop then continues with the remaining dependencies;
  it stops once join_next yields NONE.  The atts argument is only passed
  through to join_next.*)
fun join_loop atts deps =  | 
524  | 
(case SYNCHRONIZED "join" (fn () => join_next atts deps) of  | 
|
525  | 
NONE => ()  | 
|
526  | 
| SOME (work, deps') => (worker_joining (fn () => worker_exec work); join_loop atts deps'));  | 
|
| 
32814
 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 
wenzelm 
parents: 
32738 
diff
changeset
 | 
527  | 
|
| 
29551
 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
528  | 
in  | 
| 
 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
529  | 
|
| 29366 | 530  | 
fun join_results xs =  | 
| 41679 | 531  | 
let  | 
532  | 
val _ =  | 
|
533  | 
if forall is_finished xs then ()  | 
|
| 59465 | 534  | 
else if is_some (worker_task ()) then  | 
| 62923 | 535  | 
Thread_Attributes.with_attributes Thread_Attributes.no_interrupts  | 
| 59465 | 536  | 
(fn orig_atts => join_loop orig_atts (map task_of xs))  | 
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
537  | 
else  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
538  | 
xs |> List.app  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
539  | 
(fn Value _ => ()  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
540  | 
            | Future {result, ...} => ignore (Single_Assignment.await result));
 | 
| 41679 | 541  | 
in map get_result xs end;  | 
| 28186 | 542  | 
|
| 
29551
 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
543  | 
end;  | 
| 
 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 
wenzelm 
parents: 
29431 
diff
changeset
 | 
544  | 
|
| 
28647
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
545  | 
(*Join a single future: join_results applied to just x.  The result is
  returned as it comes from join_results; no exception is released here.*)
fun join_result x = singleton join_results x;  | 
| 44330 | 546  | 
(*Join all futures in xs and hand the list of results to Par_Exn.release_all
  (presumably yielding the plain values or raising the collected exceptions --
  confirm in Par_Exn).*)
fun joins xs = Par_Exn.release_all (join_results xs);  | 
| 
28647
 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 
wenzelm 
parents: 
28645 
diff
changeset
 | 
547  | 
(*Join future x and apply Exn.release to its result.*)
fun join x = Exn.release (join_result x);  | 
| 28156 | 548  | 
|
| 29366 | 549  | 
|
| 67658 | 550  | 
(* forked results: nested parallel evaluation *)  | 
551  | 
||
552  | 
fun forked_results {name, deps} es =
 | 
|
| 78720 | 553  | 
Thread_Attributes.uninterruptible_body (fn run =>  | 
| 67658 | 554  | 
let  | 
555  | 
val (group, pri) =  | 
|
556  | 
(case worker_task () of  | 
|
557  | 
SOME task =>  | 
|
558  | 
(new_group (SOME (Task_Queue.group_of_task task)), Task_Queue.pri_of_task task)  | 
|
559  | 
| NONE => (new_group NONE, 0));  | 
|
560  | 
val futures =  | 
|
561  | 
        forks {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true} es;
 | 
|
562  | 
in  | 
|
| 78759 | 563  | 
(case Exn.capture_body (fn () => run join_results futures) of  | 
564  | 
Exn.Res res => res  | 
|
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
565  | 
| Exn.Exn exn =>  | 
| 
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
566  | 
(if Exn.is_interrupt_proper exn then cancel_group group else ();  | 
| 
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
567  | 
Exn.reraise exn))  | 
| 78720 | 568  | 
end);  | 
| 67658 | 569  | 
|
570  | 
||
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
571  | 
(* task context for running thread *)  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
572  | 
|
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
573  | 
fun task_context name group f x =  | 
| 62923 | 574  | 
Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn orig_atts =>  | 
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
575  | 
let  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
576  | 
val (result, job) = future_job group orig_atts (fn () => f x);  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
577  | 
val task =  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
578  | 
SYNCHRONIZED "enroll" (fn () =>  | 
| 78648 | 579  | 
Unsynchronized.change_result queue (Task_Queue.enroll (Isabelle_Thread.self ()) name group));  | 
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
580  | 
val _ = worker_exec (task, [job]);  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
581  | 
in  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
582  | 
(case Single_Assignment.peek result of  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
583  | 
NONE => raise Fail "Missing task context result"  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
584  | 
| SOME res => Exn.release res)  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
585  | 
end);  | 
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
586  | 
|
| 
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
587  | 
|
| 
68130
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
588  | 
(* scheduling parameters *)  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
589  | 
|
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
590  | 
(*futures are enabled when more than one thread is configured and the
  total number of jobs in the queue stays below threads * "parallel_limit"*)
fun enabled () =  | 
| 68147 | 591  | 
let val threads = Multithreading.max_threads () in  | 
592  | 
threads > 1 andalso  | 
|
593  | 
let val lim = threads * Options.default_int "parallel_limit"  | 
|
594  | 
(*non-positive limit: no bound, the queue is not inspected at all*)
in lim <= 0 orelse SYNCHRONIZED "enabled" (fn () => Task_Queue.total_jobs (! queue) < lim) end  | 
|
595  | 
end;  | 
|
| 
68130
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
596  | 
|
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
597  | 
(*only lists with at least two elements are relevant, and only while futures are enabled*)
fun relevant (_ :: _ :: _) = enabled () | relevant _ = false;  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
598  | 
|
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
599  | 
fun proofs_enabled n =  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
600  | 
! Multithreading.parallel_proofs >= n andalso is_some (worker_task ()) andalso enabled ();  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
601  | 
|
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
602  | 
fun proofs_enabled_timing t =  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
603  | 
proofs_enabled 1 andalso Time.toReal t >= Options.default_real "parallel_subproofs_threshold";  | 
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
604  | 
|
| 
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
605  | 
|
| 
51325
 
bcd6b1aa4db5
more uniform Future.map: always internalize failure;
 
wenzelm 
parents: 
51283 
diff
changeset
 | 
606  | 
(* fast-path operations -- bypass task queue if possible *)  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
607  | 
|
| 
44294
 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 
wenzelm 
parents: 
44268 
diff
changeset
 | 
608  | 
fun value_result (res: 'a Exn.result) =  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
609  | 
let  | 
| 
45136
 
2afb928c71ca
static dummy_task (again) to avoid a few extra allocations;
 
wenzelm 
parents: 
44427 
diff
changeset
 | 
610  | 
val task = Task_Queue.dummy_task;  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
611  | 
val group = Task_Queue.group_of_task task;  | 
| 35016 | 612  | 
val result = Single_Assignment.var "value" : 'a result;  | 
| 
50914
 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 
wenzelm 
parents: 
50911 
diff
changeset
 | 
613  | 
val _ = assign_result group result (identify_result (Position.thread_data ()) res);  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
614  | 
  in Future {promised = false, task = task, result = result} end;
 | 
| 29366 | 615  | 
|
| 
44294
 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 
wenzelm 
parents: 
44268 
diff
changeset
 | 
616  | 
(*future that already holds the regular result x, i.e. value_result of Exn.Res x*)
fun value x = x |> Exn.Res |> value_result;  | 
| 
 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 
wenzelm 
parents: 
44268 
diff
changeset
 | 
617  | 
|
| 44330 | 618  | 
fun cond_forks args es =  | 
| 
68130
 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 
wenzelm 
parents: 
68129 
diff
changeset
 | 
619  | 
if enabled () then forks args es  | 
| 
78705
 
fde0b195cb7d
clarified signature: avoid association with potentially dangerous Exn.capture;
 
wenzelm 
parents: 
78681 
diff
changeset
 | 
620  | 
else map (fn e => value_result (Exn.result e ())) es;  | 
| 44330 | 621  | 
|
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
622  | 
fun map_future f x =  | 
| 
78705
 
fde0b195cb7d
clarified signature: avoid association with potentially dangerous Exn.capture;
 
wenzelm 
parents: 
78681 
diff
changeset
 | 
623  | 
if is_finished x then value_result (Exn.result (f o join) x)  | 
| 
49935
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
624  | 
else  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
625  | 
let  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
626  | 
val task = task_of x;  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
627  | 
val group = Task_Queue.group_of_task task;  | 
| 
54649
 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 
wenzelm 
parents: 
54637 
diff
changeset
 | 
628  | 
val (result, job) =  | 
| 62923 | 629  | 
future_job group Thread_Attributes.private_interrupts (fn () => f (join x));  | 
| 
29384
 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 
wenzelm 
parents: 
29366 
diff
changeset
 | 
630  | 
|
| 
49935
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
631  | 
val extended = SYNCHRONIZED "extend" (fn () =>  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
632  | 
(case Task_Queue.extend task job (! queue) of  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
633  | 
SOME queue' => (queue := queue'; true)  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
634  | 
| NONE => false));  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
635  | 
in  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
636  | 
      if extended then Future {promised = false, task = task, result = result}
 | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
637  | 
else  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
638  | 
(singleton o cond_forks)  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
639  | 
          {name = "map_future", group = SOME group, deps = [task],
 | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
640  | 
pri = Task_Queue.pri_of_task task, interrupts = true}  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
641  | 
(fn () => f (join x))  | 
| 
 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 
wenzelm 
parents: 
49910 
diff
changeset
 | 
642  | 
end;  | 
| 
28979
 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 
wenzelm 
parents: 
28972 
diff
changeset
 | 
643  | 
|
| 28191 | 644  | 
|
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
645  | 
(* promised futures -- fulfilled by external means *)  | 
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
646  | 
|
| 66166 | 647  | 
fun promise_name name abort : 'a future =  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
648  | 
let  | 
| 66166 | 649  | 
val group = worker_subgroup ();  | 
| 35016 | 650  | 
val result = Single_Assignment.var "promise" : 'a result;  | 
| 78759 | 651  | 
fun assign () =  | 
652  | 
(case Exn.capture_body (fn () => assign_result group result Isabelle_Thread.interrupt_exn) of  | 
|
653  | 
Exn.Res res => res  | 
|
654  | 
| Exn.Exn (Fail _) => true  | 
|
655  | 
| Exn.Exn exn =>  | 
|
| 
78766
 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 
wenzelm 
parents: 
78760 
diff
changeset
 | 
656  | 
if Exn.is_interrupt_proper exn  | 
| 78759 | 657  | 
then raise Fail "Concurrent attempt to fulfill promise"  | 
658  | 
else Exn.reraise exn);  | 
|
| 
44298
 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 
wenzelm 
parents: 
44295 
diff
changeset
 | 
659  | 
fun job () =  | 
| 62923 | 660  | 
Thread_Attributes.with_attributes Thread_Attributes.no_interrupts  | 
| 
47423
 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 
wenzelm 
parents: 
47421 
diff
changeset
 | 
661  | 
(fn _ => Exn.release (Exn.capture assign () before abort ()));  | 
| 
37854
 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 
wenzelm 
parents: 
37852 
diff
changeset
 | 
662  | 
val task = SYNCHRONIZED "enqueue_passive" (fn () =>  | 
| 66166 | 663  | 
Unsynchronized.change_result queue (Task_Queue.enqueue_passive group name job));  | 
| 
41683
 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 
wenzelm 
parents: 
41681 
diff
changeset
 | 
664  | 
  in Future {promised = true, task = task, result = result} end;
 | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
665  | 
|
| 66166 | 666  | 
(*promise with the fixed task name "passive" and the given abort operation*)
fun promise abort = abort |> promise_name "passive";  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
667  | 
|
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
668  | 
fun fulfill_result (Future {promised = true, task, result}) res =
 | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
669  | 
let  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
670  | 
val group = Task_Queue.group_of_task task;  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
671  | 
val pos = Position.thread_data ();  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
672  | 
fun job ok =  | 
| 78681 | 673  | 
assign_result group result  | 
674  | 
(if ok then identify_result pos res else Isabelle_Thread.interrupt_exn);  | 
|
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
675  | 
val _ =  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
676  | 
Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn _ =>  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
677  | 
let  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
678  | 
val passive_job =  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
679  | 
SYNCHRONIZED "fulfill_result" (fn () =>  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
680  | 
Unsynchronized.change_result queue  | 
| 78648 | 681  | 
(Task_Queue.dequeue_passive (Isabelle_Thread.self ()) task));  | 
| 
66958
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
682  | 
in  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
683  | 
(case passive_job of  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
684  | 
SOME true => worker_exec (task, [job])  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
685  | 
| SOME false => ()  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
686  | 
| NONE => ignore (job (not (Task_Queue.is_canceled group))))  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
687  | 
end);  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
688  | 
val _ =  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
689  | 
if is_some (Single_Assignment.peek result) then ()  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
690  | 
else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
691  | 
in () end  | 
| 
 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 
wenzelm 
parents: 
66378 
diff
changeset
 | 
692  | 
| fulfill_result _ _ = raise Fail "Not a promised future";  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
693  | 
|
| 
43761
 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 
wenzelm 
parents: 
43665 
diff
changeset
 | 
694  | 
(*fulfill promised future x with the regular value res, wrapped as Exn.Res*)
fun fulfill x res = res |> Exn.Res |> fulfill_result x;  | 
| 
34277
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
695  | 
|
| 
 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 
wenzelm 
parents: 
33416 
diff
changeset
 | 
696  | 
|
| 66378 | 697  | 
(* snapshot: current tasks of groups *)  | 
| 54369 | 698  | 
|
| 68379 | 699  | 
(*tasks of the given groups, as reported by Task_Queue.group_tasks for the
  current queue; an empty group list yields [] without entering SYNCHRONIZED*)
fun snapshot [] = []  | 
700  | 
| snapshot groups =  | 
|
701  | 
SYNCHRONIZED "snapshot" (fn () =>  | 
|
702  | 
Task_Queue.group_tasks (! queue) groups);  | 
|
| 54369 | 703  | 
|
704  | 
||
| 
32228
 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 
wenzelm 
parents: 
32227 
diff
changeset
 | 
705  | 
(* shutdown *)  | 
| 29366 | 706  | 
|
| 28203 | 707  | 
fun shutdown () =  | 
| 62359 | 708  | 
if is_some (worker_task ()) then  | 
| 
51283
 
fefd07697979
disallow shutdown from worker, which would lead to deadlock since the scheduler cannot terminate;
 
wenzelm 
parents: 
51281 
diff
changeset
 | 
709  | 
raise Fail "Cannot shutdown while running as worker thread"  | 
| 
 
fefd07697979
disallow shutdown from worker, which would lead to deadlock since the scheduler cannot terminate;
 
wenzelm 
parents: 
51281 
diff
changeset
 | 
710  | 
else  | 
| 28276 | 711  | 
SYNCHRONIZED "shutdown" (fn () =>  | 
| 51279 | 712  | 
while scheduler_active () do  | 
| 
59340
 
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
 
wenzelm 
parents: 
59338 
diff
changeset
 | 
713  | 
(do_shutdown := true;  | 
| 
 
734bb148503e
explicit shutdown of scheduler and worker thread farm, assuming Session.shutdown() before saving heap image;
 
wenzelm 
parents: 
59338 
diff
changeset
 | 
714  | 
Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");  | 
| 
72078
 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 
wenzelm 
parents: 
72038 
diff
changeset
 | 
715  | 
wait_timeout next_round scheduler_event));  | 
| 28203 | 716  | 
|
| 29366 | 717  | 
|
718  | 
(*final declarations of this structure!*)  | 
|
719  | 
(*map_future under the name "map"; bound last because the code above
  still uses "map" on lists*)
val map = map_future;  | 
|
720  | 
||
| 28156 | 721  | 
end;  | 
| 28972 | 722  | 
|
723  | 
(*abbreviation declared after the end of the structure, so that 'a future
  can be written without the Future. qualifier*)
type 'a future = 'a Future.future;  | 