author | wenzelm |
Wed, 10 Sep 2008 20:28:01 +0200 | |
changeset 28192 | 6d977729c8fa |
parent 28191 | 9e5f556409c6 |
child 28193 | 7ed74d0ba607 |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Functional threads as future values. |
|
6 |
*) |
|
7 |
||
8 |
(*Interface to future values: computations enqueued as tasks, whose
  results are retrieved later by joining.*)
signature FUTURE =
sig
  (*identifiers, shared with TaskQueue*)
  type task = TaskQueue.task
  type group = TaskQueue.group

  (*future values and their projections*)
  type 'a T
  val task_of: 'a T -> task
  val group_of: 'a T -> group

  (*creation: explicit group and dependencies, or defaults taken from the running thread*)
  val future: group option -> task list -> (unit -> 'a) -> 'a T
  val fork: (unit -> 'a) -> 'a T

  (*results*)
  val join_all: 'a T list -> 'a list
  val join: 'a T -> 'a

  (*termination*)
  val cancel: 'a T -> unit
  val shutdown_request: unit -> unit
end;
22 |
||
23 |
structure Future: FUTURE = |
|
24 |
struct |
|
25 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
26 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
27 |
|
28167 | 28 |
(* identifiers *) |
29 |
||
30 |
(*task and group identifiers are those of the task queue*)
type task = TaskQueue.task;
type group = TaskQueue.group;

(*Thread-local slot holding the (task, group) pair associated with the
  calling thread.  thread_data yields NONE both when the slot was never
  set and when it was explicitly set to NONE.*)
local
  val tag: (task * group) option Universal.tag = Universal.tag ();
in
  fun thread_data () =
    (case Thread.getLocal tag of
      SOME data => data
    | NONE => NONE);

  fun set_thread_data data = Thread.setLocal (tag, data);
end;
37 |
||
38 |
||
39 |
(* datatype future *) |
|
40 |
||
41 |
(*A future value: the task that computes it, the group of that task, and
  a reference cell that is NONE until a result (value or exception) has
  been stored.*)
datatype 'a T = Future of
 {task: task,
  group: group,
  result: 'a Exn.result option ref};

(*projections*)
fun task_of (Future fields) = #task fields;
fun group_of (Future fields) = #group fields;
|
48 |
||
49 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
50 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
51 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
52 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
53 |
(* global state *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
54 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
55 |
(*the task queue*)
val queue = ref TaskQueue.empty;

(*worker threads, each paired with its "active" flag*)
val workers: (Thread.thread * bool) list ref = ref [];

(*the scheduler thread, once one has been forked*)
val scheduler: Thread.thread option ref = ref NONE;

(*while positive: number of workers that should still retire*)
val excessive = ref 0;
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
60 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
61 |
(*Emit a level-1 trace message stating how many workers are registered
  and how many of them are currently flagged as active.*)
fun trace_active () =
  let
    val ws = ! workers;
    val total = length ws;
    val active = length (filter (fn (_, is_active) => is_active) ws);
  in
    Multithreading.tracing 1 (fn () =>
      "SCHEDULE: " ^ string_of_int total ^ " workers, " ^ string_of_int active ^ " active")
  end;
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
67 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
68 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
69 |
(* requests *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
70 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
71 |
(*Requests are posted to a mailbox that is read by the scheduler loop.*)
datatype request = Shutdown | Cancel of group;
val requests: request Mailbox.T = Mailbox.create ();

local
  fun post request = Mailbox.send requests request;
in
  (*ask the scheduler to shut down*)
  fun shutdown_request () = post Shutdown;

  (*ask the scheduler to cancel the given group*)
  fun cancel_request group = post (Cancel group);
end;
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
76 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
77 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
78 |
(* synchronization *) |
28156 | 79 |
|
80 |
local
  val lock = Mutex.mutex ();
  val cond = ConditionVar.conditionVar ();
in

(*Evaluate e () while holding the global lock.  The lock is released
  before the outcome is passed on, so an exception raised by e leaves the
  lock unlocked and is re-raised to the caller.  The whole section runs
  under uninterruptible, with restore_attributes applied to e only
  (presumably re-establishing the caller's interrupt state for e --
  confirm against the definition of uninterruptible).  The name is used
  for level-4 tracing only.*)
fun SYNCHRONIZED name e =
  let
    fun trace suffix = Multithreading.tracing 4 (fn () => name ^ suffix);
    fun locked_body restore_attributes () =
      let
        val _ = (trace ": locking"; Mutex.lock lock; trace ": locked");
        val outcome = Exn.capture (restore_attributes e) ();
        val _ = (Mutex.unlock lock; trace ": unlocked");
      in Exn.release outcome end;
  in uninterruptible locked_body () end;

(*Block on the shared condition variable, giving up the global lock
  while waiting.*)
fun wait name = (*requires SYNCHRONIZED*)
 (Multithreading.tracing 4 (fn () => name ^ ": waiting");
  ConditionVar.wait (cond, lock);
  Multithreading.tracing 4 (fn () => name ^ ": notified"));

(*Wake up every thread blocked in wait.*)
fun notify_all () = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;
|
106 |
||
107 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
108 |
(* execute *) |
28156 | 109 |
|
28167 | 110 |
(*Run one unit of work (task, group, run) and report its completion.

  While run () is evaluated, the thread data of the calling thread is set
  to (task, group).  Afterwards the PREVIOUS thread data is restored
  instead of being cleared: execute is also invoked by join_all from
  within a task that is already running, and unconditionally resetting
  to NONE would make the enclosing task lose its own (task, group) entry.
  For a plain worker loop the previous value is NONE, so behaviour there
  is unchanged.

  Must NOT be called while holding the global lock, since it enters
  SYNCHRONIZED itself.

  name: prefix for level-4 trace messages.*)
fun execute name (task, group, run) =
  let
    val old_data = thread_data ();
    val _ = set_thread_data (SOME (task, group));
    val _ = Multithreading.tracing 4 (fn () => name ^ ": running");
    val ok = run ();
    val _ = Multithreading.tracing 4 (fn () => name ^ ": finished");
    val _ = set_thread_data old_data;
    val _ = SYNCHRONIZED "execute" (fn () =>
     (change queue (TaskQueue.finish task);
      (*failed run: try to cancel the group right away; if TaskQueue.cancel
        reports false, hand the group over to the scheduler, which keeps
        retrying the cancellation*)
      if ok then ()
      else if TaskQueue.cancel (! queue) group then ()
      else cancel_request group;
      (*wake up threads waiting for this task*)
      notify_all ()));
  in () end;
124 |
||
125 |
||
126 |
(* worker threads *) |
|
127 |
||
28192 | 128 |
(*Store the given active flag for the calling thread in the worker table,
  then trace the resulting worker counts.*)
fun change_active active = (*requires SYNCHRONIZED*)
  let
    val entry = (Thread.self (), active);
    val _ = change workers (AList.update Thread.equal entry);
  in trace_active () end;
|
28186 | 130 |
|
131 |
(*Wait on the shared condition; the calling worker is flagged inactive
  for the duration of the wait and active again afterwards.*)
fun worker_wait name = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = wait name;
  in change_active true end;
|
28162 | 133 |
|
28167 | 134 |
(*Determine what the calling worker does next.  If there are excessive
  workers, the caller retires: it decrements the counter, removes itself
  from the worker table and gets NONE.  Otherwise it dequeues work,
  waiting and retrying for as long as the queue offers nothing.*)
fun worker_next name = (*requires SYNCHRONIZED*)
  let
    fun is_self (thread, _) = Thread.equal (thread, Thread.self ());
    fun retire () = (dec excessive; change workers (filter_out is_self); NONE);
    fun take () =
      (case change_result queue TaskQueue.dequeue of
        NONE => (worker_wait name; worker_next name)
      | work => work);
  in if ! excessive > 0 then retire () else take () end;
28156 | 143 |
|
28167 | 144 |
(*Main loop of a worker thread: fetch work under the global lock, execute
  it outside the lock, repeat; terminate when worker_next yields NONE.*)
fun worker_loop name =
  let
    val next = SYNCHRONIZED name (fn () => worker_next name);
  in
    (case next of
      SOME work => (execute name work; worker_loop name)
    | NONE => ())
  end;
28156 | 148 |
|
28167 | 149 |
(*Fork a new worker thread running worker_loop and register it in the
  worker table, flagged as active.*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts);
  in change workers (cons (thread, true)) end;
|
28156 | 152 |
|
153 |
||
154 |
(* scheduler *) |
|
155 |
||
28192 | 156 |
(*One maintenance step of the scheduler, under the global lock: dispose
  table entries of threads that are no longer active, adjust the number
  of workers towards the target (0 on shutdown, the configured maximum
  otherwise), and on shutdown wake up all waiting threads.  The result is
  true iff shutdown was requested and no workers remain.*)
fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () =>
  let
    val _ = trace_active ();

    (*drop entries whose thread has terminated*)
    val (alive, dead) =
      List.partition (fn (thread, _) => Thread.isActive thread) (! workers);
    val _ =
      if null dead then ()
      else
       (workers := alive;
        Multithreading.tracing 0 (fn () =>
          "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"));

    val target = if shutdown then 0 else Multithreading.max_threads_value ();
    val current = length (! workers);

    (*too many workers: let the surplus retire; too few: start the missing ones*)
    val _ = excessive := current - target;
    val _ =
      List.app (fn i => worker_start ("worker " ^ string_of_int i)) (current upto target - 1);

    val _ = if shutdown then notify_all () else ();
  in shutdown andalso null (! workers) end);
|
28167 | 172 |
|
28191 | 173 |
(*Main loop of the scheduler thread.  Each round performs a
  scheduler_fork step (stopping when it reports completion), retries
  cancellation of the groups still pending, and then waits up to one
  second for the next request.*)
fun scheduler_loop (shutdown, canceled) =
  if scheduler_fork shutdown then ()
  else
    let
      (*keep only the groups for which TaskQueue.cancel reports false*)
      val pending = SYNCHRONIZED "scheduler"
        (fn () => filter_out (TaskQueue.cancel (! queue)) canceled);
      val (shutdown', canceled') =
        (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of
          NONE => (shutdown, pending)
        | SOME Shutdown => (true, pending)
        | SOME (Cancel group) => (shutdown, group :: pending));
    in scheduler_loop (shutdown', canceled') end;
|
28156 | 185 |
|
28192 | 186 |
(*Make sure that a scheduler thread exists: fork a fresh one unless the
  recorded scheduler thread is still active.*)
fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () =>
  let
    val running =
      (case ! scheduler of
        SOME thread => Thread.isActive thread
      | NONE => false);
  in
    if running then ()
    else
      scheduler :=
        SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))
  end);
|
28156 | 190 |
|
191 |
||
28191 | 192 |
(* future values: fork independent computation *) |
28156 | 193 |
|
28191 | 194 |
(*Create a future for computation e and enqueue it as a task.

  opt_group: group of the new task; a fresh group is created for NONE.
  deps: tasks passed to TaskQueue.enqueue as dependencies.

  The enqueued job receives a flag ok: if true it evaluates e, otherwise
  it records Interrupt as the result without running e.  It stores the
  outcome in the result cell and returns whether a proper value (not an
  exception) was produced.  The job runs with the thread attributes of
  the thread that created the future.*)
fun future opt_group deps (e: unit -> 'a) =
  let
    val _ = scheduler_check ();

    val group =
      (case opt_group of
        NONE => TaskQueue.new_group ()
      | SOME group => group);

    val result = ref (NONE: 'a Exn.result option);

    fun body _ ok =
      let
        val res = if ok then Exn.capture e () else Exn.Exn Interrupt;
        val _ = result := SOME res;
      in is_some (Exn.get_result res) end;
    val run = Multithreading.with_attributes (Thread.getAttributes ()) body;

    (*enqueue, then wake up waiting threads*)
    val task = SYNCHRONIZED "future" (fn () =>
      let
        val task = change_result queue (TaskQueue.enqueue group deps run);
        val _ = notify_all ();
      in task end);
  in Future {task = task, group = group, result = result} end;
28162 | 209 |
|
28191 | 210 |
(*Fork e as a future without dependencies.  The group is inherited from
  the thread data of the calling thread, if present; otherwise future
  creates a new one.*)
fun fork e =
  let val opt_group = Option.map (fn (_, group) => group) (thread_data ())
  in future opt_group [] e end;
28186 | 211 |
|
212 |
||
28191 | 213 |
(* join: retrieve results *) |
28186 | 214 |
|
215 |
(*Wait until all given futures are finished and return their values in
  the order of xs.

  A thread without thread data (not running a task) waits passively.  A
  thread that is running a task registers the unfinished futures as
  dependencies of its own task and then actively executes queued work
  towards them.

  Work is only DEQUEUED under the global lock; execute is called after
  the lock has been released.  execute enters SYNCHRONIZED itself, so
  calling it from inside a SYNCHRONIZED section would try to re-lock a
  mutex already held by the same thread, and would also run the user
  computation while blocking all other threads.

  If some future failed with an exception other than Interrupt, the first
  such exception is re-raised.  Otherwise all results are released, which
  raises Interrupt if any future was interrupted.*)
fun join_all xs =
  let
    val _ = scheduler_check ();

    fun unfinished () =
      xs |> map_filter (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);

    (*alien thread -- refrain from contending for resources*)
    fun passive_join () = (*requires SYNCHRONIZED*)
      (case unfinished () of [] => ()
      | _ => (wait "join"; passive_join ()));

    (*proper worker thread -- next piece of work towards results;
      NONE means that everything is finished*)
    fun active_next () = (*requires SYNCHRONIZED*)
      (case unfinished () of [] => NONE
      | tasks =>
          (case change_result queue (TaskQueue.dequeue_towards tasks) of
            NONE => (worker_wait "join"; active_next ())
          | some => some));

    (*must not hold the global lock: execute synchronizes on its own*)
    fun active_join () =
      (case SYNCHRONIZED "join" active_next of
        NONE => ()
      | SOME work => (execute "join" work; active_join ()));

    val _ =
      (case thread_data () of
        NONE => SYNCHRONIZED "join" passive_join
      | SOME (task, _) =>
         (SYNCHRONIZED "join" (fn () =>
            change queue (TaskQueue.depend (unfinished ()) task));
          active_join ()));

    (*all result cells are filled at this point*)
    val res = xs |> map (fn Future {result = ref (SOME res), ...} => res);
  in
    (case get_first (fn Exn.Exn Interrupt => NONE | Exn.Exn e => SOME e | _ => NONE) res of
      NONE => map Exn.release res
    | SOME e => raise e)
  end;
|
247 |
||
248 |
(*Join a single future: join_all applied to the one-element list.*)
fun join future = singleton join_all future;
|
28156 | 249 |
|
28191 | 250 |
|
251 |
(* termination *) |
|
252 |
||
253 |
(*cancel: present and future group members will be interrupted eventually.
  Ensures that a scheduler thread exists, then posts a cancel request for
  the group of the given future.*)
fun cancel x =
  let val _ = scheduler_check ()
  in cancel_request (group_of x) end;
|
255 |
||
256 |
(*interrupt: adhoc signal, permissive, may get ignored.
  Applies TaskQueue.interrupt to the current queue under the global lock.*)
fun interrupt_task id =
  SYNCHRONIZED "interrupt" (fn () =>
    let val current = ! queue
    in TaskQueue.interrupt current id end);
28191 | 258 |
|
28156 | 259 |
end; |