| author | wenzelm | 
| Thu, 13 Sep 2012 16:09:35 +0200 | |
| changeset 49347 | d4768cb77a69 | 
| parent 49009 | 15381ea111ec | 
| child 49894 | 69bfd86cc711 | 
| permissions | -rw-r--r-- | 
| 28156 | 1 | (* Title: Pure/Concurrent/future.ML | 
| 2 | Author: Makarius | |
| 3 | ||
| 44268 | 4 | Value-oriented parallelism via futures and promises. See also | 
| 32246 | 5 | http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf | 
| 37904 | 6 | http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf | 
| 28201 | 7 | |
| 8 | Notes: | |
| 9 | ||
| 10 | * Futures are similar to delayed evaluation, i.e. delay/force is | |
| 44268 | 11 | generalized to fork/join. The idea is to model parallel | 
| 12 | value-oriented computations (not communicating processes). | |
| 28201 | 13 | |
| 14 | * Forked futures are evaluated spontaneously by a farm of worker | |
| 15 | threads in the background; join resynchronizes the computation and | |
| 16 | delivers results (values or exceptions). | |
| 17 | ||
| 18 | * The pool of worker threads is limited, usually in correlation with | |
| 19 | the number of physical cores on the machine. Note that allocation | |
| 44268 | 20 | of runtime resources may be distorted either if workers yield CPU | 
| 21 | time (e.g. via system sleep or wait operations), or if non-worker | |
| 28201 | 22 | threads contend for significant runtime resources independently. | 
| 44268 | 23 | There is a limited number of replacement worker threads that get | 
| 24 | activated in certain explicit wait conditions. | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 25 | |
| 44268 | 26 | * Future tasks are organized in groups, which are block-structured. | 
| 27 | When forking a new new task, the default is to open an individual | |
| 28 | subgroup, unless some common group is specified explicitly. | |
| 29 | Failure of one group member causes the immediate peers to be | |
| 30 | interrupted eventually (i.e. none by default). Interrupted tasks | |
| 31 | that lack regular result information, will pick up parallel | |
| 32 | exceptions from the cumulative group context (as Par_Exn). | |
| 33 | ||
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 34 | * Future task groups may be canceled: present and future group | 
| 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 35 | members will be interrupted eventually. | 
| 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 36 | |
| 44268 | 37 | * Promised "passive" futures are fulfilled by external means. There | 
| 38 | is no associated evaluation task, but other futures can depend on | |
| 39 | them via regular join operations. | |
| 28156 | 40 | *) | 
| 41 | ||
| 42 | signature FUTURE = | |
| 43 | sig | |
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 44 | type task = Task_Queue.task | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 45 | type group = Task_Queue.group | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 46 | val new_group: group option -> group | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 47 | val worker_task: unit -> task option | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 48 | val worker_group: unit -> group option | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 49 | val worker_subgroup: unit -> group | 
| 28972 | 50 | type 'a future | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 51 | val task_of: 'a future -> task | 
| 28972 | 52 | val peek: 'a future -> 'a Exn.result option | 
| 53 | val is_finished: 'a future -> bool | |
| 44301 | 54 |   val interruptible_task: ('a -> 'b) -> 'a -> 'b
 | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 55 | val cancel_group: group -> unit | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 56 | val cancel: 'a future -> unit | 
| 44427 | 57 |   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
 | 
