| author | haftmann | 
| Tue, 10 Aug 2010 14:53:41 +0200 | |
| changeset 38312 | 9dd57db3c0f2 | 
| parent 38236 | d8c7be27e01d | 
| child 39232 | 69c6d3e87660 | 
| permissions | -rw-r--r-- | 
| 28156 | 1 | (* Title: Pure/Concurrent/future.ML | 
| 2 | Author: Makarius | |
| 3 | ||
| 32246 | 4 | Future values, see also | 
| 5 | http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf | |
| 37904 | 6 | http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf | 
| 28201 | 7 | |
| 8 | Notes: | |
| 9 | ||
| 10 | * Futures are similar to delayed evaluation, i.e. delay/force is | |
| 11 | generalized to fork/join (and variants). The idea is to model | |
| 12 | parallel value-oriented computations, but *not* communicating | |
| 13 | processes. | |
| 14 | ||
| 15 | * Futures are grouped; failure of one group member causes the whole | |
| 32220 | 16 | group to be interrupted eventually. Groups are block-structured. | 
| 28201 | 17 | |
| 18 | * Forked futures are evaluated spontaneously by a farm of worker | |
| 19 | threads in the background; join resynchronizes the computation and | |
| 20 | delivers results (values or exceptions). | |
| 21 | ||
| 22 | * The pool of worker threads is limited, usually in correlation with | |
| 23 | the number of physical cores on the machine. Note that allocation | |
| 24 | of runtime resources is distorted either if workers yield CPU time | |
| 25 | (e.g. via system sleep or wait operations), or if non-worker | |
| 26 | threads contend for significant runtime resources independently. | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 27 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 28 | * Promised futures are fulfilled by external means. There is no | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 29 | associated evaluation task, but other futures can depend on them | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 30 | as usual. | 
| 28156 | 31 | *) | 
| 32 | ||
| 33 | signature FUTURE = | |
| 34 | sig | |
| 29119 | 35 | type task = Task_Queue.task | 
| 36 | type group = Task_Queue.group | |
| 32058 | 37 | val is_worker: unit -> bool | 
| 32814 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 wenzelm parents: 
32738diff
changeset | 38 | val worker_task: unit -> Task_Queue.task option | 
| 32102 | 39 | val worker_group: unit -> Task_Queue.group option | 
| 37865 
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
 wenzelm parents: 
