|
1 (* Title: Pure/Concurrent/future.ML |
|
2 ID: $Id$ |
|
3 Author: Makarius |
|
4 |
|
5 Functional threads as future values. |
|
6 *) |
|
7 |
|
8 signature FUTURE = |
|
9 sig |
|
10 type 'a T |
|
11 eqtype id |
|
12 val id_of: 'a T -> id |
|
13 val interrupt: id -> unit |
|
14 val dependent_future: id list -> (unit -> 'a) -> 'a T |
|
15 val future: (unit -> 'a) -> 'a T |
|
16 val await: 'a T -> 'a |
|
17 end; |
|
18 |
|
19 structure Future: FUTURE = |
|
20 struct |
|
21 |
|
22 (* synchronized execution *) |
|
23 |
|
24 local |
|
25 val thread = ref (NONE: Thread.thread option); |
|
26 val lock = Mutex.mutex (); |
|
27 val cond = ConditionVar.conditionVar (); |
|
28 in |
|
29 |
|
30 fun self_synchronized () = |
|
31 (case ! thread of |
|
32 NONE => false |
|
33 | SOME t => Thread.equal (t, Thread.self ())); |
|
34 |
|
35 fun SYNCHRONIZED e = |
|
36 if self_synchronized () then e () |
|
37 else |
|
38 uninterruptible (fn restore_attributes => fn () => |
|
39 let |
|
40 val _ = Mutex.lock lock; |
|
41 val _ = thread := SOME (Thread.self ()); |
|
42 val result = Exn.capture (restore_attributes e) (); |
|
43 val _ = thread := NONE; |
|
44 val _ = Mutex.unlock lock; |
|
45 in Exn.release result end) (); |
|
46 |
|
47 fun wait () = ConditionVar.wait (cond, lock); |
|
48 fun wait_timeout timeout = ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)); |
|
49 |
|
50 fun notify_all () = ConditionVar.broadcast cond; |
|
51 |
|
52 end; |
|
53 |
|
54 |
|
55 (* typed futures, unytped ids *) |
|
56 |
|
57 datatype 'a T = Future of serial * 'a Exn.result option ref; |
|
58 |
|
59 datatype id = Id of serial; |
|
60 fun id_of (Future (id, _)) = Id id; |
|
61 |
|
62 local val tag = Universal.tag () : serial Universal.tag in |
|
63 fun get_id () = Thread.getLocal tag; |
|
64 fun put_id id = Thread.setLocal (tag, id); |
|
65 end; |
|
66 |
|
67 |
|
68 (* ordered queue of tasks *) |
|
69 |
|
70 datatype task = |
|
71 Task of (unit -> unit) | |
|
72 Running of Thread.thread; |
|
73 |
|
74 datatype queue = Queue of task IntGraph.T * (serial * (unit -> unit)) Queue.T; |
|
75 |
|
76 val empty_queue = Queue (IntGraph.empty, Queue.empty); |
|
77 |
|
78 fun check_cache (queue as Queue (tasks, cache)) = |
|
79 if not (Queue.is_empty cache) then queue |
|
80 else |
|
81 let |
|
82 val cache' = fold (fn id => |
|
83 (case IntGraph.get_node tasks id of |
|
84 Task task => Queue.enqueue (id, task) |
|
85 | Running _ => I)) (IntGraph.minimals tasks) Queue.empty; |
|
86 in Queue (tasks, cache') end; |
|
87 |
|
88 val next_task = check_cache #> (fn queue as Queue (tasks, cache) => |
|
89 if Queue.is_empty cache then (NONE, queue) |
|
90 else |
|
91 let val (task, cache') = Queue.dequeue cache |
|
92 in (SOME task, Queue (tasks, cache')) end); |
|
93 |
|
94 fun get_task (Queue (tasks, _)) id = IntGraph.get_node tasks id; |
|
95 |
|
96 fun new_task deps id task (Queue (tasks, _)) = |
|
97 let |
|
98 fun add_dep (Id dep) G = IntGraph.add_edge_acyclic (dep, id) G |
|
99 handle IntGraph.UNDEF _ => G; (*dep already finished*) |
|
100 val tasks' = tasks |> IntGraph.new_node (id, Task task) |> fold add_dep deps; |
|
101 in Queue (tasks', Queue.empty) end; |
|
102 |
|
103 fun running_task id thread (Queue (tasks, cache)) = |
|
104 Queue (IntGraph.map_node id (K (Running thread)) tasks, cache); |
|
105 |
|
106 fun finished_task id (Queue (tasks, _)) = |
|
107 Queue (IntGraph.del_nodes [id] tasks, Queue.empty); |
|
108 |
|
109 |
|
110 (* global state *) |
|
111 |
|
112 local val active = ref 0 in |
|
113 |
|
114 fun change_active b = SYNCHRONIZED (fn () => |
|
115 let |
|
116 val _ = change active (fn n => if b then n + 1 else n - 1); |
|
117 val n = ! active; |
|
118 val _ = Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int n ^ " active tasks"); |
|
119 in () end); |
|
120 |
|
121 end; |
|
122 |
|
123 val tasks = ref empty_queue; |
|
124 val scheduler = ref (NONE: Thread.thread option); |
|
125 val workers = ref ([]: Thread.thread list); |
|
126 |
|
127 |
|
128 fun interrupt (Id id) = SYNCHRONIZED (fn () => |
|
129 (case try (get_task (! tasks)) id of |
|
130 SOME (Running thread) => Thread.interrupt thread |
|
131 | _ => ())); |
|
132 |
|
133 |
|
134 (* worker thread *) |
|
135 |
|
136 fun excessive_threads () = false; (* FIXME *) |
|
137 |
|
138 fun worker_stop () = |
|
139 (change_active false; change workers (filter (fn t => not (Thread.equal (t, Thread.self ()))))); |
|
140 |
|
141 fun worker_wait () = |
|
142 (change_active false; wait (); change_active true); |
|
143 |
|
144 fun worker_loop () = |
|
145 (case SYNCHRONIZED (fn () => change_result tasks next_task) of |
|
146 SOME (id, task) => |
|
147 let |
|
148 val _ = SYNCHRONIZED (fn () => change tasks (running_task id (Thread.self ()))); |
|
149 val _ = task (); |
|
150 val _ = SYNCHRONIZED (fn () => change tasks (finished_task id)); |
|
151 val _ = notify_all (); |
|
152 in if excessive_threads () then worker_stop () else worker_loop () end |
|
153 | NONE => (worker_wait (); worker_loop ())); |
|
154 |
|
155 fun worker_start () = |
|
156 (change_active true; |
|
157 change workers (cons (Thread.fork (worker_loop, Multithreading.no_interrupts)))); |
|
158 |
|
159 |
|
160 (* scheduler *) |
|
161 |
|
162 fun scheduler_loop () = |
|
163 let |
|
164 val m = Multithreading.max_threads_value (); |
|
165 val k = m - length (! workers); |
|
166 val _ = if k > 0 then funpow k worker_start () else (); |
|
167 in wait_timeout (Time.fromSeconds 1); scheduler_loop () end; |
|
168 |
|
169 fun check_scheduler () = SYNCHRONIZED (fn () => |
|
170 let |
|
171 val scheduler_active = |
|
172 (case ! scheduler of |
|
173 NONE => false |
|
174 | SOME t => Thread.isActive t); |
|
175 in |
|
176 if scheduler_active then () |
|
177 else scheduler := SOME (Thread.fork (SYNCHRONIZED scheduler_loop, Multithreading.no_interrupts)) |
|
178 end); |
|
179 |
|
180 |
|
181 (* future values *) |
|
182 |
|
183 fun dependent_future deps (e: unit -> 'a) = |
|
184 let |
|
185 val _ = check_scheduler (); |
|
186 |
|
187 val r = ref (NONE: 'a Exn.result option); |
|
188 val task = Multithreading.with_attributes (Thread.getAttributes ()) |
|
189 (fn _ => fn () => r := SOME (Exn.capture e ())); |
|
190 val id = serial (); |
|
191 val _ = SYNCHRONIZED (fn () => change tasks (new_task deps id task)); |
|
192 val _ = notify_all (); |
|
193 in Future (id, r) end; |
|
194 |
|
195 fun future e = dependent_future [] e; |
|
196 |
|
197 fun await (Future (_, r)) = |
|
198 let |
|
199 val _ = check_scheduler (); |
|
200 |
|
201 fun loop () = |
|
202 (case SYNCHRONIZED (fn () => ! r) of |
|
203 NONE => (wait (); loop ()) |
|
204 | SOME res => Exn.release res); |
|
205 in loop () end; |
|
206 |
|
207 end; |