| 58 | val default_params: params | |
| 59 | val forks: params -> (unit -> 'a) list -> 'a future list | |
| 29119 | 60 | val fork_pri: int -> (unit -> 'a) -> 'a future | 
| 32724 | 61 | val fork: (unit -> 'a) -> 'a future | 
| 28972 | 62 | val join_results: 'a future list -> 'a Exn.result list | 
| 63 | val join_result: 'a future -> 'a Exn.result | |
| 44330 | 64 | val joins: 'a future list -> 'a list | 
| 28972 | 65 | val join: 'a future -> 'a | 
| 44301 | 66 | val join_tasks: task list -> unit | 
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 67 | val value_result: 'a Exn.result -> 'a future | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 68 | val value: 'a -> 'a future | 
| 44427 | 69 | val cond_forks: params -> (unit -> 'a) list -> 'a future list | 
| 28972 | 70 |   val map: ('a -> 'b) -> 'a future -> 'b future
 | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 71 | val promise_group: group -> (unit -> unit) -> 'a future | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 72 | val promise: (unit -> unit) -> 'a future | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 73 | val fulfill_result: 'a future -> 'a Exn.result -> unit | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 74 | val fulfill: 'a future -> 'a -> unit | 
| 28203 | 75 | val shutdown: unit -> unit | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 76 | val group_tasks: group -> task list | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 77 |   val queue_status: unit -> {ready: int, pending: int, running: int, passive: int}
 | 
| 28156 | 78 | end; | 
| 79 | ||
| 80 | structure Future: FUTURE = | |
| 81 | struct | |
| 82 | ||
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 83 | (** future values **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 84 | |
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 85 | type task = Task_Queue.task; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 86 | type group = Task_Queue.group; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 87 | val new_group = Task_Queue.new_group; | 
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 88 | |
| 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 89 | |
| 28167 | 90 | (* identifiers *) | 
| 91 | ||
| 32058 | 92 | local | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 93 | val tag = Universal.tag () : task option Universal.tag; | 
| 32058 | 94 | in | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 95 | fun worker_task () = the_default NONE (Thread.getLocal tag); | 
| 44110 | 96 | fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x; | 
| 28167 | 97 | end; | 
| 98 | ||
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 99 | val worker_group = Option.map Task_Queue.group_of_task o worker_task; | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 100 | fun worker_subgroup () = new_group (worker_group ()); | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 101 | |
| 41679 | 102 | fun worker_joining e = | 
| 103 | (case worker_task () of | |
| 104 | NONE => e () | |
| 105 | | SOME task => Task_Queue.joining task e); | |
| 106 | ||
| 41680 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 107 | fun worker_waiting deps e = | 
| 41670 | 108 | (case worker_task () of | 
| 109 | NONE => e () | |
| 41680 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 110 | | SOME task => Task_Queue.waiting task deps e); | 
| 41670 | 111 | |
| 28167 | 112 | |
| 113 | (* datatype future *) | |
| 114 | ||
| 35016 | 115 | type 'a result = 'a Exn.result Single_Assignment.var; | 
| 116 | ||
| 28972 | 117 | datatype 'a future = Future of | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 118 |  {promised: bool,
 | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 119 | task: task, | 
| 35016 | 120 | result: 'a result}; | 
| 28167 | 121 | |
| 122 | fun task_of (Future {task, ...}) = task;
 | |
| 32253 | 123 | fun result_of (Future {result, ...}) = result;
 | 
| 28167 | 124 | |
| 35016 | 125 | fun peek x = Single_Assignment.peek (result_of x); | 
| 28558 | 126 | fun is_finished x = is_some (peek x); | 
| 28320 | 127 | |
| 28167 | 128 | |
| 28177 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 129 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 130 | (** scheduling **) | 
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 131 | |
| 
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
 wenzelm parents: 
28170diff
changeset | 132 | (* synchronization *) | 
| 28156 | 133 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 134 | val scheduler_event = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 135 | val work_available = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 136 | val work_finished = ConditionVar.conditionVar (); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 137 | |
| 28156 | 138 | local | 
| 139 | val lock = Mutex.mutex (); | |
| 140 | in | |
| 141 | ||
| 37216 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 wenzelm parents: 
37182diff
changeset | 142 | fun SYNCHRONIZED name = Simple_Thread.synchronized name lock; | 
| 28156 | 143 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 144 | fun wait cond = (*requires SYNCHRONIZED*) | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 145 | Multithreading.sync_wait NONE NONE cond lock; | 
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 146 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 147 | fun wait_timeout timeout cond = (*requires SYNCHRONIZED*) | 
| 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 148 | Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock; | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 149 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 150 | fun signal cond = (*requires SYNCHRONIZED*) | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 151 | ConditionVar.signal cond; | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 152 | |
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 153 | fun broadcast cond = (*requires SYNCHRONIZED*) | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 154 | ConditionVar.broadcast cond; | 
| 28156 | 155 | |
| 32248 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 156 | fun broadcast_work () = (*requires SYNCHRONIZED*) | 
| 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 157 | (ConditionVar.broadcast work_available; | 
| 32225 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 158 | ConditionVar.broadcast work_finished); | 
| 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 159 | |
| 28156 | 160 | end; | 
| 161 | ||
| 162 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 163 | (* global state *) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 164 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 165 | val queue = Unsynchronized.ref Task_Queue.empty; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 166 | val next = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 167 | val scheduler = Unsynchronized.ref (NONE: Thread.thread option); | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 168 | val canceled = Unsynchronized.ref ([]: group list); | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 169 | val do_shutdown = Unsynchronized.ref false; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 170 | val max_workers = Unsynchronized.ref 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 171 | val max_active = Unsynchronized.ref 0; | 
| 33411 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 172 | val worker_trend = Unsynchronized.ref 0; | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 173 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 174 | datatype worker_state = Working | Waiting | Sleeping; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 175 | val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 176 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 177 | fun count_workers state = (*requires SYNCHRONIZED*) | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 178 | fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 179 | |
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 180 | |
| 44110 | 181 | (* cancellation primitives *) | 
| 32099 | 182 | |
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 183 | fun cancel_now group = (*requires SYNCHRONIZED*) | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 184 | let | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 185 | val running = Task_Queue.cancel (! queue) group; | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 186 | val _ = List.app Simple_Thread.interrupt_unsynchronized running; | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 187 | in running end; | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 188 | |
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 189 | fun cancel_all () = (*requires SYNCHRONIZED*) | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 190 | let | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 191 | val (groups, threads) = Task_Queue.cancel_all (! queue); | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 192 | val _ = List.app Simple_Thread.interrupt_unsynchronized threads; | 
| 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 193 | in groups end; | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 194 | |
| 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 195 | fun cancel_later group = (*requires SYNCHRONIZED*) | 
| 32738 | 196 | (Unsynchronized.change canceled (insert Task_Queue.eq_group group); | 
| 197 | broadcast scheduler_event); | |
| 29341 
6bb007a0f9f2
more reactive scheduler: reduced loop timeout, propagate broadcast interrupt via TaskQueue.cancel_all;
 wenzelm parents: 
29119diff
changeset | 198 | |
| 44301 | 199 | fun interruptible_task f x = | 
| 200 | (if Multithreading.available then | |
| 201 | Multithreading.with_attributes | |
| 202 | (if is_some (worker_task ()) | |
| 203 | then Multithreading.private_interrupts | |
| 204 | else Multithreading.public_interrupts) | |
| 205 | (fn _ => f x) | |
| 206 | else interruptible f x) | |
| 207 | before Multithreading.interrupted (); | |
| 208 | ||
| 209 | ||
| 44110 | 210 | (* worker threads *) | 
| 211 | ||
| 212 | fun worker_exec (task, jobs) = | |
| 28167 | 213 | let | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 214 | val group = Task_Queue.group_of_task task; | 
| 32102 | 215 | val valid = not (Task_Queue.is_canceled group); | 
| 41670 | 216 | val ok = | 
| 217 | Task_Queue.running task (fn () => | |
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 218 | setmp_worker_task task (fn () => | 
| 41670 | 219 | fold (fn job => fn ok => job valid andalso ok) jobs true) ()); | 
| 41776 | 220 | val _ = Multithreading.tracing 2 (fn () => | 
| 41670 | 221 | let | 
| 43951 | 222 | val s = Task_Queue.str_of_task_groups task; | 
| 41670 | 223 | fun micros time = string_of_int (Time.toNanoseconds time div 1000); | 
| 41680 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 224 | val (run, wait, deps) = Task_Queue.timing_of_task task; | 
| 
a4c822915eaa
more informative task timing: some dependency tracking;
 wenzelm parents: 
41679diff
changeset | 225 |       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
 | 
| 32246 | 226 | val _ = SYNCHRONIZED "finish" (fn () => | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 227 | let | 
| 32738 | 228 | val maximal = Unsynchronized.change_result queue (Task_Queue.finish task); | 
| 44295 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 wenzelm parents: 
44294diff
changeset | 229 | val test = Exn.capture Multithreading.interrupted (); | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 230 | val _ = | 
| 44295 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 wenzelm parents: 
44294diff
changeset | 231 | if ok andalso not (Exn.is_interrupt_exn test) then () | 
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 232 | else if null (cancel_now group) then () | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 233 | else cancel_later group; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 234 | val _ = broadcast work_finished; | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 235 | val _ = if maximal then () else signal work_available; | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 236 | in () end); | 
| 28167 | 237 | in () end; | 
| 238 | ||
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 239 | fun worker_wait active cond = (*requires SYNCHRONIZED*) | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 240 | let | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 241 | val state = | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 242 | (case AList.lookup Thread.equal (! workers) (Thread.self ()) of | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 243 | SOME state => state | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 244 | | NONE => raise Fail "Unregistered worker thread"); | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 245 | val _ = state := (if active then Waiting else Sleeping); | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 246 | val _ = wait cond; | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 247 | val _ = state := Working; | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 248 | in () end; | 
| 28162 | 249 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 250 | fun worker_next () = (*requires SYNCHRONIZED*) | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 251 | if length (! workers) > ! max_workers then | 
| 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 252 | (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 253 | signal work_available; | 
| 28167 | 254 | NONE) | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 255 | else if count_workers Working > ! max_active then | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 256 | (worker_wait false work_available; worker_next ()) | 
| 28166 
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
 wenzelm parents: 
28163diff
changeset | 257 | else | 
| 32738 | 258 | (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 259 | NONE => (worker_wait false work_available; worker_next ()) | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 260 | | some => (signal work_available; some)); | 
| 28156 | 261 | |
| 28167 | 262 | fun worker_loop name = | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 263 | (case SYNCHRONIZED name (fn () => worker_next ()) of | 
| 29119 | 264 | NONE => () | 
| 44295 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 wenzelm parents: 
44294diff
changeset | 265 | | SOME work => (worker_exec work; worker_loop name)); | 
| 28156 | 266 | |
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 267 | fun worker_start name = (*requires SYNCHRONIZED*) | 
| 37216 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 wenzelm parents: 
37182diff
changeset | 268 | Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 269 | Unsynchronized.ref Working)); | 
| 28156 | 270 | |
| 271 | ||
| 272 | (* scheduler *) | |
| 273 | ||
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 274 | val status_ticks = Unsynchronized.ref 0; | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 275 | |
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 276 | val last_round = Unsynchronized.ref Time.zeroTime; | 
| 40301 | 277 | val next_round = seconds 0.05; | 
| 32226 | 278 | |
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 279 | fun scheduler_next () = (*requires SYNCHRONIZED*) | 
| 28156 | 280 | let | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 281 | val now = Time.now (); | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 282 | val tick = Time.<= (Time.+ (! last_round, next_round), now); | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 283 | val _ = if tick then last_round := now else (); | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 284 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 285 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 286 | (* queue and worker status *) | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 287 | |
| 32226 | 288 | val _ = | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 289 | if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else (); | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 290 | val _ = | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 291 | if tick andalso ! status_ticks = 0 then | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 292 | Multithreading.tracing 1 (fn () => | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 293 | let | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 294 |             val {ready, pending, running, passive} = Task_Queue.status (! queue);
 | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 295 | val total = length (! workers); | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 296 | val active = count_workers Working; | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 297 | val waiting = count_workers Waiting; | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 298 | in | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 299 | "SCHEDULE " ^ Time.toString now ^ ": " ^ | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 300 | string_of_int ready ^ " ready, " ^ | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 301 | string_of_int pending ^ " pending, " ^ | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 302 | string_of_int running ^ " running, " ^ | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 303 | string_of_int passive ^ " passive; " ^ | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 304 | string_of_int total ^ " workers, " ^ | 
| 33410 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 305 | string_of_int active ^ " active, " ^ | 
| 
e351f4c1f18c
worker activity: distinguish between waiting (formerly active) and sleeping;
 wenzelm parents: 
33409diff
changeset | 306 | string_of_int waiting ^ " waiting " | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 307 | end) | 
| 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 308 | else (); | 
| 32053 | 309 | |
| 28191 | 310 | val _ = | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 311 | if forall (Thread.isActive o #1) (! workers) then () | 
| 32095 | 312 | else | 
| 33409 | 313 | let | 
| 37682 | 314 | val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); | 
| 33409 | 315 | val _ = workers := alive; | 
| 316 | in | |
| 317 | Multithreading.tracing 0 (fn () => | |
| 318 | "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads") | |
| 319 | end; | |
| 28191 | 320 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 321 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 322 | (* worker pool adjustments *) | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 323 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 324 | val max_active0 = ! max_active; | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 325 | val max_workers0 = ! max_workers; | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 326 | |
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 327 | val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 328 | val _ = max_active := m; | 
| 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 329 | |
| 33411 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 330 | val mm = | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 331 | if ! do_shutdown then 0 | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 332 | else if m = 9999 then 1 | 
| 33413 
cb409680dda8
avoid broadcast work_available, use daisy-chained signal instead;
 wenzelm parents: 
33411diff
changeset | 333 | else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m); | 
| 33411 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 334 | val _ = | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 335 | if tick andalso mm > ! max_workers then | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 336 | Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1) | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 337 | else if tick andalso mm < ! max_workers then | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 338 | Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1) | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 339 | else (); | 
| 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 340 | val _ = | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 341 | if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 342 | max_workers := mm | 
| 44173 
aaaa13e297dc
immediate fork of initial workers -- avoid 5 ticks (250ms) for adaptive scheme (a07558eb5029);
 wenzelm parents: 
44115diff
changeset | 343 | else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 344 | max_workers := Int.min (mm, 2 * m) | 
| 33411 
a07558eb5029
worker_next: treat wait for work_available as Sleeping, not Waiting;
 wenzelm parents: 
33410diff
changeset | 345 | else (); | 
| 33406 
1ddcb8472bd2
slightly leaner and more direct control of worker activity etc.;
 wenzelm parents: 
33061diff
changeset | 346 | |
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 347 | val missing = ! max_workers - length (! workers); | 
| 28203 | 348 | val _ = | 
| 33407 
1427333220bc
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
 wenzelm parents: 
33406diff
changeset | 349 | if missing > 0 then | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 350 | funpow missing (fn () => | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 351 |           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
 | 
| 28203 | 352 | else (); | 
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 353 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 354 | val _ = | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 355 | if ! max_active = max_active0 andalso ! max_workers = max_workers0 then () | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 356 | else signal work_available; | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 357 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 358 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 359 | (* canceled groups *) | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 360 | |
| 32225 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 361 | val _ = | 
| 
d5d6f47fb018
cancel: improved reactivity due to more careful broadcasting;
 wenzelm parents: 
32224diff
changeset | 362 | if null (! canceled) then () | 
| 32293 | 363 | else | 
| 364 | (Multithreading.tracing 1 (fn () => | |
| 365 | string_of_int (length (! canceled)) ^ " canceled groups"); | |
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 366 | Unsynchronized.change canceled (filter_out (null o cancel_now)); | 
| 32293 | 367 | broadcast_work ()); | 
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 368 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 369 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 370 | (* delay loop *) | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 371 | |
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 372 | val _ = Exn.release (wait_timeout next_round scheduler_event); | 
| 28167 | 373 | |
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 374 | |
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 375 | (* shutdown *) | 
| 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 376 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 377 | val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else (); | 
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 378 | val continue = not (! do_shutdown andalso null (! workers)); | 
| 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 379 | val _ = if continue then () else scheduler := NONE; | 
| 33415 
352fe8e9162d
worker_next: plain signalling via work_available only, not scheduler_event;
 wenzelm parents: 
33413diff
changeset | 380 | |
| 32219 
9a2566d1fdbd
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
 wenzelm parents: 
32186diff
changeset | 381 | val _ = broadcast scheduler_event; | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 382 | in continue end | 
| 39232 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 383 | handle exn => | 
| 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 384 | if Exn.is_interrupt exn then | 
| 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 385 | (Multithreading.tracing 1 (fn () => "Interrupt"); | 
| 44341 
a93d25fb14fc
purely functional task_queue.ML -- moved actual interrupt_unsynchronized to future.ML;
 wenzelm parents: 
44330diff
changeset | 386 | List.app cancel_later (cancel_all ()); | 
| 39232 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 387 | broadcast_work (); true) | 
| 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 388 | else reraise exn; | 
| 32295 
400cc493d466
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
 wenzelm parents: 
32293diff
changeset | 389 | |
| 28206 
bcd48c6897d4
eliminated requests, use global state variables uniformly;
 wenzelm parents: 
28203diff
changeset | 390 | fun scheduler_loop () = | 
| 44173 
aaaa13e297dc
immediate fork of initial workers -- avoid 5 ticks (250ms) for adaptive scheme (a07558eb5029);
 wenzelm parents: 
44115diff
changeset | 391 | (while | 
| 33416 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 wenzelm parents: 
33415diff
changeset | 392 | Multithreading.with_attributes | 
| 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 wenzelm parents: 
33415diff
changeset | 393 | (Multithreading.sync_interrupts Multithreading.public_interrupts) | 
| 
13d00799fe49
scheduler: clarified interrupt attributes and handling;
 wenzelm parents: 
33415diff
changeset | 394 | (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ())) | 
| 44173 
aaaa13e297dc
immediate fork of initial workers -- avoid 5 ticks (250ms) for adaptive scheme (a07558eb5029);
 wenzelm parents: 
44115diff
changeset | 395 | do (); last_round := Time.zeroTime); | 
| 28156 | 396 | |
| 28203 | 397 | fun scheduler_active () = (*requires SYNCHRONIZED*) | 
| 398 | (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread); | |
| 399 | ||
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 400 | fun scheduler_check () = (*requires SYNCHRONIZED*) | 
| 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 401 | (do_shutdown := false; | 
| 32248 
0241916a5f06
more precise treatment of scheduler_event: continous pulse (50ms) instead of flooding, which was burning many CPU cycles in spare threads;
 wenzelm parents: 
32247diff
changeset | 402 | if scheduler_active () then () | 
| 37216 
3165bc303f66
modernized some structure names, keeping a few legacy aliases;
 wenzelm parents: 
37182diff
changeset | 403 | else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); | 
| 28156 | 404 | |
| 44301 | 405 | |
| 406 | ||
| 407 | (** futures **) | |
| 408 | ||
| 409 | (* cancel *) | |
| 410 | ||
| 411 | fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () => | |
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 412 | let | 
| 47421 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 wenzelm parents: 
47404diff
changeset | 413 | val _ = if null (cancel_now group) then () else cancel_later group; | 
| 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 wenzelm parents: 
47404diff
changeset | 414 | val _ = signal work_available; | 
| 
9624408d8827
always signal after cancel_group: passive tasks may have become active;
 wenzelm parents: 
47404diff
changeset | 415 | val _ = scheduler_check (); | 
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 416 | in () end); | 
| 44299 
061599cb6eb0
refined Future.cancel: explicit future allows to join actual cancellation;
 wenzelm parents: 
