| author | paulson <lp15@cam.ac.uk> | 
| Sat, 24 Aug 2024 14:14:44 +0100 | |
| changeset 80756 | 4d592706086e | 
| parent 78787 | a7e4b412cc7c | 
| child 80809 | 4a64fc4d1cde | 
| permissions | -rw-r--r-- | 
| 28156 | 1 | (* Title: Pure/Concurrent/future.ML | 
| 2 | Author: Makarius | |
| 3 | ||
| 57350 | 4 | Value-oriented parallel execution via futures and promises. | 
| 28156 | 5 | *) | 
| 6 | ||
| 7 | signature FUTURE = | |
| 8 | sig | |
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 9 | type task = Task_Queue.task | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 10 | type group = Task_Queue.group | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 11 | val new_group: group option -> group | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 12 | val worker_task: unit -> task option | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 13 | val worker_group: unit -> group option | 
| 52603 | 14 | val the_worker_group: unit -> group | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 15 | val worker_subgroup: unit -> group | 
| 28972 | 16 | type 'a future | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 17 | val task_of: 'a future -> task | 
| 28972 | 18 | val peek: 'a future -> 'a Exn.result option | 
| 19 | val is_finished: 'a future -> bool | |
| 44301 | 20 |   val interruptible_task: ('a -> 'b) -> 'a -> 'b
 | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 21 | val cancel_group: group -> unit | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 22 | val cancel: 'a future -> unit | 
| 56333 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 wenzelm parents: 
54671diff
changeset | 23 | val error_message: Position.T -> (serial * string) * string option -> unit | 
| 50914 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 24 | val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result | 
| 44427 | 25 |   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
 | 