37854diff
changeset | 40 | val worker_subgroup: unit -> Task_Queue.group | 
| 28972 | 41 | type 'a future | 
| 42 | val task_of: 'a future -> task | |
| 43 | val group_of: 'a future -> group | |
| 44 | val peek: 'a future -> 'a Exn.result option | |
| 45 | val is_finished: 'a future -> bool | |
| 28979 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 wenzelm parents: 
28972diff
changeset | 46 | val fork_group: group -> (unit -> 'a) -> 'a future | 
| 32724 | 47 | val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future | 
| 28979 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 wenzelm parents: 
28972diff
changeset | 48 | val fork_deps: 'b future list -> (unit -> 'a) -> 'a future | 
| 29119 | 49 | val fork_pri: int -> (unit -> 'a) -> 'a future | 
| 32724 | 50 | val fork: (unit -> 'a) -> 'a future | 
| 28972 | 51 | val join_results: 'a future list -> 'a Exn.result list | 
| 52 | val join_result: 'a future -> 'a Exn.result | |
| 53 | val join: 'a future -> 'a | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 54 | val value: 'a -> 'a future | 
| 28972 | 55 |   val map: ('a -> 'b) -> 'a future -> 'b future
 | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 56 | val promise_group: group -> 'a future | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 57 | val promise: unit -> 'a future | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 58 | val fulfill_result: 'a future -> 'a Exn.result -> unit | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 59 | val fulfill: 'a future -> 'a -> unit | 
| 30618 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 wenzelm parents: 
30612diff
changeset | 60 |   val interruptible_task: ('a -> 'b) -> 'a -> 'b
 | 
| 29431 | 61 | val cancel_group: group -> unit | 
| 28972 | 62 | val cancel: 'a future -> unit | 
| 28203 | 63 | val shutdown: unit -> unit | 
| 38236 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 wenzelm parents: 
37904diff
changeset | 64 | val status: (unit -> 'a) -> 'a | 
| 28156 | 65 | end; | 
| 66 | ||
| 67 | structure Future: FUTURE = | |
| 68 | struct | |
| 69 | ||
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 70 | (** future values **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 71 | |
| 28167 | 72 | (* identifiers *) | 
| 73 | ||
| 29119 | 74 | type task = Task_Queue.task; | 
| 75 | type group = Task_Queue.group; | |
| 28167 | 76 | |
| 32058 | 77 | local | 
| 33408 | 78 | val tag = Universal.tag () : (task * group) option Universal.tag; | 
| 32058 | 79 | in | 
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 80 | fun thread_data () = the_default NONE (Thread.getLocal tag); | 
| 32058 | 81 | fun setmp_thread_data data f x = | 
| 82 | Library.setmp_thread_data tag (thread_data ()) (SOME data) f x; | |
| 28167 | 83 | end; | 
| 84 | ||
| 32058 | 85 | val is_worker = is_some o thread_data; | 
| 33408 | 86 | val worker_task = Option.map #1 o thread_data; | 
| 87 | val worker_group = Option.map #2 o thread_data; | |
| 37865 
3a6ec95a9f68
clarified/exported Future.worker_subgroup, which is already the default for Future.fork;
 wenzelm parents: 
37854diff
changeset | 88 | fun worker_subgroup () = Task_Queue.new_group (worker_group ()); | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 89 | |
| 28167 | 90 | |
| 91 | (* datatype future *) | |
| 92 | ||
| 35016 | 93 | type 'a result = 'a Exn.result Single_Assignment.var; | 
| 94 | ||
| 28972 | 95 | datatype 'a future = Future of | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 96 |  {promised: bool,
 | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 97 | task: task, | 
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 98 | group: group, | 
| 35016 | 99 | result: 'a result}; | 
| 28167 | 100 | |
| 101 | fun task_of (Future {task, ...}) = task;
 | |
| 102 | fun group_of (Future {group, ...}) = group;
 | |
| 32253 | 103 | fun result_of (Future {result, ...}) = result;
 | 
| 28167 | 104 | |
| 35016 | 105 | fun peek x = Single_Assignment.peek (result_of x); | 
| 28558 | 106 | fun is_finished x = is_some (peek x); | 
| 28320 | 107 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 108 | fun assign_result group result res = | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 109 | let | 
| 37854 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 110 | val _ = Single_Assignment.assign result res | 
| 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 111 | handle exn as Fail _ => | 
| 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 112 | (case Single_Assignment.peek result of | 
| 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 113 | SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt | 
| 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 114 | | _ => reraise exn); | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 115 | val ok = | 
| 37854 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 116 | (case the (Single_Assignment.peek result) of | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 117 | Exn.Exn exn => (Task_Queue.cancel_group group exn; false) | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 118 | | Exn.Result _ => true); | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 119 | in ok end; | 
| 28997 | 120 | |
| 28167 | 121 | |
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 122 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 123 | (** scheduling **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 124 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 125 | (* synchronization *) | 
| 28156 | 126 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 127 | val scheduler_event = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 128 | val work_available = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 129 | val work_finished = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 130 | |
| 28156 | 131 | local | 
| 132 | val lock = Mutex.mutex (); | |
| 133 | in | |
| 134 | ||
| 37216 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 wenzelm parents: 
37182diff
changeset | 135 | fun SYNCHRONIZED name = Simple_Thread.synchronized name lock; | 
| 28156 | 136 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 137 | fun wait cond = (*requires SYNCHRONIZED*) | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 138 | Multithreading.sync_wait NONE NONE cond lock; | 
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 139 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 140 | fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 141 | Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock; | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 142 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 143 | fun signal cond = (*requires SYNCHRONIZED*) | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 144 | ConditionVar.signal cond; | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 145 | |
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 146 | fun broadcast cond = (*requires SYNCHRONIZED*) | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 147 | ConditionVar.broadcast cond; | 
| 28156 | 148 | |
| 32248 
0241916a5f06
more precise treatment of scheduler_event: continuous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 149 | fun broadcast_work () = (*requires SYNCHRONIZED*) | 
| 
0241916a5f06
more precise treatment of scheduler_event: continuous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 150 | (ConditionVar.broadcast work_available; | 
| 32225 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 151 | ConditionVar.broadcast work_finished); | 
| 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 152 | |
| 28156 | 153 | end; | 
| 154 | ||
| 155 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 156 | (* global state *) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 157 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 158 | val queue = Unsynchronized.ref Task_Queue.empty; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 159 | val next = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 160 | val scheduler = Unsynchronized.ref (NONE: Thread.thread option); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 161 | val canceled = Unsynchronized.ref ([]: Task_Queue.group list); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 162 | val do_shutdown = Unsynchronized.ref false; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 163 | val max_workers = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 164 | val max_active = Unsynchronized.ref 0; | 
| 33411 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 165 | val worker_trend = Unsynchronized.ref 0; | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 166 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 167 | datatype worker_state = Working | Waiting | Sleeping; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 168 | val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 169 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 170 | fun count_workers state = (*requires SYNCHRONIZED*) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 171 | fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 172 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 173 | |
| 32099 | 174 | (* execute future jobs *) | 
| 175 | ||
| 176 | fun future_job group (e: unit -> 'a) = | |
| 177 | let | |
| 35016 | 178 | val result = Single_Assignment.var "future" : 'a result; | 
| 37046 
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelized Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
 wenzelm parents: 
35016diff
changeset | 179 | val pos = Position.thread_data (); | 
| 32107 
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
 wenzelm parents: 
32102diff
changeset | 180 | fun job ok = | 
| 
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
 wenzelm parents: 
32102diff
changeset | 181 | let | 
| 
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
 wenzelm parents: 
32102diff
changeset | 182 | val res = | 
| 
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
 wenzelm parents: 
32102diff
changeset | 183 | if ok then | 
| 32230 
9f6461b1c9cc
interruptible: Thread.testInterrupt before changing thread attributes;
 wenzelm parents: 
32229diff
changeset | 184 | Exn.capture (fn () => | 
| 37046 
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelized Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
 wenzelm parents: 
35016diff
changeset | 185 | Multithreading.with_attributes Multithreading.private_interrupts | 
| 
78d88b670a53
future_job: propagate current Position.thread_data to the forked job -- this is important to provide a default position, e.g. for parallelized Goal.prove within a package (proper command transactions are wrapped via Toplevel.setmp_thread_position);
 wenzelm parents: 
35016diff
changeset | 186 | (fn _ => Position.setmp_thread_data pos e ())) () | 
| 32107 
47d0da617fcc
future_job: tight scope for interrupts, to prevent shooting ourselves in the foot via cancel_group;
 wenzelm parents: 
32102diff
changeset | 187 | else Exn.Exn Exn.Interrupt; | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 188 | in assign_result group result res end; | 
| 32099 | 189 | in (result, job) end; | 
| 28156 | 190 | |
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 191 | fun cancel_now group = (*requires SYNCHRONIZED*) | 
| 34280 
16bf3e9786a3
eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
 wenzelm parents: 
34279diff
changeset | 192 | Task_Queue.cancel (! queue) group; | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 193 | |
| 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 194 | fun cancel_later group = (*requires SYNCHRONIZED*) | 
| 32738 | 195 | (Unsynchronized.change canceled (insert Task_Queue.eq_group group); | 
| 196 | broadcast scheduler_event); | |
| 29341 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 wenzelm parents: 
29119diff
changeset | 197 | |
| 33408 | 198 | fun execute (task, group, jobs) = | 
| 28167 | 199 | let | 
| 32102 | 200 | val valid = not (Task_Queue.is_canceled group); | 
| 33408 | 201 | val ok = setmp_thread_data (task, group) (fn () => | 
| 29384 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 wenzelm parents: 
29366diff
changeset | 202 | fold (fn job => fn ok => job valid andalso ok) jobs true) (); | 
| 32246 | 203 | val _ = SYNCHRONIZED "finish" (fn () => | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 204 | let | 
| 32738 | 205 | val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 206 | val _ = | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 207 | if ok then () | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 208 | else if cancel_now group then () | 
| 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 209 | else cancel_later group; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 210 | val _ = broadcast work_finished; | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 211 | val _ = if maximal then () else signal work_available; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 212 | in () end); | 
| 28167 | 213 | in () end; | 
| 214 | ||
| 215 | ||
| 216 | (* worker threads *) | |
| 217 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 218 | fun worker_wait active cond = (*requires SYNCHRONIZED*) | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 219 | let | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 220 | val state = | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 221 | (case AList.lookup Thread.equal (! workers) (Thread.self ()) of | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 222 | SOME state => state | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 223 | | NONE => raise Fail "Unregistered worker thread"); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 224 | val _ = state := (if active then Waiting else Sleeping); | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 225 | val _ = wait cond; | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 226 | val _ = state := Working; | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 227 | in () end; | 
| 28162 | 228 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 229 | fun worker_next () = (*requires SYNCHRONIZED*) | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 230 | if length (! workers) > ! max_workers then | 
| 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 231 | (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 232 | signal work_available; | 
| 28167 | 233 | NONE) | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 234 | else if count_workers Working > ! max_active then | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 235 | (worker_wait false work_available; worker_next ()) | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 236 | else | 
| 32738 | 237 | (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 238 | NONE => (worker_wait false work_available; worker_next ()) | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 239 | | some => (signal work_available; some)); | 
| 28156 | 240 | |
| 28167 | 241 | fun worker_loop name = | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 242 | (case SYNCHRONIZED name (fn () => worker_next ()) of | 
| 29119 | 243 | NONE => () | 
| 33408 | 244 | | SOME work => (execute work; worker_loop name)); | 
| 28156 | 245 | |
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 246 | fun worker_start name = (*requires SYNCHRONIZED*) | 
| 37216 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 wenzelm parents: 
37182diff
changeset | 247 | Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 248 | Unsynchronized.ref Working)); | 
| 28156 | 249 | |
| 250 | ||
| 251 | (* scheduler *) | |
| 252 | ||
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 253 | val status_ticks = Unsynchronized.ref 0; | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 254 | |
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 255 | val last_round = Unsynchronized.ref Time.zeroTime; | 
| 32248 
0241916a5f06
more precise treatment of scheduler_event: continuous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 256 | val next_round = Time.fromMilliseconds 50; | 
| 32226 | 257 | |
(*Single scheduler round: updates tick and status bookkeeping, disposes
  dead worker threads, adjusts the worker pool, processes canceled groups,
  waits on scheduler_event for at most next_round, and finally checks for
  shutdown.  The boolean result tells the caller whether to continue; an
  Interrupt cancels all groups of the queue and yields true.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    val _ = if tick then last_round := now else ();


    (* queue and worker status *)

    val _ =
      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();

    (*message is only produced on demand by Multithreading.tracing*)
    fun status_message () =
      let
        val {ready, pending, running, passive} = Task_Queue.status (! queue);
        val total = length (! workers);
        val active = count_workers Working;
        val waiting = count_workers Waiting;
      in
        String.concat
         ["SCHEDULE ", Time.toString now, ": ",
          string_of_int ready, " ready, ",
          string_of_int pending, " pending, ",
          string_of_int running, " running, ",
          string_of_int passive, " passive; ",
          string_of_int total, " workers, ",
          string_of_int active, " active, ",
          string_of_int waiting, " waiting "]
      end;
    val _ =
      if tick andalso ! status_ticks = 0 then Multithreading.tracing 1 status_message
      else ();

    (*drop worker entries whose thread is no longer active*)
    val _ =
      if List.exists (not o Thread.isActive o #1) (! workers) then
        let
          val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end
      else ();


    (* worker pool adjustments *)

    val old_max_active = ! max_active;
    val old_max_workers = ! max_workers;

    val threads = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := threads;

    (*desired pool size: between threads and 4 * threads, depending on worker activity*)
    val wanted =
      if ! do_shutdown then 0
      else if threads = 9999 then 1
      else
        Int.min
          (Int.max (count_workers Working + 2 * count_workers Waiting, threads), 4 * threads);

    (*worker_trend counts consecutive ticks with the same direction of demand*)
    val _ =
      if not tick then ()
      else if wanted > ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
      else if wanted < ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
      else ();

    val trend = ! worker_trend;
    val _ =
      if wanted = 0 orelse trend > 50 orelse trend < ~50 then max_workers := wanted
      else if trend > 5 andalso ! max_workers < 2 * threads then
        max_workers := Int.min (wanted, 2 * threads)
      else ();

    fun start_worker () =
      ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)));
    val missing = ! max_workers - length (! workers);
    val _ = if missing > 0 then funpow missing start_worker () else ();

    (*notify workers only if the configuration has actually changed*)
    val _ =
      if ! max_active <> old_max_active orelse ! max_workers <> old_max_workers
      then signal work_available
      else ();


    (* canceled groups *)

    val _ =
      if not (null (! canceled)) then
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out cancel_now);
        broadcast_work ())
      else ();


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val more = not (! do_shutdown) orelse not (null (! workers));
    val _ = if more then () else scheduler := NONE;

    val _ = broadcast scheduler_event;
  in more end
  handle Exn.Interrupt =>
   (Multithreading.tracing 1 (fn () => "Interrupt");
    List.app cancel_later (Task_Queue.cancel_all (! queue));
    broadcast_work (); true);
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 366 | |
(*Body of the scheduler thread: repeats SYNCHRONIZED rounds of
  scheduler_next, each under the sync_interrupts variant of
  public_interrupts, until a round returns false.*)
fun scheduler_loop () =
  let
    fun round () =
      Multithreading.with_attributes
        (Multithreading.sync_interrupts Multithreading.public_interrupts)
        (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()));
    fun loop () = if round () then loop () else ();
  in loop () end;