44298diff
changeset | 417 | |
| 44301 | 418 | fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); | 
| 29366 | 419 | |
| 28156 | 420 | |
| 44110 | 421 | (* future jobs *) | 
| 422 | ||
| 44249 
64620f1d6f87
identify parallel exceptions where they emerge first -- to achieve unique results within evaluation graph;
 wenzelm parents: 
44247diff
changeset | 423 | fun assign_result group result raw_res = | 
| 44110 | 424 | let | 
| 44249 
64620f1d6f87
identify parallel exceptions where they emerge first -- to achieve unique results within evaluation graph;
 wenzelm parents: 
44247diff
changeset | 425 | val res = | 
| 
64620f1d6f87
identify parallel exceptions where they emerge first -- to achieve unique results within evaluation graph;
 wenzelm parents: 
44247diff
changeset | 426 | (case raw_res of | 
| 
64620f1d6f87
identify parallel exceptions where they emerge first -- to achieve unique results within evaluation graph;
 wenzelm parents: 
44247diff
changeset | 427 | Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn)) | 
| 
64620f1d6f87
identify parallel exceptions where they emerge first -- to achieve unique results within evaluation graph;
 wenzelm parents: 
44247diff
changeset | 428 | | _ => raw_res); | 
| 44110 | 429 | val _ = Single_Assignment.assign result res | 
| 430 | handle exn as Fail _ => | |
| 431 | (case Single_Assignment.peek result of | |
| 432 | SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) | |
| 433 | | _ => reraise exn); | |
| 434 | val ok = | |
| 435 | (case the (Single_Assignment.peek result) of | |
| 44111 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 wenzelm parents: 
44110diff
changeset | 436 | Exn.Exn exn => | 
| 
2d16c693d536
synchronized cancel and flushing of Multithreading.interrupted state, to ensure that interrupts stay within task boundaries;
 wenzelm parents: 
44110diff
changeset | 437 | (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false) | 
| 44110 | 438 | | Exn.Res _ => true); | 
| 439 | in ok end; | |
| 440 | ||
| 44113 | 441 | fun future_job group interrupts (e: unit -> 'a) = | 
| 44110 | 442 | let | 
| 443 | val result = Single_Assignment.var "future" : 'a result; | |
| 444 | val pos = Position.thread_data (); | |
| 445 | fun job ok = | |
| 446 | let | |
| 447 | val res = | |
| 448 | if ok then | |
| 449 | Exn.capture (fn () => | |
| 44113 | 450 | Multithreading.with_attributes | 
| 451 | (if interrupts | |
| 452 | then Multithreading.private_interrupts else Multithreading.no_interrupts) | |
| 44295 
e43f0ea90c9a
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
 wenzelm parents: 
44294diff
changeset | 453 | (fn _ => Position.setmp_thread_data pos e ())) () | 
| 44110 | 454 | else Exn.interrupt_exn; | 
| 455 | in assign_result group result res end; | |
| 456 | in (result, job) end; | |
| 457 | ||
| 458 | ||
| 29366 | 459 | (* fork *) | 
| 460 | ||
| 44427 | 461 | type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
 | 
| 462 | val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
 | |
| 44113 | 463 | |
| 44427 | 464 | fun forks ({name, group, deps, pri, interrupts}: params) es =
 | 
| 41674 | 465 | if null es then [] | 
| 466 | else | |
| 467 | let | |
| 468 | val grp = | |
| 469 | (case group of | |
| 470 | NONE => worker_subgroup () | |
| 471 | | SOME grp => grp); | |
| 41708 | 472 | fun enqueue e queue = | 
| 41674 | 473 | let | 
| 44113 | 474 | val (result, job) = future_job grp interrupts e; | 
| 41708 | 475 | val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 476 |           val future = Future {promised = false, task = task, result = result};
 | 
| 41708 | 477 | in (future, queue') end; | 
| 41674 | 478 | in | 
| 479 | SYNCHRONIZED "enqueue" (fn () => | |
| 480 | let | |
| 41708 | 481 | val (futures, queue') = fold_map enqueue es (! queue); | 
| 482 | val _ = queue := queue'; | |
| 483 | val minimal = forall (not o Task_Queue.known_task queue') deps; | |
| 41674 | 484 | val _ = if minimal then signal work_available else (); | 
| 485 | val _ = scheduler_check (); | |
| 486 | in futures end) | |
| 487 | end; | |
| 28162 | 488 | |
| 44113 | 489 | fun fork_pri pri e = | 
| 44301 | 490 |   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
 | 
| 44113 | 491 | |
| 41672 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 wenzelm parents: 
41670diff
changeset | 492 | fun fork e = fork_pri 0 e; | 
| 28186 | 493 | |
| 494 | ||
| 29366 | 495 | (* join *) | 
| 496 | ||
| 29551 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 497 | local | 
| 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 498 | |
| 32099 | 499 | fun get_result x = | 
| 500 | (case peek x of | |
| 37852 
a902f158b4fc
eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
 wenzelm parents: 
37690diff
changeset | 501 | NONE => Exn.Exn (Fail "Unfinished future") | 
| 39232 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 502 | | SOME res => | 
| 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 503 | if Exn.is_interrupt_exn res then | 
| 44247 | 504 | (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of | 
| 505 | NONE => res | |
| 506 | | SOME exn => Exn.Exn exn) | |
| 39232 
69c6d3e87660
more abstract treatment of interrupts in structure Exn -- hardly ever need to mention Interrupt literally;
 wenzelm parents: 
38236diff
changeset | 507 | else res); | 
| 28186 | 508 | |
| 32095 | 509 | fun join_next deps = (*requires SYNCHRONIZED*) | 
| 41695 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 510 | if null deps then NONE | 
| 32224 | 511 | else | 
| 41681 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 wenzelm parents: 
41680diff
changeset | 512 | (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of | 
| 41695 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 513 | (NONE, []) => NONE | 
| 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 514 | | (NONE, deps') => | 
| 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 515 | (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') | 
| 32224 | 516 | | (SOME work, deps') => SOME (work, deps')); | 
| 32095 | 517 | |
| 32814 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 wenzelm parents: 
32738diff
changeset | 518 | fun execute_work NONE = () | 
| 44110 | 519 | | execute_work (SOME (work, deps')) = | 
| 520 | (worker_joining (fn () => worker_exec work); join_work deps') | |
| 32814 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 wenzelm parents: 
32738diff
changeset | 521 | and join_work deps = | 
| 43538 
de5c79682b56
more robust join_results: join_work needs to be uninterruptible, otherwise the task being dequeued by join_next might be never executed/finished!
 wenzelm parents: 
42128diff
changeset | 522 | Multithreading.with_attributes Multithreading.no_interrupts | 
| 
de5c79682b56
more robust join_results: join_work needs to be uninterruptible, otherwise the task being dequeued by join_next might be never executed/finished!
 wenzelm parents: 
42128diff
changeset | 523 | (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps))); | 
| 32814 
81897d30b97f
added Task_Queue.depend (again) -- light-weight version for transitive graph;
 wenzelm parents: 
32738diff
changeset | 524 | |
| 29551 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 525 | in | 
| 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 526 | |
| 29366 | 527 | fun join_results xs = | 
| 41679 | 528 | let | 
| 529 | val _ = | |
| 530 | if forall is_finished xs then () | |
| 531 | else if Multithreading.self_critical () then | |
| 532 | error "Cannot join future values within critical section" | |
| 41695 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 533 | else if is_some (worker_task ()) then join_work (map task_of xs) | 
| 41681 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 wenzelm parents: 
41680diff
changeset | 534 | else List.app (ignore o Single_Assignment.await o result_of) xs; | 
| 41679 | 535 | in map get_result xs end; | 
| 28186 | 536 | |
| 29551 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 537 | end; | 
| 
95e469919c3e
join_results: when dependencies are resulved (but not finished yet),
 wenzelm parents: 
29431diff
changeset | 538 | |
| 28647 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 wenzelm parents: 
28645diff
changeset | 539 | fun join_result x = singleton join_results x; | 
| 44330 | 540 | fun joins xs = Par_Exn.release_all (join_results xs); | 
| 28647 
8068cdc84e7e
join_results: allow CRITICAL join of finished futures;
 wenzelm parents: 
28645diff
changeset | 541 | fun join x = Exn.release (join_result x); | 
| 28156 | 542 | |
| 44301 | 543 | fun join_tasks [] = () | 
| 544 | | join_tasks tasks = | |
| 545 | (singleton o forks) | |
| 546 |         {name = "join_tasks", group = SOME (new_group NONE),
 | |
| 547 | deps = tasks, pri = 0, interrupts = false} I | |
| 548 | |> join; | |
| 549 | ||
| 29366 | 550 | |
| 44110 | 551 | (* fast-path versions -- bypassing task queue *) | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 552 | |
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 553 | fun value_result (res: 'a Exn.result) = | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 554 | let | 
| 45136 
2afb928c71ca
static dummy_task (again) to avoid a few extra allocations;
 wenzelm parents: 
44427diff
changeset | 555 | val task = Task_Queue.dummy_task; | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 556 | val group = Task_Queue.group_of_task task; | 
| 35016 | 557 | val result = Single_Assignment.var "value" : 'a result; | 
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 558 | val _ = assign_result group result res; | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 559 |   in Future {promised = false, task = task, result = result} end;
 | 
| 29366 | 560 | |
| 44294 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 561 | fun value x = value_result (Exn.Res x); | 
| 
a0ddd5760444
clarified Future.cond_forks: more uniform handling of exceptional situations;
 wenzelm parents: 
44268diff
changeset | 562 | |
| 44330 | 563 | fun cond_forks args es = | 
| 564 | if Multithreading.enabled () then forks args es | |
| 565 | else map (fn e => value_result (Exn.interruptible_capture e ())) es; | |
| 566 | ||
| 29384 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 wenzelm parents: 
29366diff
changeset | 567 | fun map_future f x = | 
| 29366 | 568 | let | 
| 29384 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 wenzelm parents: 
29366diff
changeset | 569 | val task = task_of x; | 
| 44300 
349cc426d929
tuned signature -- treat structure Task_Queue as private to implementation;
 wenzelm parents: 
44299diff
changeset | 570 | val group = new_group (SOME (Task_Queue.group_of_task task)); | 
| 44113 | 571 | val (result, job) = future_job group true (fn () => f (join x)); | 
| 29384 
a3c7e9ae9b71
more robust propagation of errors through bulk jobs;
 wenzelm parents: 
29366diff
changeset | 572 | |
| 32246 | 573 | val extended = SYNCHRONIZED "extend" (fn () => | 
| 29366 | 574 | (case Task_Queue.extend task job (! queue) of | 
| 575 | SOME queue' => (queue := queue'; true) | |
| 576 | | NONE => false)); | |
| 577 | in | |
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 578 |     if extended then Future {promised = false, task = task, result = result}
 | 
| 41672 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 wenzelm parents: 
41670diff
changeset | 579 | else | 
| 44330 | 580 | (singleton o cond_forks) | 
| 44301 | 581 |         {name = "map_future", group = SOME group, deps = [task],
 | 
| 582 | pri = Task_Queue.pri_of_task task, interrupts = true} | |
| 41672 
2f70b1ddd09f
more direct Future.bulk, which potentially reduces overhead for Par_List;
 wenzelm parents: 
41670diff
changeset | 583 | (fn () => f (join x)) | 
| 29366 | 584 | end; | 
| 28979 
3ce619d8d432
fork/map: no inheritance of group (structure is nested, not parallel);
 wenzelm parents: 
28972diff
changeset | 585 | |
| 28191 | 586 | |
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 587 | (* promised futures -- fulfilled by external means *) | 
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 588 | |
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 589 | fun promise_group group abort : 'a future = | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 590 | let | 
| 35016 | 591 | val result = Single_Assignment.var "promise" : 'a result; | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 592 | fun assign () = assign_result group result Exn.interrupt_exn | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 593 | handle Fail _ => true | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 594 | | exn => | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 595 | if Exn.is_interrupt exn | 
| 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 596 | then raise Fail "Concurrent attempt to fulfill promise" | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 597 | else reraise exn; | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 598 | fun job () = | 
| 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 599 | Multithreading.with_attributes Multithreading.no_interrupts | 
| 47423 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 600 | (fn _ => Exn.release (Exn.capture assign () before abort ())); | 
| 37854 
047c96f41455
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
 wenzelm parents: 
37852diff
changeset | 601 | val task = SYNCHRONIZED "enqueue_passive" (fn () => | 
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 602 | Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job)); | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 603 |   in Future {promised = true, task = task, result = result} end;
 | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 604 | |
| 44298 
b8f8488704e2
Future.promise: explicit abort operation (like uninterruptible future job);
 wenzelm parents: 
44295diff
changeset | 605 | fun promise abort = promise_group (worker_subgroup ()) abort; | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 606 | |
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 607 | fun fulfill_result (Future {promised, task, result}) res =
 | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 608 | if not promised then raise Fail "Not a promised future" | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 609 | else | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 610 | let | 
| 41683 
73dde8006820
maintain Task_Queue.group within Task_Queue.task;
 wenzelm parents: 
41681diff
changeset | 611 | val group = Task_Queue.group_of_task task; | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 612 | fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn); | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 613 | val _ = | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 614 | Multithreading.with_attributes Multithreading.no_interrupts (fn _ => | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 615 | let | 
| 47423 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 616 | val passive_job = | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 617 | SYNCHRONIZED "fulfill_result" (fn () => | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 618 | Unsynchronized.change_result queue | 
| 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 619 | (Task_Queue.dequeue_passive (Thread.self ()) task)); | 
| 47423 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 620 | in | 
| 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 621 | (case passive_job of | 
| 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 622 | SOME true => worker_exec (task, [job]) | 
| 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 623 | | SOME false => () | 
| 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 624 | | NONE => ignore (job (not (Task_Queue.is_canceled group)))) | 
| 
8a179a0493e3
more robust Future.fulfill wrt. duplicate assignment and interrupt;
 wenzelm parents: 
47421diff
changeset | 625 | end); | 
| 41681 
b5d7b15166bf
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
 wenzelm parents: 
41680diff
changeset | 626 | val _ = | 
| 41695 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 627 | if is_some (Single_Assignment.peek result) then () | 
| 
afdbec23b92b
eliminated slightly odd abstract type Task_Queue.deps;
 wenzelm parents: 
41683diff
changeset | 628 | else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); | 
| 39243 
307e3d07d19f
Future.promise: more robust treatment of concurrent abort vs. fulfill (amending 047c96f41455);
 wenzelm parents: 
39232diff
changeset | 629 | in () end; | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 630 | |
| 43761 
e72ba84ae58f
tuned signature -- corresponding to Scala version;
 wenzelm parents: 
43665diff
changeset | 631 | fun fulfill x res = fulfill_result x (Exn.Res res); | 
| 34277 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 632 | |
| 
7325a5e3587f
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
 wenzelm parents: 
33416diff
changeset | 633 | |
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 634 | (* shutdown *) | 
| 29366 | 635 | |
| 28203 | 636 | fun shutdown () = | 
| 28276 | 637 | if Multithreading.available then | 
| 638 | SYNCHRONIZED "shutdown" (fn () => | |
| 32228 
7622c03141b0
scheduler: shutdown spontaneously (after some delay) if queue is empty;
 wenzelm parents: 
32227diff
changeset | 639 | while scheduler_active () do | 
| 34279 
02936e77a07c
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
 wenzelm parents: 
34277diff
changeset | 640 | (wait scheduler_event; broadcast_work ())) | 
| 28276 | 641 | else (); | 
| 28203 | 642 | |
| 29366 | 643 | |
| 47404 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 644 | (* queue status *) | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 645 | |
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 646 | fun group_tasks group = Task_Queue.group_tasks (! queue) group; | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 647 | |
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 648 | fun queue_status () = Task_Queue.status (! queue); | 
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 649 | |
| 
e6e5750f1311
simplified Future.cancel/cancel_group (again) -- running threads only;
 wenzelm parents: 
45666diff
changeset | 650 | |
| 29366 | 651 | (*final declarations of this structure!*) | 
| 652 | val map = map_future; | |
| 653 | ||
| 28156 | 654 | end; | 
| 28972 | 655 | |
| 656 | type 'a future = 'a Future.future; | |
| 657 |