11 type group = TaskQueue.group |
11 type group = TaskQueue.group |
12 type 'a T |
12 type 'a T |
13 val task_of: 'a T -> task |
13 val task_of: 'a T -> task |
14 val group_of: 'a T -> group |
14 val group_of: 'a T -> group |
15 val shutdown_request: unit -> unit |
15 val shutdown_request: unit -> unit |
16 val cancel: 'a T -> unit |
|
17 val future: bool -> task list -> (unit -> 'a) -> 'a T |
16 val future: bool -> task list -> (unit -> 'a) -> 'a T |
18 val fork: (unit -> 'a) -> 'a T |
17 val fork: (unit -> 'a) -> 'a T |
|
18 val cancel: 'a T -> unit |
|
19 val join_all: 'a T list -> 'a list |
19 val join: 'a T -> 'a |
20 val join: 'a T -> 'a |
20 end; |
21 end; |
21 |
22 |
22 structure Future: FUTURE = |
23 structure Future: FUTURE = |
23 struct |
24 struct |
(*Requests handled by the scheduler: either Shutdown or the cancellation of
  one task group.  They are exchanged through the `requests` mailbox, which
  scheduler_loop reads with Mailbox.receive_timeout.*)
67 datatype request = Shutdown | Cancel of group; |
68 datatype request = Shutdown | Cancel of group; |
68 val requests = Mailbox.create () : request Mailbox.T; |
69 val requests = Mailbox.create () : request Mailbox.T; |
69 |
70 |
(*Post a request to the `requests` mailbox: shutdown_request sends Shutdown,
  cancel_request sends Cancel for the given group, and cancel sends Cancel for
  the group of a future (obtained with group_of).*)
70 fun shutdown_request () = Mailbox.send requests Shutdown; |
71 fun shutdown_request () = Mailbox.send requests Shutdown; |
71 fun cancel_request group = Mailbox.send requests (Cancel group); |
72 fun cancel_request group = Mailbox.send requests (Cancel group); |
72 fun cancel x = cancel_request (group_of x); |
|
73 |
73 |
74 |
74 |
75 (* synchronization *) |
75 (* synchronization *) |
76 |
76 |
77 local |
77 local |
98 |
98 |
99 end; |
99 end; |
100 |
100 |
101 |
101 |
102 (* execute *) |
102 (* execute *) |
103 |
|
(*Cancel a task group: TaskQueue.cancel is applied to the queue via
  change_result.  An empty result list yields true; otherwise every returned
  thread is interrupted (a Thread exception raised by the interrupt is
  ignored) and the result is false.*)
104 fun cancel_group group = (*requires SYNCHRONIZED*) |

105 (case change_result queue (TaskQueue.cancel group) of |

106 [] => true |

107 | running => (List.app (fn t => Thread.interrupt t handle Thread _ => ()) running; false)); |
|
108 |
103 |
(*Run one piece of work (task, group, run) on the current thread; `name` is
  only used as prefix of the tracing messages.
  Steps: record (task, group) as thread data, call run () to obtain `ok`,
  clear the thread data, then -- inside SYNCHRONIZED -- mark the task as
  finished in the queue.  If `ok` is false the group is cancelled, and when
  that cancellation does not report true a Cancel request is posted with
  cancel_request.  Finally notify_all is called.
  The two interleaved revisions differ only in how the group is cancelled:
  one calls cancel_group, the other applies TaskQueue.cancel directly.*)
109 fun execute name (task, group, run) = |
104 fun execute name (task, group, run) = |
110 let |
105 let |
111 val _ = set_thread_data (SOME (task, group)); |
106 val _ = set_thread_data (SOME (task, group)); |
112 val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
107 val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
113 val ok = run (); |
108 val ok = run (); |
114 val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
109 val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
115 val _ = set_thread_data NONE; |
110 val _ = set_thread_data NONE; |
116 val _ = SYNCHRONIZED (fn () => |
111 val _ = SYNCHRONIZED (fn () => |
117 (change queue (TaskQueue.finish task); |
112 (change queue (TaskQueue.finish task); |
118 if ok then () else if cancel_group group then () else cancel_request group; |
113 if ok then () |

114 else if change_result queue (TaskQueue.cancel group) then () |

115 else cancel_request group; |
119 notify_all ())); |
116 notify_all ())); |
120 in () end; |
117 in () end; |
121 |
118 |
122 |
119 |
123 (* worker threads *) |
120 (* worker threads *) |
124 |
121 |
(*Adjust the `active` counter: incremented when b is true, decremented
  otherwise; trace_active is called afterwards.*)
125 fun change_active b = (*requires SYNCHRONIZED*) |
122 fun change_active b = (*requires SYNCHRONIZED*) |
126 (change active (fn n => if b then n + 1 else n - 1); trace_active ()); |
123 (change active (fn n => if b then n + 1 else n - 1); trace_active ()); |
|
124 |
|
(*Wait on behalf of a worker: the `active` counter is decremented before
  `wait name` and incremented again afterwards (via change_active).*)
125 fun worker_wait name = (*requires SYNCHRONIZED*) |

126 (change_active false; wait name; change_active true); |
127 |
127 |
(*Determine the next piece of work for a worker thread.
  If `excessive` is positive: decrement it, mark the worker inactive, remove
  the current thread from `workers`, and return NONE.
  Otherwise dequeue from the task queue; while the dequeue yields NONE the
  worker waits (counted as inactive during the wait) and then tries again.
  Any SOME result is returned unchanged.
  The interleaved revisions differ in the argument passed to
  TaskQueue.dequeue and in whether the wait is spelled out inline or done
  through worker_wait.*)
128 fun worker_next name = (*requires SYNCHRONIZED*) |
128 fun worker_next name = (*requires SYNCHRONIZED*) |
129 if ! excessive > 0 then |
129 if ! excessive > 0 then |
130 (dec excessive; |
130 (dec excessive; |
131 change_active false; |
131 change_active false; |
132 change workers (remove Thread.equal (Thread.self ())); |
132 change workers (remove Thread.equal (Thread.self ())); |
133 NONE) |
133 NONE) |
134 else |
134 else |
135 (case change_result queue (TaskQueue.dequeue (Thread.self ())) of |
135 (case change_result queue TaskQueue.dequeue of |
136 NONE => (change_active false; wait name; change_active true; worker_next name) |
136 NONE => (worker_wait name; worker_next name) |
137 | some => some); |
137 | some => some); |
138 |
138 |
139 fun worker_loop name = |
139 fun worker_loop name = |
140 (case SYNCHRONIZED (fn () => worker_next name) of |
140 (case SYNCHRONIZED (fn () => worker_next name) of |
141 NONE => () |
141 NONE => () |
156 val _ = excessive := l - m; |
156 val _ = excessive := l - m; |
157 in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end); |
157 in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end); |
158 |
158 |
159 fun scheduler_loop canceled = |
159 fun scheduler_loop canceled = |
160 let |
160 let |
161 val canceled' = SYNCHRONIZED (fn () => filter_out cancel_group canceled); |
161 val canceled' = SYNCHRONIZED (fn () => |
|
162 filter_out (change_result queue o TaskQueue.cancel) canceled); |
162 val _ = scheduler_fork (); |
163 val _ = scheduler_fork (); |
163 in |
164 in |
164 (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of |
165 (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of |
165 SOME Shutdown => () (* FIXME proper worker shutdown *) |
166 SOME Shutdown => () (* FIXME proper worker shutdown *) |
166 | SOME (Cancel group) => scheduler_loop (group :: canceled') |
167 | SOME (Cancel group) => scheduler_loop (group :: canceled') |
193 change_result queue (TaskQueue.enqueue group deps run) before notify_all ()); |
194 change_result queue (TaskQueue.enqueue group deps run) before notify_all ()); |
194 in Future {task = task, group = group, result = result} end; |
195 in Future {task = task, group = group, result = result} end; |
195 |
196 |
(*Shorthand for `future` with the boolean flag set to false and an empty
  list of task dependencies.
  NOTE(review): the meaning of the boolean flag is defined in `future`, which
  is not fully visible here -- confirm before relying on it.*)
196 fun fork e = future false [] e; |
197 fun fork e = future false [] e; |
197 |
198 |
(*Two revisions are interleaved below.
  Older revision: `join` checks the scheduler, then waits inside SYNCHRONIZED
  (passive_loop) until the future's result reference holds SOME value, and
  returns it through Exn.release.
  Newer revision:
  - `cancel` checks the scheduler and posts a Cancel request for the group of
    the future.
  - `join_all` waits for a list of futures.  `unfinished` collects the tasks
    whose result reference is still NONE.  A thread without thread data waits
    passively until nothing is unfinished; a thread with thread data first
    registers the unfinished tasks through TaskQueue.depend and then keeps
    executing work obtained from TaskQueue.dequeue_towards (waiting via
    worker_wait when there is none) until nothing is unfinished.  Afterwards
    the first exception other than Interrupt found among the results is
    raised; if there is none, every result is passed through Exn.release.
  - `join` is the single-element case of join_all.*)
198 fun join (Future {result, ...}) = |
199 fun cancel x = (check_scheduler (); cancel_request (group_of x)); |

200 |

201 |

202 (* join *) |

203 |

204 fun join_all xs = |
199 let |
205 let |
200 val _ = check_scheduler (); |
206 val _ = check_scheduler (); |
201 |
207 |
202 fun passive_loop () = |
208 fun unfinished () = |
203 (case ! result of |
209 xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE); |
204 NONE => (wait "join"; passive_loop ()) |
210 |
205 | SOME res => res); |
211 (*alien thread -- refrain from contending for resources*) |
206 in Exn.release (SYNCHRONIZED passive_loop) end; |
212 fun passive_join () = (*requires SYNCHRONIZED*) |
207 |
213 (case unfinished () of [] => () |
208 end; |
214 | _ => (wait "join"; passive_join ())); |

215 |

216 (*proper worker thread -- actively work towards results*) |

217 fun active_join () = (*requires SYNCHRONIZED*) |

218 (case unfinished () of [] => () |

219 | tasks => |

220 (case change_result queue (TaskQueue.dequeue_towards tasks) of |

221 NONE => (worker_wait "join"; active_join ()) |

222 | SOME work => (execute "join" work; active_join ()))); |

223 |

224 val _ = |

225 (case thread_data () of |

226 NONE => SYNCHRONIZED passive_join |

227 | SOME (task, _) => SYNCHRONIZED (fn () => |

228 (change queue (TaskQueue.depend (unfinished ()) task); active_join ()))); |

229 |

230 val res = xs |> map (fn Future {result = ref (SOME res), ...} => res); |

231 in |

232 (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of |

233 NONE => map Exn.release res |

234 | SOME e => raise e) |

235 end; |

236 |

237 fun join x = singleton join_all x; |
|
238 |
|
239 end; |