| 28156 | 373 | |
(*true iff a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  (case ! scheduler of
    SOME thread => Thread.isActive thread
  | NONE => false);
| 376 | ||
(*resets the shutdown request and forks a fresh scheduler thread unless
  one is still active*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if not (scheduler_active ()) then
      scheduler := SOME (Simple_Thread.fork false scheduler_loop)
    else ()
  end;
| 28156 | 381 | |
| 382 | ||
| 29366 | 383 | |
| 384 | (** futures **) | |
| 28156 | 385 | |
| 29366 | 386 | (* fork *) | 
| 387 | ||
(*Forks evaluation of e as a new task with the given dependencies and
  priority.  Without an explicit group, worker_subgroup () provides one.
  The task is enqueued under SYNCHRONIZED, work_available is signalled
  if the queue reports the task as minimal, and the scheduler is checked.*)
fun fork_future opt_group deps pri e =
  let
    val group =
      (case opt_group of
        SOME group => group
      | NONE => worker_subgroup ());
    val (result, job) = future_job group e;

    fun enqueue () =
      let
        val (task, minimal) =
          Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
      in
        if minimal then signal work_available else ();
        scheduler_check ();
        task
      end;
    val task = SYNCHRONIZED "enqueue" enqueue;
  in Future {promised = false, task = task, group = group, result = result} end;
| 28162 | 403 | |
(*variants of fork_future; defaults: no explicit group, no dependencies, priority 0*)
fun fork_group group e = fork_future (SOME group) [] 0 e;
fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
fun fork_pri pri e = fork_future NONE [] pri e;
fun fork e = fork_future NONE [] 0 e;
| 28186 | 409 | |
| 410 | ||
| 29366 | 411 | (* join *) | 
| 412 | ||
local

(*result of a future as seen by join: a plain Interrupt is replaced by
  the flattened exceptions of the group status, if there are any*)
fun get_result x =
  (case peek x of
    SOME (exn as Exn.Exn Exn.Interrupt) =>
      (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
        [] => exn
      | exns => Exn.Exn (Exn.EXCEPTIONS exns))
  | SOME res => res
  | NONE => Exn.Exn (Fail "Unfinished future"));

(*next piece of work towards deps, together with the remaining deps;
  waits on work_finished while there are deps left but no work to take*)
fun join_next [] = NONE (*requires SYNCHRONIZED*)
  | join_next deps =
      let
        val (opt_work, deps') =
          Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps);
      in
        (case opt_work of
          SOME work => SOME (work, deps')
        | NONE =>
            if null deps' then NONE
            else (worker_wait true work_finished; join_next deps'))
      end;

(*execute work delivered by join_next until nothing is left*)
fun join_work deps = run_work (SYNCHRONIZED "join" (fn () => join_next deps))
and run_work opt_work =
  (case opt_work of
    NONE => ()
  | SOME (work, deps') => (execute work; join_work deps'));

(*like join_work, but first records deps as dependencies of task in the queue*)
fun join_depend task deps =
  let
    fun register () =
      (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps);
  in run_work (SYNCHRONIZED "join" register) end;

in

(*Results of all futures xs.  Finished futures are returned directly;
  otherwise joining is rejected within a critical section.  A worker
  thread (one with a worker_task) joins via join_depend on its own task,
  any other thread awaits the result variables.*)
fun join_results xs =
  if forall is_finished xs then map get_result xs
  else if Multithreading.self_critical () then
    error "Cannot join future values within critical section"
  else
    let
      val _ =
        (case worker_task () of
          NONE => List.app (ignore o Single_Assignment.await o result_of) xs
        | SOME task => join_depend task (map task_of xs));
    in map get_result xs end;

end;
| 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 454 | |
(*single-future versions of join_results: as result, or released via Exn.release*)
fun join_result x = singleton join_results x;
fun join x = Exn.release (singleton join_results x);
| 28156 | 457 | |
| 29366 | 458 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 459 | (* fast-path versions -- bypassing full task management *) | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 460 | |
(*future whose result x is assigned immediately, using a fresh group and
  Task_Queue.dummy_task instead of an enqueued task*)
fun value (x: 'a) =
  let
    val group = Task_Queue.new_group NONE;
    val result = Single_Assignment.var "value" : 'a result;
  in
    (assign_result group result (Exn.Result x);
     Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result})
  end;
| 29366 | 467 | |
(*Future for f applied to the joined result of x, within a new subgroup
  of the group of x.  The job is added to the task of x via
  Task_Queue.extend if the queue admits that; otherwise a separate
  future is forked that depends on that task and uses its priority.*)
fun map_future f x =
  let
    val task = task_of x;
    val group = Task_Queue.new_group (SOME (group_of x));
    fun body () = f (join x);
    val (result, job) = future_job group body;

    fun try_extend () =
      (case Task_Queue.extend task job (! queue) of
        NONE => false
      | SOME queue' => (queue := queue'; true));
    val extended = SYNCHRONIZED "extend" try_extend;
  in
    if extended then Future {promised = false, task = task, group = group, result = result}
    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
  end;
| 28979 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 wenzelm parents: 
28972diff
changeset | 482 | |
| 28191 | 483 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 484 | (* promised futures -- fulfilled by external means *) | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 485 | |
(*Promised future within the given group: its task is enqueued as
  passive, with an abort operation that assigns Interrupt as result.
  A Fail raised by that assignment is turned into result true.*)
fun promise_group group : 'a future =
  let
    val result = Single_Assignment.var "promise" : 'a result;
    fun abort () =
      assign_result group result (Exn.Exn Exn.Interrupt)
        handle Fail _ => true;
    fun enqueue () =
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort);
    val task = SYNCHRONIZED "enqueue_passive" enqueue;
  in Future {promised = true, task = task, group = group, result = result} end;
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 493 | |
(*promised future within the group given by worker_subgroup ()*)
fun promise () =
  let val group = worker_subgroup ()
  in promise_group group end;
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 495 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
(*fulfill_result: assign the given result to a promised future.
  Raises Fail "Not a promised future" unless the future carries the promised flag.
  The assignment is wrapped as a job and handed to execute together with the
  task and group of the future; the outcome of execute is discarded.*)
fun fulfill_result (Future {promised, task, group, result}) res =
  if not promised then raise Fail "Not a promised future"
  else
    let
      (*job: called with ok = true it stores res, otherwise Exn.Interrupt*)
      fun job true = assign_result group result res
        | job false = assign_result group result (Exn.Exn Exn.Interrupt);
    in ignore (execute (task, group, [job])) end;
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 502 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
(*fulfill: like fulfill_result, with the plain value wrapped as Exn.Result*)
fun fulfill x res =
  let val outcome = Exn.Result res
  in fulfill_result x outcome end;
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 504 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 505 | |
| 29431 | 506 | (* cancellation *) | 
| 28202 
23cb9a974630
added focus, which indicates a particular collection of high-priority tasks;
 wenzelm parents: 
28201diff
changeset | 507 | |
| 30618 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 wenzelm parents: 
30612diff
(*interruptible_task: apply f to x with interrupts enabled.
  Without multithreading this is plain "interruptible f x".  Otherwise the
  thread attributes depend on is_worker (): private_interrupts for worker
  threads, public_interrupts for all others.*)
fun interruptible_task f x =
  if not Multithreading.available then interruptible f x
  else
    let
      val interrupts =
        if is_worker () then Multithreading.private_interrupts
        else Multithreading.public_interrupts;
    in Multithreading.with_attributes interrupts (fn _ => f x) end;
| 
046f4f986fb5
restricted interrupts for tasks running as future worker thread -- attempt to prevent interrupt race conditions;
 wenzelm parents: 
30612diff
changeset | 516 | |
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
(*cancel_group: members of the group, present and future ones alike, are
  going to be interrupted eventually.  Within SYNCHRONIZED "cancel":
  cancel_later is used only if cancel_now returns false; afterwards
  work_available is signalled and scheduler_check () is invoked.*)
fun cancel_group group =
  let
    fun cancel_synchronized () =
      let
        val _ = if not (cancel_now group) then cancel_later group else ();
        val _ = signal work_available;
      in scheduler_check () end;
  in SYNCHRONIZED "cancel" cancel_synchronized end;
| 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 521 | |
(*cancel: cancel the whole group of the given future, via cancel_group*)
fun cancel x =
  let val group = group_of x
  in cancel_group group end;
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 523 | |
| 29366 | 524 | |
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 525 | (* shutdown *) | 
| 29366 | 526 | |
(*shutdown: no-op without multithreading.  Otherwise, within
  SYNCHRONIZED "shutdown", repeat "wait scheduler_event; broadcast_work ()"
  for as long as scheduler_active () holds.*)
fun shutdown () =
  if not Multithreading.available then ()
  else
    let
      fun await_scheduler () =
        if scheduler_active () then
          (wait scheduler_event; broadcast_work (); await_scheduler ())
        else ();
    in SYNCHRONIZED "shutdown" await_scheduler end;
| 28203 | 533 | |
| 29366 | 534 | |
| 38236 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 wenzelm parents: 
37904diff
changeset | 535 | (* status markup *) | 
| 37690 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 wenzelm parents: 
37682diff
changeset | 536 | |
| 38236 
d8c7be27e01d
explicitly distinguish Output.status (essential feedback) vs. Output.report (useful markup);
 wenzelm parents: 
37904diff
(*status: evaluate e () bracketed by status messages.
  Markup.forked is emitted before the evaluation, Markup.joined after it.
  Deliberately no exception handling: if e () raises, "joined" is never
  reported and the exception propagates.*)
fun status e =
  let
    fun report markup = Output.status (Markup.markup markup "");
    val _ = report Markup.forked;
    val result = e ();  (*sic -- a failure skips the "joined" report below*)
    val _ = report Markup.joined;
  in result end;
| 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 wenzelm parents: 
37682diff
changeset | 543 | |
| 
b16231572c61
general Future.report -- also for Toplevel.async_state;
 wenzelm parents: 
37682diff
changeset | 544 | |
| 29366 | 545 | (*final declarations of this structure!  Binds the name "map" to map_future; presumably kept last so that the code above still sees the previous meaning of "map" -- confirm before moving it.*) | 
| 546 | val map = map_future; | |
| 547 | ||
| 28156 | 548 | end; | 
| 28972 | 549 | |
| 550 | type 'a future = 'a Future.future; | |
| 551 |