| 26 | val default_params: params | |
| 27 | val forks: params -> (unit -> 'a) list -> 'a future list | |
| 32724 | 28 | val fork: (unit -> 'a) -> 'a future | 
| 72085 | 29 | val get_result: 'a future -> 'a Exn.result | 
| 28972 | 30 | val join_results: 'a future list -> 'a Exn.result list | 
| 31 | val join_result: 'a future -> 'a Exn.result | |
| 44330 | 32 | val joins: 'a future list -> 'a list | 
| 28972 | 33 | val join: 'a future -> 'a | 
| 67658 | 34 |   val forked_results: {name: string, deps: Task_Queue.task list} ->
 | 
| 35 | (unit -> 'a) list -> 'a Exn.result list | |
| 54649 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
changeset | 36 |   val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
 | 
| 68130 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 37 | val enabled: unit -> bool | 
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 38 | val relevant: 'a list -> bool | 
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 39 | val proofs_enabled: int -> bool | 
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 40 | val proofs_enabled_timing: Time.time -> bool | 
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 41 | val value_result: 'a Exn.result -> 'a future | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 42 | val value: 'a -> 'a future | 
| 44427 | 43 | val cond_forks: params -> (unit -> 'a) list -> 'a future list | 
| 28972 | 44 |   val map: ('a -> 'b) -> 'a future -> 'b future
 | 
| 66166 | 45 | val promise_name: string -> (unit -> unit) -> 'a future | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 46 | val promise: (unit -> unit) -> 'a future | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 47 | val fulfill_result: 'a future -> 'a Exn.result -> unit | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 48 | val fulfill: 'a future -> 'a -> unit | 
| 66378 | 49 | val snapshot: group list -> task list | 
| 28203 | 50 | val shutdown: unit -> unit | 
| 28156 | 51 | end; | 
| 52 | ||
| 53 | structure Future: FUTURE = | |
| 54 | struct | |
| 55 | ||
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 56 | (** future values **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 57 | |
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 58 | type task = Task_Queue.task; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 59 | type group = Task_Queue.group; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 60 | val new_group = Task_Queue.new_group; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 61 | |
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 62 | |
| 28167 | 63 | (* identifiers *) | 
| 64 | ||
| 32058 | 65 | local | 
| 62889 | 66 | val worker_task_var = Thread_Data.var () : task Thread_Data.var; (*thread data slot holding the task of the current thread; private to this local block*) | 
| 32058 | 67 | in | 
| 62889 | 68 | fun worker_task () = Thread_Data.get worker_task_var; (*current task as an option; NONE is treated by the_worker_group as "missing worker thread context"*) | 
| 69 | fun setmp_worker_task task f x = Thread_Data.setmp worker_task_var (SOME task) f x; (*apply f to x with the slot set to SOME task; presumably restored afterwards, as "setmp" suggests -- confirm in Thread_Data*) | |
| 28167 | 70 | end; | 
| 71 | ||
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 72 | val worker_group = Option.map Task_Queue.group_of_task o worker_task; (*group of the current worker task, or NONE when worker_task () is NONE*) | 
| 52603 | 73 | |
| 74 | fun the_worker_group () = (*like worker_group, but raises Fail instead of returning NONE*) | |
| 75 | (case worker_group () of | |
| 76 | SOME group => group | |
| 77 | | NONE => raise Fail "Missing worker thread context"); | |
| 78 | ||
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 79 | fun worker_subgroup () = new_group (worker_group ()); (*fresh group made by new_group from the optional current worker group -- presumably used as its parent; confirm in Task_Queue.new_group*) | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 80 | |
| 41679 | 81 | fun worker_joining e = (*run e (); inside a worker task the call goes through Task_Queue.joining for that task*) | 
| 82 | (case worker_task () of | |
| 83 | NONE => e () | |
| 84 | | SOME task => Task_Queue.joining task e); | |
| 85 | ||
| 41680 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 86 | fun worker_waiting deps e = (*run e (); inside a worker task the call goes through Task_Queue.waiting with that task and deps*) | 
| 41670 | 87 | (case worker_task () of | 
| 88 | NONE => e () | |
| 41680 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 89 | | SOME task => Task_Queue.waiting task deps e); | 
| 41670 | 90 | |
| 28167 | 91 | |
| 92 | (* datatype future *) | |
| 93 | ||
| 35016 | 94 | type 'a result = 'a Exn.result Single_Assignment.var; | 
| 95 | ||
| 66958 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 96 | datatype 'a future = | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 97 | Value of 'a Exn.result | | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 98 | Future of | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 99 |    {promised: bool,
 | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 100 | task: task, | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 101 | result: 'a result}; | 
| 28167 | 102 | |
| 66958 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 103 | fun task_of (Value _) = Task_Queue.dummy_task | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 104 |   | task_of (Future {task, ...}) = task;
 | 
| 28167 | 105 | |
| 66958 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 106 | fun peek (Value res) = SOME res | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 107 |   | peek (Future {result, ...}) = Single_Assignment.peek result;
 | 
| 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
changeset | 108 | |
| 28558 | 109 | fun is_finished x = is_some (peek x); (*true iff peek yields a result, whether Exn.Res or Exn.Exn*) | 
| 28320 | 110 | |
| 62663 | 111 | val _ = | 
| 62819 
d3ff367a16a0
careful export of type-dependent functions, without losing their special status;
 wenzelm parents: 
62663diff
changeset | 112 | ML_system_pp (fn depth => fn pretty => fn x => | 
| 62663 | 113 | (case peek x of | 
| 68918 
3a0db30e5d87
simplified signature (again, see 751bcf0473a7): e.g. relevant for non-Isabelle ML environments;
 wenzelm parents: 
68379diff
changeset | 114 | NONE => PolyML.PrettyString "<future>" | 
| 
3a0db30e5d87
simplified signature (again, see 751bcf0473a7): e.g. relevant for non-Isabelle ML environments;
 wenzelm parents: 
68379diff
changeset | 115 | | SOME (Exn.Exn _) => PolyML.PrettyString "<failed>" | 
| 62663 | 116 | | SOME (Exn.Res y) => pretty (y, depth))); | 
| 117 | ||
| 28167 | 118 | |
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 119 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 120 | (** scheduling **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 121 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 122 | (* synchronization *) | 
| 28156 | 123 | |
| 78650 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 124 | val scheduler_event = Thread.ConditionVar.conditionVar (); | 
| 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 125 | val work_available = Thread.ConditionVar.conditionVar (); | 
| 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 126 | val work_finished = Thread.ConditionVar.conditionVar (); | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 127 | |
| 28156 | 128 | local | 
| 78650 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 129 | val lock = Thread.Mutex.mutex (); | 
| 28156 | 130 | in | 
| 131 | ||
| 59054 
61b723761dff
load simple_thread.ML later, such that it benefits from redefined print_exception_trace;
 wenzelm parents: 
57350diff
changeset | 132 | fun SYNCHRONIZED name = Multithreading.synchronized name lock; | 
| 28156 | 133 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 134 | fun wait cond = (*requires SYNCHRONIZED*) | 
| 64276 | 135 | Multithreading.sync_wait NONE cond lock; | 
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 136 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 137 | fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) | 
| 64276 | 138 | Multithreading.sync_wait (SOME (Time.now () + timeout)) cond lock; | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 139 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 140 | fun signal cond = (*requires SYNCHRONIZED*) | 
| 78650 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 141 | Thread.ConditionVar.signal cond; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 142 | |
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 143 | fun broadcast cond = (*requires SYNCHRONIZED*) | 
| 78650 
47d0c333d155
clarified signature: retain original Poly/ML names Thread.Thread, Thread.Mutex, Thread.ConditionVar and de-emphasize them for Isabelle/ML;
 wenzelm parents: 
78648diff
changeset | 144 | Thread.ConditionVar.broadcast cond; | 
| 28156 | 145 | |
| 146 | end; | |
| 147 | ||
| 148 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 149 | (* global state *) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 150 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 151 | val queue = Unsynchronized.ref Task_Queue.empty; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 152 | val next = Unsynchronized.ref 0; | 
| 78648 | 153 | val scheduler = Unsynchronized.ref (NONE: Isabelle_Thread.T option); | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 154 | val canceled = Unsynchronized.ref ([]: group list); | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 155 | val do_shutdown = Unsynchronized.ref false; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 156 | val max_workers = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 157 | val max_active = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 158 | |
| 50280 | 159 | val status_ticks = Unsynchronized.ref 0; | 
| 160 | val last_round = Unsynchronized.ref Time.zeroTime; | |
| 161 | val next_round = seconds 0.05; | |
| 162 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 163 | datatype worker_state = Working | Waiting | Sleeping; | 
| 78648 | 164 | val workers = Unsynchronized.ref ([]: (Isabelle_Thread.T * worker_state Unsynchronized.ref) list); | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 165 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 166 | fun count_workers state = (*requires SYNCHRONIZED*) (*number of entries in workers whose state ref currently equals state*) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 167 | fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 168 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 169 | |
| 72112 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 170 | (* ML statistics *) | 
| 50280 | 171 | |
| 72112 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 172 | fun ML_statistics () = (*requires SYNCHRONIZED*) | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 173 | let | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 174 |     val {ready, pending, running, passive, urgent} = Task_Queue.status (! queue);
 | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 175 | val workers_total = length (! workers); | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 176 | val workers_active = count_workers Working; | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 177 | val workers_waiting = count_workers Waiting; | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 178 | in | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 179 | ML_Statistics.set | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 180 |      {tasks_ready = ready,
 | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 181 | tasks_pending = pending, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 182 | tasks_running = running, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 183 | tasks_passive = passive, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 184 | tasks_urgent = urgent, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 185 | workers_total = workers_total, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 186 | workers_active = workers_active, | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 187 | workers_waiting = workers_waiting} | 
| 
3546dd4ade74
ML statistics via external process: allows monitoring RTS while ML program sleeps;
 wenzelm parents: 
72086diff
changeset | 188 | end; | 
| 50280 | 189 | |
| 190 | ||
| 44110 | 191 | (* cancellation primitives *) | 
| 32099 | 192 | |
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 193 | fun cancel_now group = (*requires SYNCHRONIZED*) | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 194 | let | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 195 | val running = Task_Queue.cancel (! queue) group; | 
| 49894 
69bfd86cc711
more robust cancel_now: avoid shooting yourself in the foot;
 wenzelm parents: 
49009diff
changeset | 196 | val _ = running |> List.app (fn thread => | 
| 71692 | 197 | if Isabelle_Thread.is_self thread then () | 
| 78787 | 198 | else Isabelle_Thread.interrupt_thread thread); | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 199 | in running end; | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 200 | |
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 201 | fun cancel_all () = (*requires SYNCHRONIZED*) | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 202 | let | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 203 | val (groups, threads) = Task_Queue.cancel_all (! queue); | 
| 78787 | 204 | val _ = List.app Isabelle_Thread.interrupt_thread threads; | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 205 | in groups end; | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 206 | |
| 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 207 | fun cancel_later group = (*requires SYNCHRONIZED*) | 
| 32738 | 208 | (Unsynchronized.change canceled (insert Task_Queue.eq_group group); | 
| 209 | broadcast scheduler_event); | |
| 29341 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 wenzelm parents: 
29119diff
changeset | 210 | |
| 44301 | 211 | fun interruptible_task f x = (*apply f to x under thread attributes chosen by context*) | 
| 62923 | 212 | Thread_Attributes.with_attributes | 
| 62359 | 213 | (if is_some (worker_task ()) (*inside a worker task?*) | 
| 62923 | 214 | then Thread_Attributes.private_interrupts | 
| 215 | else Thread_Attributes.public_interrupts) | |
| 62359 | 216 | (fn _ => f x) | 
| 78713 | 217 | before Isabelle_Thread.expose_interrupt (); (*evaluated only after f x has produced a value; "before" keeps that value as the result*) | 
| 44301 | 218 | |
| 219 | ||
| 44110 | 220 | (* worker threads *) | 
| 221 | ||
| 222 | fun worker_exec (task, jobs) = | |
| 28167 | 223 | let | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 224 | val group = Task_Queue.group_of_task task; | 
| 32102 | 225 | val valid = not (Task_Queue.is_canceled group); | 
| 41670 | 226 | val ok = | 
| 227 | Task_Queue.running task (fn () => | |
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 228 | setmp_worker_task task (fn () => | 
| 41670 | 229 | fold (fn job => fn ok => job valid andalso ok) jobs true) ()); | 
| 50975 | 230 | val _ = | 
| 231 | if ! Multithreading.trace >= 2 then | |
| 56333 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 wenzelm parents: 
54671diff
changeset | 232 | Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) [] | 
| 50975 | 233 | else (); | 
| 32246 | 234 | val _ = SYNCHRONIZED "finish" (fn () => | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 235 | let | 
| 32738 | 236 | val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); | 
| 78714 | 237 | val test = Isabelle_Thread.expose_interrupt_result (); | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 238 | val _ = | 
| 78766 
5578341489cb
further clarification of Exn.is_interrupt_proper vs. overall Exn.is_interrupt;
 wenzelm parents: 
78760diff
changeset | 239 | if ok andalso not (Exn.is_interrupt_proper_exn test) then () | 
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 240 | else if null (cancel_now group) then () | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 241 | else cancel_later group; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 242 | val _ = broadcast work_finished; | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 243 | val _ = if maximal then () else signal work_available; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 244 | in () end); | 
| 28167 | 245 | in () end; | 
| 246 | ||
| 59465 | 247 | fun worker_wait worker_state cond = (*requires SYNCHRONIZED*) (*wait on cond, marking the calling worker with worker_state meanwhile*) | 
| 78648 | 248 | (case AList.lookup Isabelle_Thread.equal (! workers) (Isabelle_Thread.self ()) of | 
| 59465 | 249 | SOME state => Unsynchronized.setmp state worker_state wait cond (*registered worker: its state ref is set for the wait; presumably restored afterwards, as "setmp" suggests*) | 
| 250 | | NONE => wait cond); (*calling thread has no entry in workers: plain wait*) | |
| 28162 | 251 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 252 | fun worker_next () = (*requires SYNCHRONIZED*) | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 253 | if length (! workers) > ! max_workers then | 
| 78648 | 254 | (Unsynchronized.change workers (AList.delete Isabelle_Thread.equal (Isabelle_Thread.self ())); | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 255 | signal work_available; | 
| 28167 | 256 | NONE) | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 257 | else | 
| 60610 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 wenzelm parents: 
59468diff
changeset | 258 | let val urgent_only = count_workers Working > ! max_active in | 
| 78648 | 259 | (case Unsynchronized.change_result queue | 
| 260 | (Task_Queue.dequeue (Isabelle_Thread.self ()) urgent_only) of | |
| 60610 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 wenzelm parents: 
59468diff
changeset | 261 | NONE => (worker_wait Sleeping work_available; worker_next ()) | 
| 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 wenzelm parents: 
59468diff
changeset | 262 | | some => (signal work_available; some)) | 
| 
f52b4b0c10c4
improved scheduling for urgent tasks, using farm of replacement threads (may lead to factor 2 overloading, but CPUs are usually hyperthreaded);
 wenzelm parents: 
59468diff
changeset | 263 | end; | 
| 28156 | 264 | |
| 28167 | 265 | fun worker_loop name = (*body of a worker thread (forked by worker_start): fetch work via worker_next and execute it until worker_next yields NONE*) | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 266 | (case SYNCHRONIZED name (fn () => worker_next ()) of | 
| 29119 | 267 | NONE => () | 
| 44295 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 wenzelm parents: 
44294diff
changeset | 268 | | SOME work => (worker_exec work; worker_loop name)); (*worker_exec is called outside the SYNCHRONIZED section above*) | 
| 28156 | 269 | |
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
(*Fork a new thread running worker_loop name and register it in workers,
  paired with a fresh state reference initialised to Working.  A Fail raised
  anywhere in this body is not propagated: its message is merely traced with
  the "SCHEDULER: " prefix.*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    fun body () = worker_loop name;
    val thread = Isabelle_Thread.fork (Isabelle_Thread.params "worker") body;
    val entry = (thread, Unsynchronized.ref Working);
  in Unsynchronized.change workers (fn ws => entry :: ws) end
  handle Fail msg => Multithreading.tracing 0 (fn () => "SCHEDULER: " ^ msg);
| 59330 
cb3a4caf206d
permissive worker_start: failure to fork thread is deferred to later attempt to provide missing threads, without crashing scheduler;
 wenzelm parents: 
59180diff
changeset | 275 | |
| 28156 | 276 | |
| 277 | (* scheduler *) | |
| 278 | ||
| 72078 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 wenzelm parents: 
72038diff
(*Final step of the scheduler: invoke ML_statistics once more, then clear the
  scheduler reference.*)
fun scheduler_end () = (*requires SYNCHRONIZED*)
  let val _ = ML_statistics ()
  in scheduler := NONE end;
| 72078 
b8d0b8659e0a
more robust scheduler shutdown, notably for spurious crashes;
 wenzelm parents: 
72038diff
changeset | 281 | |
(*One round of the scheduler.  In order: per-tick bookkeeping and statistics,
  disposal of worker threads that are no longer active, recomputation of
  max_active / max_workers with forking of missing workers, processing of
  canceled groups, a timed wait on scheduler_event, and the shutdown check.
  The result is false exactly when do_shutdown is set and no workers remain
  (scheduler_end has been invoked in that case); otherwise true.*)
fun scheduler_next0 () = (*requires SYNCHRONIZED*)
  let
    (* tick and runtime status *)

    val now = Time.now ();
    val tick = ! last_round + next_round <= now;
    val _ =
      if tick then
       (last_round := now;
        Unsynchronized.change status_ticks (fn i => i + 1);
        (*statistics every 2nd tick when tracing is enabled, every 5th otherwise*)
        if ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 5) = 0
        then ML_statistics () else ())
      else ();


    (* dead workers *)

    fun is_alive (thread, _) = Isabelle_Thread.is_active thread;
    val _ =
      if tick andalso not (forall is_alive (! workers)) then
        let
          val (alive, dead) = List.partition is_alive (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end
      else ();


    (* worker pool adjustments *)

    val old_max_active = ! max_active;
    val old_max_workers = ! max_workers;

    val waiting = count_workers Waiting;

    (*no active workers are wanted once shutdown is requested and the queue is all passive*)
    val active_limit =
      if ! do_shutdown andalso Task_Queue.all_passive (! queue) then 0
      else Multithreading.max_threads ();
    val reserve = if waiting > 0 then waiting + 1 else 0;
    val _ = max_active := active_limit;
    val _ = max_workers := Int.max (2 * active_limit, reserve);

    (*fork as many workers as are missing wrt. the new max_workers*)
    fun spawn k =
      if k > 0 then
       (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next));
        spawn (k - 1))
      else ();
    val _ = spawn (! max_workers - length (! workers));

    val limits_changed =
      ! max_active <> old_max_active orelse ! max_workers <> old_max_workers;
    val _ = if limits_changed then signal work_available else ();


    (* canceled groups *)

    val _ =
      if not (null (! canceled)) then
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        (*keep only those groups for which cancel_now still reports something*)
        Unsynchronized.change canceled (filter_out (fn group => null (cancel_now group)));
        signal work_available)
      else ();


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    val finished = ! do_shutdown andalso null (! workers);
    val _ = if finished then scheduler_end () else ();

    val _ = broadcast scheduler_event;
  in not finished end;
| 358 | ||
(*Exception-aware wrapper around scheduler_next0.  Any exception is traced
  with the "SCHEDULER: " prefix.  If Exn.is_interrupt_proper holds for it, the
  groups returned by cancel_all are passed to cancel_later, work_available is
  signalled and the scheduler continues (result true); any other exception
  ends the scheduler via scheduler_end and is re-raised.*)
fun scheduler_next () =
  let
    fun recover exn =
      let
        val _ = Multithreading.tracing 1 (fn () => "SCHEDULER: " ^ General.exnMessage exn);
      in
        if Exn.is_interrupt_proper exn then
          let
            val _ = List.app cancel_later (cancel_all ());
            val _ = signal work_available;
          in true end
        else (scheduler_end (); Exn.reraise exn)
      end;
  in
    (case Exn.capture_body scheduler_next0 of
      Exn.Exn exn => recover exn
    | Exn.Res continue => continue)
  end;
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 370 | |
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
(*Body of the scheduler thread: run scheduler_next inside
  SYNCHRONIZED "scheduler", under thread attributes derived from
  public_interrupts via sync_interrupts, for as long as it returns true;
  afterwards reset last_round to Time.zeroTime.*)
fun scheduler_loop () =
  let
    fun round () =
      Thread_Attributes.with_attributes
        (Thread_Attributes.sync_interrupts Thread_Attributes.public_interrupts)
        (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
    fun loop () = if round () then loop () else ();
  in loop (); last_round := Time.zeroTime end;
| 28156 | 377 | |
(*True iff a scheduler thread is registered and Isabelle_Thread.is_active
  holds for it.*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  let
    fun active (SOME thread) = Isabelle_Thread.is_active thread
      | active NONE = false;
  in active (! scheduler) end;
| 28203 | 380 | |
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
(*Reset the do_shutdown flag and make sure a scheduler thread exists: unless
  scheduler_active holds, fork a new thread running scheduler_loop and
  register it in the scheduler reference.*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
    fun start () = Isabelle_Thread.fork (Isabelle_Thread.params "scheduler") scheduler_loop;
  in if not (scheduler_active ()) then scheduler := SOME (start ()) else () end;
| 28156 | 385 | |
| 44301 | 386 | |
| 387 | ||
| 388 | (** futures **) | |
| 389 | ||
| 390 | (* cancel *) | |
| 391 | ||
(*Cancel a group: apply cancel_now, and hand the group to cancel_later
  whenever cancel_now reports a non-empty result; then always signal
  work_available and run scheduler_check.*)
fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
 (if not (null (cancel_now group)) then cancel_later group else ();
  signal work_available;
  scheduler_check ());
| 398 | ||
(*Variant of cancel_group_unsynchronized that enters
  SYNCHRONIZED "cancel_group" by itself.*)
fun cancel_group group =
  let fun body () = cancel_group_unsynchronized group
  in SYNCHRONIZED "cancel_group" body end;
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 401 | |
(*Cancel the group of the task that belongs to the given future.*)
fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;
| 29366 | 403 | |
| 28156 | 404 | |
| 50914 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 405 | (* results *) | 
| 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 406 | |
| 56333 
38f1422ef473
support bulk messages consisting of small string segments, which are more healthy to the Poly/ML RTS and might prevent spurious GC crashes such as MTGCProcessMarkPointers::ScanAddressesInObject;
 wenzelm parents: 
54671diff
(*Emit an error message with pos installed as thread position data.  The
  message is suppressed only if both the id of pos and the given exec_id are
  present and differ from each other.*)
fun error_message pos ((serial, msg), exec_id) =
  let
    fun emit () =
      (case (Position.id_of pos, exec_id) of
        (SOME id, SOME id') => if id = id' then Output.error_message' (serial, msg) else ()
      | _ => Output.error_message' (serial, msg));
  in Position.setmp_thread_data pos emit () end;
| 44110 | 413 | |
| 50914 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
(*Map the exception of a failed result through Exn_Properties.identify; the
  properties passed along consist of the exec_id entry taken from the id of
  pos, or are empty if pos has no id.  Successful results are handled by
  Exn.map_exn alone.*)
fun identify_result pos res =
  let
    fun exec_props () =
      (case Position.id_of pos of
        SOME id => [(Markup.exec_idN, id)]
      | NONE => []);
    fun identify exn = Exn_Properties.identify (exec_props ()) exn;
  in Exn.map_exn identify res end;
| 50914 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 421 | |
| 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
(*Assign res to the single-assignment variable result and report whether the
  value stored there afterwards is a regular result.

  If the assignment raises Fail, an exception is re-raised: the exception
  already stored in result, provided Exn.is_interrupt_proper holds for it;
  the Fail itself otherwise.  Other outcomes of the assignment attempt are
  ignored.  If the stored value is an exception, the group is cancelled with
  it inside SYNCHRONIZED "cancel" and the outcome is false.*)
fun assign_result group result res =
  let
    fun failed exn =
      (case Single_Assignment.peek result of
        SOME (Exn.Exn e) =>
          if Exn.is_interrupt_proper e then Exn.reraise e else Exn.reraise exn
      | _ => Exn.reraise exn);
    val _ =
      (case Exn.capture_body (fn () => Single_Assignment.assign result res) of
        Exn.Exn (exn as Fail _) => failed exn
      | _ => ());
  in
    (case the (Single_Assignment.peek result) of
      Exn.Res _ => true
    | Exn.Exn exn =>
        let val _ = SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn)
        in false end)
  end;
| 437 | ||
| 50914 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 438 | |
| 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 439 | (* future jobs *) | 
| 
fe4714886d92
identify future results more carefully, to avoid odd duplication of error messages, notably from forked goals;
 wenzelm parents: 
50911diff
changeset | 440 | |
| 54649 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
(*Produce a fresh result variable together with the job that fills it.  The
  thread position and generic context are captured when future_job is called
  and re-installed around the evaluation of e.  The job takes a flag: true
  means e is evaluated under thread attributes atts, with its outcome
  captured; false means the result is Isabelle_Thread.interrupt_exn without
  evaluating e.  Either way the outcome goes through identify_result and
  assign_result.*)
fun future_job group atts (e: unit -> 'a) =
  let
    val result = Single_Assignment.var "future" : 'a result;
    val pos = Position.thread_data ();
    val context = Context.get_generic_context ();
    fun run () =
      Thread_Attributes.with_attributes atts (fn _ =>
        Position.setmp_thread_data pos (Context.setmp_generic_context context e) ());
    fun job true = assign_result group result (identify_result pos (Exn.capture_body run))
      | job false = assign_result group result (identify_result pos Isabelle_Thread.interrupt_exn);
  in (result, job) end;
| 456 | ||
| 457 | ||
| 29366 | 458 | (* fork *) | 
| 459 | ||
(*Parameters for forks: task name, optional group, task dependencies,
  priority, and a flag that selects the interrupt attributes of the job.*)
type params =
 {name: string,
  group: group option,
  deps: task list,
  pri: int,
  interrupts: bool};

val default_params: params =
 {name = "",
  group = NONE,
  deps = [],
  pri = 0,
  interrupts = true};
| 44113 | 462 | |
(*Fork a list of expressions as futures that share name, group, dependencies
  and priority.  An empty list yields no futures and touches nothing.
  Without an explicit group, worker_subgroup () is used.  The jobs run with
  private_interrupts if the interrupts flag is set, with no_interrupts
  otherwise.  All tasks are enqueued within a single SYNCHRONIZED "enqueue"
  section; work_available is signalled only if none of deps is known to the
  updated queue, and scheduler_check is run in any case.*)
fun forks ({name, group, deps, pri, interrupts}: params) es =
  (case es of
    [] => []
  | _ =>
      let
        val grp = (case group of SOME grp => grp | NONE => worker_subgroup ());
        val atts =
          if interrupts then Thread_Attributes.private_interrupts
          else Thread_Attributes.no_interrupts;
        fun enqueue e tasks =
          let
            val (result, job) = future_job grp atts e;
            val (task, tasks') = Task_Queue.enqueue name grp deps pri job tasks;
          in (Future {promised = false, task = task, result = result}, tasks') end;
        fun body () =
          let
            val (futures, queue') = fold_map enqueue es (! queue);
            val _ = queue := queue';
            val minimal = forall (fn dep => not (Task_Queue.known_task queue' dep)) deps;
            val _ = if minimal then signal work_available else ();
            val _ = scheduler_check ();
          in futures end;
      in SYNCHRONIZED "enqueue" body end);
| 28162 | 491 | |
(*Fork a single expression via forks, using the name "fork", no explicit
  group, no dependencies, priority 0, and interrupts enabled.*)
fun fork e =
  let
    val params: params =
      {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true};
  in singleton (forks params) e end;
| 28186 | 494 | |
| 495 | ||
| 29366 | 496 | (* join *) | 
| 497 | ||
(*Result of a future as seen by join.  A future without a value yields
  Fail "Unfinished future" as exceptional result.  If
  Exn.is_interrupt_proper_exn holds for the stored result and the status of
  the task's group is non-empty, that status is reported instead, packed via
  Par_Exn.make.  In all other cases the stored result is returned as it is.*)
fun get_result x =
  let
    fun group_exns () = Task_Queue.group_status (Task_Queue.group_of_task (task_of x));
    fun refine result =
      if not (Exn.is_interrupt_proper_exn result) then result
      else
        (case group_exns () of
          [] => result
        | exns => Exn.Exn (Par_Exn.make exns));
  in
    (case peek x of
      SOME result => refine result
    | NONE => Exn.Exn (Fail "Unfinished future"))
  end;
| 28186 | 507 | |
| 49935 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 wenzelm parents: 
49910diff
changeset | 508 | local | 
| 
d1ecb3554b25
clarified Future.map (again): finished value is mapped in-place, which saves task structures and changes error behaviour slightly (tolerance against canceled group of old value etc.);
 wenzelm parents: 
49910diff
changeset | 509 | |
(*Determine the next piece of work on behalf of the given dependencies.
  NONE is returned if there are no dependencies, or if
  Task_Queue.dequeue_deps yields neither work nor remaining dependencies.
  While there is no work but dependencies remain, wait on work_finished
  (in state Waiting, under thread attributes atts, wrapped by worker_waiting)
  and try again with the remaining ones.  Otherwise the work is returned
  together with the remaining dependencies.*)
fun join_next atts deps = (*requires SYNCHRONIZED*)
  let
    fun wait () =
      Thread_Attributes.with_attributes atts (fn _ =>
        Exn.release (worker_wait Waiting work_finished));
    fun next [] = NONE
      | next pending =
          (case Unsynchronized.change_result queue
              (Task_Queue.dequeue_deps (Isabelle_Thread.self ()) pending) of
            (SOME work, pending') => SOME (work, pending')
          | (NONE, []) => NONE
          | (NONE, pending') => (worker_waiting pending' wait; next pending'));
  in next deps end;
| 32095 | 522 | |
(*Repeatedly obtain work via join_next inside SYNCHRONIZED "join" and execute
  it via worker_exec, wrapped by worker_joining, until join_next yields NONE.*)
fun join_loop atts deps =
  let
    fun step pending = SYNCHRONIZED "join" (fn () => join_next atts pending);
    fun loop pending =
      (case step pending of
        SOME (work, pending') =>
          let val _ = worker_joining (fn () => worker_exec work)
          in loop pending' end
      | NONE => ());
  in loop deps end;
| 32814 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 wenzelm parents: 
32738diff
changeset | 527 | |
| 29551 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 528 | in | 
| 
95e469919c3e
join_results: when dependencies are resolved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 529 | |
(*Join a list of futures and return their results via get_result.  Nothing
  is awaited if all futures are finished already.  If worker_task () is
  defined for the calling thread, join_loop is run on the tasks of the
  futures with interrupts disabled, passing on the original thread
  attributes.  Otherwise each unfinished Future is awaited on its result
  variable.*)
fun join_results xs =
  let
    fun await (Future {result, ...}) = ignore (Single_Assignment.await result)
      | await (Value _) = ();
    fun join_inside_worker () =
      Thread_Attributes.with_attributes Thread_Attributes.no_interrupts
        (fn orig_atts => join_loop orig_atts (map task_of xs));
    val _ =
      if forall is_finished xs then ()
      else
        (case worker_task () of
          SOME _ => join_inside_worker ()
        | NONE => List.app await xs);
  in map get_result xs end;
| 28186 | 542 | |
| 29551 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 543 | end; | 
| 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 544 | |
| 28647 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 wenzelm parents: 
28645diff
(*Single-future version of join_results.*)
fun join_result x = x |> singleton join_results;

(*Join several futures; the results are passed through Par_Exn.release_all.*)
fun joins xs = xs |> join_results |> Par_Exn.release_all;

(*Join a single future; the result is passed through Exn.release.*)
fun join x = x |> join_result |> Exn.release;
| 28156 | 548 | |
| 29366 | 549 | |
| 67658 | 550 | (* forked results: nested parallel evaluation *) | 
| 551 | ||
(*Fork the given expressions into a fresh group and join their results, all
  within Thread_Attributes.uninterruptible_body.  When called from a worker
  task, the fresh group is created below that task's group and the task's
  priority is inherited; otherwise the group has no parent and the priority
  is 0.  Only the joining itself runs via the run argument supplied by
  uninterruptible_body.  If joining raises an exception for which
  Exn.is_interrupt_proper holds, the fresh group is cancelled; every
  exception is re-raised.*)
fun forked_results {name, deps} es =
  Thread_Attributes.uninterruptible_body (fn run =>
    let
      val (group, pri) =
        (case worker_task () of
          NONE => (new_group NONE, 0)
        | SOME task =>
            let val parent = Task_Queue.group_of_task task
            in (new_group (SOME parent), Task_Queue.pri_of_task task) end);
      val params: params =
        {name = name, group = SOME group, deps = deps, pri = pri, interrupts = true};
      val futures = forks params es;
      fun abort exn =
       (if Exn.is_interrupt_proper exn then cancel_group group else ();
        Exn.reraise exn);
    in
      (case Exn.capture_body (fn () => run join_results futures) of
        Exn.Exn exn => abort exn
      | Exn.Res res => res)
    end);
| 67658 | 569 | |
| 570 | ||
| 54649 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
changeset | 571 | (* task context for running thread *) | 
| 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
changeset | 572 | |
| 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
(*Evaluate f x on the calling thread as an enrolled task of the given group.
  Interrupts are disabled around the whole operation, while the job itself
  runs under the original thread attributes.  The task is obtained from
  Task_Queue.enroll inside SYNCHRONIZED "enroll" and executed via worker_exec.
  Afterwards the job's result is released, i.e. either returned or re-raised;
  a missing result is reported as Fail "Missing task context result".*)
fun task_context name group f x =
  let
    fun enroll () =
      Unsynchronized.change_result queue
        (Task_Queue.enroll (Isabelle_Thread.self ()) name group);
    fun body orig_atts =
      let
        val (result, job) = future_job group orig_atts (fn () => f x);
        val task = SYNCHRONIZED "enroll" enroll;
        val _ = worker_exec (task, [job]);
      in
        (case Single_Assignment.peek result of
          SOME res => Exn.release res
        | NONE => raise Fail "Missing task context result")
      end;
  in Thread_Attributes.with_attributes Thread_Attributes.no_interrupts body end;
| 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
changeset | 586 | |
| 
99b9249b3e05
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
 wenzelm parents: 
54637diff
changeset | 587 | |
| 68130 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 588 | (* scheduling parameters *) | 
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 589 | |
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
(*Futures are enabled iff more than one thread is available and the total
  number of jobs in the queue stays below threads * "parallel_limit";
  a non-positive limit means there is no bound.  The option is only
  consulted when threads > 1.*)
fun enabled () =
  let
    val threads = Multithreading.max_threads ();
    fun below_limit () =
      let val limit = threads * Options.default_int "parallel_limit" in
        if limit <= 0 then true
        else SYNCHRONIZED "enabled" (fn () => Task_Queue.total_jobs (! queue) < limit)
      end;
  in
    if threads > 1 then below_limit () else false
  end;
| 68130 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 596 | |
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
(*A list is relevant for parallel treatment only if it has at least two
  elements and futures are enabled; empty and singleton lists never are.*)
fun relevant (_ :: _ :: _) = enabled ()
  | relevant _ = false;
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 598 | |
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
(*Parallel proofs at level n: the parallel_proofs setting must be at least n,
  the caller must be running within a worker task, and futures must be
  enabled.  The conditions are checked in this order, stopping at the first
  failure.*)
fun proofs_enabled n =
  n <= ! Multithreading.parallel_proofs andalso
    (case worker_task () of
      NONE => false
    | SOME _ => enabled ());
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 601 | |
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
(*Parallel proofs at level 1, restricted to timing t (converted to real via
  Time.toReal) that reaches the "parallel_subproofs_threshold" option.
  The option is only consulted when proofs_enabled 1 holds.*)
fun proofs_enabled_timing t =
  if proofs_enabled 1 then
    let val threshold = Options.default_real "parallel_subproofs_threshold"
    in Time.toReal t >= threshold end
  else false;
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 604 | |
| 
6fb85346cb79
clarified future scheduling parameters, with support for parallel_limit;
 wenzelm parents: 
68129diff
changeset | 605 | |
| 51325 
bcd6b1aa4db5
more uniform Future.map: always internalize failure;
 wenzelm parents: 
51283diff
changeset | 606 | (* fast-path operations -- bypass task queue if possible *) | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 607 | |
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
(*Wrap an already available result as a non-promised future without going
  through the task queue: the static Task_Queue.dummy_task serves as task,
  and the result (passed through identify_result with the current thread
  position) is assigned immediately to a fresh "value" variable.*)
fun value_result (res: 'a Exn.result) =
  let
    val dummy = Task_Queue.dummy_task;
    val slot = Single_Assignment.var "value" : 'a result;
    val identified = identify_result (Position.thread_data ()) res;
    val _ = assign_result (Task_Queue.group_of_task dummy) slot identified;
  in Future {promised = false, task = dummy, result = slot} end;
 | 
| 29366 | 615 | |
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
(*Finished future for a plain value: value_result applied to Exn.Res x.*)
fun value x =
  let val res = Exn.Res x
  in value_result res end;
| 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 617 | |
(*Conditional fork: if futures are enabled, delegate to forks; otherwise
  evaluate each e () right here, in list order, capturing its outcome via
  Exn.result and wrapping it as a finished future.*)
fun cond_forks args es =
  let fun run_now e = value_result (Exn.result e ())
  in if enabled () then forks args es else map run_now es end;
| 44330 | 621 | |
| 29384 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 wenzelm parents: 
29366diff
(*Map f over the result of future x.

  - x already finished: apply f to join x in place, capturing the outcome
    via Exn.result and wrapping it with value_result (no new task).
  - otherwise: build a job for f (join x) in the group of x's task and try
    to attach it to that task via Task_Queue.extend.  If the queue accepts
    the extension, the new future shares the task of x; if it declines
    (NONE), fall back to a separate "map_future" fork that depends on the
    task of x and inherits its priority.*)
fun map_future f x =
  let
    fun body () = f (join x);

    fun deferred () =
      let
        val task = task_of x;
        val group = Task_Queue.group_of_task task;
        val (result, job) = future_job group Thread_Attributes.private_interrupts body;

        (*requires SYNCHRONIZED: updates the shared queue on success*)
        fun try_extend () =
          (case Task_Queue.extend task job (! queue) of
            NONE => false
          | SOME queue' => (queue := queue'; true));
      in
        if SYNCHRONIZED "extend" try_extend then
          Future {promised = false, task = task, result = result}
        else
          singleton
            (cond_forks
              {name = "map_future", group = SOME group, deps = [task],
                pri = Task_Queue.pri_of_task task, interrupts = true})
            body
      end;
  in
    if is_finished x then value_result (Exn.result body ())
    else deferred ()
  end;
| 28979 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 wenzelm parents: 
28972diff
changeset | 643 | |
| 28191 | 644 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 645 | (* promised futures -- fulfilled by external means *) | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 646 | |
(*Create a promised future: a passive task with the given name in a fresh
  worker subgroup, whose result variable is to be assigned by external
  means (see fulfill_result).

  The passive job registered with the queue runs with interrupts blocked:
  it tries to assign the interrupt result, then calls abort (), and finally
  releases the outcome of the assignment attempt.  When exactly the queue
  invokes this job is determined by Task_Queue.enqueue_passive -- presumably
  on cancellation; confirm there.

  Within assign: a Fail from the assignment attempt counts as success
  (true); a proper interrupt is turned into Fail "Concurrent attempt to
  fulfill promise"; any other exception is re-raised unchanged.*)
fun promise_name name abort : 'a future =
  let
    val group = worker_subgroup ();
    val slot = Single_Assignment.var "promise" : 'a result;

    fun cancel_slot () = assign_result group slot Isabelle_Thread.interrupt_exn;
    fun assign () =
      (case Exn.capture_body cancel_slot of
        Exn.Res ok => ok
      | Exn.Exn (Fail _) => true
      | Exn.Exn exn =>
          if Exn.is_interrupt_proper exn
          then raise Fail "Concurrent attempt to fulfill promise"
          else Exn.reraise exn);

    fun job () =
      Thread_Attributes.with_attributes Thread_Attributes.no_interrupts (fn _ =>
        let
          (*abort runs regardless of how the assignment attempt ended*)
          val assigned = Exn.capture assign ();
          val _ = abort ();
        in Exn.release assigned end);

    fun enqueue () =
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group name job);
    val task = SYNCHRONIZED "enqueue_passive" enqueue;
  in Future {promised = true, task = task, result = slot} end;
 | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 665 | |
(*Promised future under the default task name "passive".*)
fun promise abort =
  let val default_name = "passive"
  in promise_name default_name abort end;
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 667 | |
| 66958 
86bc3f1ec97e
minor performance tuning: avoid mutable variable for plain value, e.g. relevant for GC;
 wenzelm parents: 
66378diff
(*Fulfill a promised future with the given result; any other future is
  rejected with Fail "Not a promised future".

  With interrupts blocked, the passive task is dequeued for the current
  thread and the outcome decides how the assignment job is run:
    SOME true  -- run the job via worker_exec on the task;
    SOME false -- do nothing here;
    NONE       -- run the job directly, with ok = group is not canceled.
  The job assigns the identified result if ok, otherwise the interrupt
  result.  Afterwards, unless the result variable is already assigned,
  wait for it via worker_waiting.*)
fun fulfill_result (Future {promised = true, task, result}) res =
      let
        val group = Task_Queue.group_of_task task;
        val pos = Position.thread_data ();

        fun job ok =
          let val outcome = if ok then identify_result pos res else Isabelle_Thread.interrupt_exn
          in assign_result group result outcome end;

        (*evaluated inside the SYNCHRONIZED section below*)
        fun dequeue () =
          Unsynchronized.change_result queue
            (Task_Queue.dequeue_passive (Isabelle_Thread.self ()) task);

        fun run _ =
          (case SYNCHRONIZED "fulfill_result" dequeue of
            NONE => ignore (job (not (Task_Queue.is_canceled group)))
          | SOME false => ()
          | SOME true => worker_exec (task, [job]));

        val _ = Thread_Attributes.with_attributes Thread_Attributes.no_interrupts run;
        val _ =
          (case Single_Assignment.peek result of
            SOME _ => ()
          | NONE => worker_waiting [task] (fn () => ignore (Single_Assignment.await result)));
      in () end
  | fulfill_result _ _ = raise Fail "Not a promised future";
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 693 | |
| 43761 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 wenzelm parents: 
43665diff
(*Fulfill promised future x with a plain value: fulfill_result with Exn.Res.*)
fun fulfill x res =
  let val ok = Exn.Res res
  in fulfill_result x ok end;
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 695 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 696 | |
| 66378 | 697 | (* snapshot: current tasks of groups *) | 
| 54369 | 698 | |
(*Current tasks of the given groups, as reported by Task_Queue.group_tasks
  for the queue state read under SYNCHRONIZED.  The empty list of groups is
  answered directly, without entering the critical section.*)
fun snapshot groups =
  (case groups of
    [] => []
  | _ => SYNCHRONIZED "snapshot" (fn () => Task_Queue.group_tasks (! queue) groups));
| 54369 | 703 | |
| 704 | ||
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 705 | (* shutdown *) | 
| 29366 | 706 | |
(*Shut down the scheduler.  Calling this from within a worker task is
  rejected with Fail.  Otherwise, inside one SYNCHRONIZED section: as long
  as the scheduler is still active, set the do_shutdown flag, emit a tracing
  message at level 1, and wait on scheduler_event with timeout next_round
  before checking again.*)
fun shutdown () =
  (case worker_task () of
    SOME _ => raise Fail "Cannot shutdown while running as worker thread"
  | NONE =>
      let
        (*requires SYNCHRONIZED; tail-recursive polling loop*)
        fun await_scheduler () =
          if scheduler_active () then
            let
              val _ = do_shutdown := true;
              val _ = Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
              val _ = wait_timeout next_round scheduler_event;
            in await_scheduler () end
          else ();
      in SYNCHRONIZED "shutdown" await_scheduler end);
| 28203 | 716 | |
| 29366 | 717 | |
| 718 | (*final declarations of this structure!*) | |
(*Public name for map_future.  This binding hides the list "map" that
  earlier definitions of this structure rely on (e.g. cond_forks), so it
  has to stay at the very end.*)
fun map f x = map_future f x;
| 720 | ||
| 28156 | 721 | end; | 
| 28972 | 722 | |
(*Toplevel abbreviation: makes the type of futures available unqualified,
  outside of structure Future.*)
type 'a future = 'a Future.future;