author | wenzelm |
Tue, 09 Sep 2008 16:29:34 +0200 | |
changeset 28177 | 8c0335bc9336 |
parent 28170 | a18cf8a0e656 |
child 28186 | 6a8417f36837 |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Functional threads as future values. |
|
6 |
*) |
|
7 |
||
8 |
signature FUTURE =
sig
  (*identifiers, shared with TaskQueue*)
  type task = TaskQueue.task
  type group = TaskQueue.group

  (*future values and their accessors*)
  type 'a T
  val task_of: 'a T -> task
  val group_of: 'a T -> group

  (*creation and evaluation*)
  val future: bool -> task list -> (unit -> 'a) -> 'a T
  val fork: (unit -> 'a) -> 'a T
  val join: 'a T -> 'a

  (*requests sent to the scheduler*)
  val shutdown_request: unit -> unit
  val cancel: 'a T -> unit
end;
21 |
||
22 |
structure Future: FUTURE = |
|
23 |
struct |
|
24 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
25 |
(** future values **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
26 |
|
28167 | 27 |
(* identifiers *) |
28 |
||
29 |
type task = TaskQueue.task;
type group = TaskQueue.group;

(*thread-local slot holding the (task, group) pair of the current thread, if any*)
local
  val tag = Universal.tag () : (task * group) option Universal.tag;
in
  (*NONE when the slot was never set for this thread or was reset to NONE*)
  fun thread_data () =
    (case Thread.getLocal tag of
      SOME data => data
    | NONE => NONE);

  fun set_thread_data data = Thread.setLocal (tag, data);
end;
36 |
||
37 |
||
38 |
(* datatype future *) |
|
39 |
||
40 |
(*a future: its task and group identifiers plus a mutable cell for the
  eventual result (NONE until the result has been stored)*)
datatype 'a T = Future of
 {task: task,
  group: group,
  result: 'a Exn.result option ref};

fun task_of (Future args) = #task args;
fun group_of (Future args) = #group args;
|
47 |
||
48 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
49 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
50 |
(** scheduling **) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
51 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
52 |
(* global state *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
53 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
54 |
(*task queue, worker threads and scheduler thread*)
val queue = ref TaskQueue.empty;
val workers: Thread.thread list ref = ref [];
val scheduler: Thread.thread option ref = ref NONE;

(*excessive: number of workers that should still terminate (see worker_next);
  active: number of workers currently counted as active (see change_active)*)
val excessive = ref 0;
val active = ref 0;
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
60 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
61 |
(*emit a level-1 trace message reporting the current value of the active counter*)
fun trace_active () =
  let fun message () = "SCHEDULE: " ^ string_of_int (! active) ^ " active"
  in Multithreading.tracing 1 message end;
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
63 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
64 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
65 |
(* requests *) |
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
66 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
67 |
(*messages for the scheduler thread, delivered via the requests mailbox*)
datatype request = Shutdown | Cancel of group;
val requests: request Mailbox.T = Mailbox.create ();

fun shutdown_request () = Mailbox.send requests Shutdown;
fun cancel_request group = Mailbox.send requests (Cancel group);

(*request cancellation of the whole group that the given future belongs to*)
fun cancel (Future {group, ...}) = cancel_request group;
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
73 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
74 |
|
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
75 |
(* synchronization *) |
28156 | 76 |
|
77 |
local
  val lock = Mutex.mutex ();
  val cond = ConditionVar.conditionVar ();
in

(*run e () while holding the global lock; the lock is released again whether
  e returns or raises, and the captured outcome is re-produced afterwards*)
fun SYNCHRONIZED e =
  let
    fun critical restore_attributes () =
     (Mutex.lock lock;
      Exn.release (Exn.capture (restore_attributes e) () before Mutex.unlock lock));
  in uninterruptible critical () end;

(*block on the shared condition variable, tracing before and after*)
fun wait name = (*requires SYNCHRONIZED*)
 (Multithreading.tracing 4 (fn () => name ^ " : waiting");
  ConditionVar.wait (cond, lock);
  Multithreading.tracing 4 (fn () => name ^ " : notified");
  ());

(*wake up every thread blocked in wait*)
fun notify_all () = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;
|
100 |
||
101 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
102 |
(* execute *) |
28156 | 103 |
|
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
104 |
(*cancel a group in the queue and interrupt the threads reported as running;
  true iff no running threads were reported*)
fun cancel_group group = (*requires SYNCHRONIZED*)
  let
    (*a failing interrupt (exception Thread) is deliberately ignored*)
    fun interrupt thread = Thread.interrupt thread handle Thread _ => ();
    val running = change_result queue (TaskQueue.cancel group);
  in
    if null running then true
    else (List.app interrupt running; false)
  end;
28156 | 108 |
|
28167 | 109 |
(*run one piece of work on the current thread: publish (task, group) as thread
  data while it runs, then mark the task finished in the queue; if the run
  reports failure, the group is canceled -- immediately when possible, otherwise
  via a request to the scheduler*)
fun execute name (task, group, run) =
  let
    fun trace msg = Multithreading.tracing 4 (fn () => name ^ msg);

    fun finish ok () = (*within SYNCHRONIZED*)
     (change queue (TaskQueue.finish task);
      if ok orelse cancel_group group then () else cancel_request group;
      notify_all ());

    val _ = set_thread_data (SOME (task, group));
    val _ = trace ": running";
    val ok = run ();
  in
    trace ": finished";
    set_thread_data NONE;
    SYNCHRONIZED (finish ok);
    ()
  end;
121 |
||
122 |
||
123 |
(* worker threads *) |
|
124 |
||
28162 | 125 |
(*increment (b = true) or decrement (b = false) the active counter, then trace it*)
fun change_active b = (*requires SYNCHRONIZED*)
  let val delta = if b then 1 else ~1
  in change active (fn n => n + delta); trace_active () end;
28162 | 127 |
|
28167 | 128 |
(*next piece of work for the calling worker thread; NONE tells the worker to
  terminate, which happens only while excessive workers remain*)
fun worker_next name = (*requires SYNCHRONIZED*)
  let
    val self = Thread.self ();

    (*give up this worker: adjust counters and drop it from the worker list*)
    fun retire () =
     (dec excessive;
      change_active false;
      change workers (remove Thread.equal self);
      NONE);

    (*nothing to dequeue: count as inactive while waiting, then try again*)
    fun idle () =
     (change_active false;
      wait name;
      change_active true;
      worker_next name);
  in
    if ! excessive <= 0 then
      (case change_result queue (TaskQueue.dequeue self) of
        NONE => idle ()
      | work => work)
    else retire ()
  end;
28156 | 138 |
|
28167 | 139 |
(*main loop of a worker thread: fetch work under the lock, execute it outside
  the lock, repeat until worker_next yields NONE*)
fun worker_loop name =
  let fun next () = worker_next name in
    (case SYNCHRONIZED next of
      SOME work => (execute name work; worker_loop name)
    | NONE => ())
  end;
28156 | 143 |
|
28167 | 144 |
(*count a new worker as active, fork its thread and register it in workers*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val _ = change_active true;
    val thread = Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts);
  in change workers (fn threads => thread :: threads) end;
28156 | 147 |
|
148 |
||
149 |
(* scheduler *) |
|
150 |
||
28167 | 151 |
(*adapt the worker pool to the current thread limit: record the surplus in
  excessive and start the missing workers, named by their index*)
fun scheduler_fork () =
  let
    fun adjust () = (*within SYNCHRONIZED*)
      let
        val _ = trace_active ();
        val limit = Multithreading.max_threads_value ();
        val current = length (! workers);
        fun start i = worker_start ("worker " ^ string_of_int i);
      in
        excessive := current - limit;
        List.app start (current upto limit - 1)
      end;
  in SYNCHRONIZED adjust end;
|
158 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
159 |
(*main loop of the scheduler thread: retry cancellation of the given groups,
  adjust the worker pool, then wait up to one second for the next request*)
fun scheduler_loop canceled =
  let
    (*groups for which cancel_group returned false stay pending*)
    val pending = SYNCHRONIZED (fn () => filter_out cancel_group canceled);
    val _ = scheduler_fork ();
    val timeout = Time.fromSeconds 1;
  in
    (case Mailbox.receive_timeout timeout requests of
      NONE => scheduler_loop pending
    | SOME (Cancel group) => scheduler_loop (group :: pending)
    | SOME Shutdown => ())  (* FIXME proper worker shutdown *)
  end;
28156 | 169 |
|
170 |
(*make sure a scheduler thread exists: fork a fresh one unless the recorded
  thread is still active*)
fun check_scheduler () =
  let
    fun start () =
      scheduler := SOME (Thread.fork (fn () => scheduler_loop [], Multithreading.no_interrupts));

    fun ensure () = (*within SYNCHRONIZED*)
      (case ! scheduler of
        SOME thread => if Thread.isActive thread then () else start ()
      | NONE => start ());
  in SYNCHRONIZED ensure end;
28156 | 173 |
|
174 |
||
175 |
(* future values *) |
|
176 |
||
28177
8c0335bc9336
inherit group from running thread, or create a new one -- make it harder to re-use canceled groups;
wenzelm
parents:
28170
diff
changeset
|
177 |
(*create a future for e and enqueue it with dependencies deps; the group is
  inherited from the thread data of the calling thread, unless new_group is
  set or no thread data is present, in which case a new group is created*)
fun future new_group deps (e: unit -> 'a) =
  let
    val _ = check_scheduler ();

    val group =
      if new_group then TaskQueue.new_group ()
      else
        (case thread_data () of
          SOME (_, inherited) => inherited
        | NONE => TaskQueue.new_group ());

    val result = ref (NONE: 'a Exn.result option);

    (*store the outcome of e, or Interrupt if not ok; true iff a proper result*)
    fun body _ ok =
      let val res = if ok then Exn.capture e () else Exn.Exn Interrupt
      in result := SOME res; is_some (Exn.get_result res) end;

    (*thread attributes are read here, when the future is created*)
    val run = Multithreading.with_attributes (Thread.getAttributes ()) body;

    fun enqueue () = (*within SYNCHRONIZED*)
      let val task = change_result queue (TaskQueue.enqueue group deps run)
      in notify_all (); task end;

    val task = SYNCHRONIZED enqueue;
  in Future {task = task, group = group, result = result} end;

(*future without dependencies, inheriting the group where possible*)
fun fork e = future false [] e;
28162 | 197 |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
198 |
(*wait passively until the result cell is filled, then return its value or
  re-raise its exception*)
fun join (Future {result = cell, ...}) =
  let
    val _ = check_scheduler ();

    fun await () = (*within SYNCHRONIZED*)
      (case ! cell of
        SOME res => res
      | NONE => (wait "join"; await ()));
  in Exn.release (SYNCHRONIZED await) end;
28156 | 207 |
|
208 |
end; |