author | wenzelm |
Mon, 08 Sep 2008 20:35:38 +0200 | |
changeset 28170 | a18cf8a0e656 |
parent 28167 | 27e2ca41b58c |
child 28177 | 8c0335bc9336 |
permissions | -rw-r--r-- |
28156 | 1 |
(* Title: Pure/Concurrent/future.ML |
2 |
ID: $Id$ |
|
3 |
Author: Makarius |
|
4 |
||
5 |
Functional threads as future values. |
|
6 |
*) |
|
7 |
||
8 |
signature FUTURE = |
|
9 |
sig |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
10 |
type task = TaskQueue.task |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
11 |
type group = TaskQueue.group |
28156 | 12 |
type 'a T |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
13 |
val task_of: 'a T -> task |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
14 |
val group_of: 'a T -> group option |
28167 | 15 |
val interrupt_task: task -> unit |
16 |
val interrupt_group: group -> unit |
|
17 |
val interrupt_task_group: task -> unit |
|
18 |
val interrupt: 'a T -> unit |
|
19 |
val shutdown: unit -> unit |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
20 |
val future: group option -> task list -> (unit -> 'a) -> 'a T |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
21 |
val fork: (unit -> 'a) -> 'a T |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
22 |
val join: 'a T -> 'a |
28156 | 23 |
end; |
24 |
||
25 |
structure Future: FUTURE = |
|
26 |
struct |
|
27 |
||
28167 | 28 |
(* identifiers *) |
29 |
||
30 |
type task = TaskQueue.task; |
|
31 |
type group = TaskQueue.group; |
|
32 |
||
33 |
local val tag = Universal.tag () : task option Universal.tag in |
|
34 |
fun get_task () = the_default NONE (Thread.getLocal tag); |
|
35 |
fun set_task x = Thread.setLocal (tag, x); |
|
36 |
end; |
|
37 |
||
38 |
local val tag = Universal.tag () : group option Universal.tag in |
|
39 |
fun get_group () = the_default NONE (Thread.getLocal tag); |
|
40 |
fun set_group x = Thread.setLocal (tag, x); |
|
41 |
end; |
|
42 |
||
43 |
||
44 |
(* datatype future *) |
|
45 |
||
46 |
datatype 'a T = Future of |
|
47 |
{task: task, |
|
48 |
group: group option, |
|
49 |
result: 'a Exn.result option ref}; |
|
50 |
||
51 |
fun task_of (Future {task, ...}) = task; |
|
52 |
fun group_of (Future {group, ...}) = group; |
|
53 |
||
54 |
||
28156 | 55 |
(* synchronized execution *) |
56 |
||
57 |
local |
|
58 |
val lock = Mutex.mutex (); |
|
59 |
val cond = ConditionVar.conditionVar (); |
|
60 |
in |
|
61 |
||
28162 | 62 |
fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () => |
63 |
let |
|
64 |
val _ = Mutex.lock lock; |
|
65 |
val result = Exn.capture (restore_attributes e) (); |
|
66 |
val _ = Mutex.unlock lock; |
|
67 |
in Exn.release result end) (); |
|
28156 | 68 |
|
28167 | 69 |
fun wait name = (*requires SYNCHRONIZED*) |
70 |
let |
|
71 |
val _ = Multithreading.tracing 4 (fn () => name ^ " : waiting"); |
|
72 |
val _ = ConditionVar.wait (cond, lock); |
|
73 |
val _ = Multithreading.tracing 4 (fn () => name ^ " : notified"); |
|
74 |
in () end; |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
75 |
|
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
76 |
fun notify_all () = (*requires SYNCHRONIZED*) |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
77 |
ConditionVar.broadcast cond; |
28156 | 78 |
|
79 |
end; |
|
80 |
||
81 |
||
28167 | 82 |
(** scheduling **) |
28156 | 83 |
|
28167 | 84 |
datatype request = Shutdown | CancelGroup of group; |
85 |
val requests = Mailbox.create () : request Mailbox.T; |
|
28156 | 86 |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
87 |
val queue = ref TaskQueue.empty; |
28156 | 88 |
val scheduler = ref (NONE: Thread.thread option); |
89 |
val workers = ref ([]: Thread.thread list); |
|
90 |
||
91 |
||
28167 | 92 |
(* signals *) |
93 |
||
94 |
fun interrupt_task x = SYNCHRONIZED (fn () => TaskQueue.interrupt_task (! queue) x); |
|
95 |
fun interrupt_group x = SYNCHRONIZED (fn () => TaskQueue.interrupt_group (! queue) x); |
|
96 |
fun interrupt_task_group x = SYNCHRONIZED (fn () => TaskQueue.interrupt_task_group (! queue) x); |
|
97 |
||
98 |
fun interrupt (Future {task, ...}) = interrupt_task_group task; |
|
99 |
||
28170 | 100 |
fun shutdown () = Mailbox.send requests Shutdown; |
28167 | 101 |
|
102 |
||
103 |
(* execute *) |
|
28156 | 104 |
|
28167 | 105 |
fun execute name (task, group, run) = |
106 |
let |
|
107 |
val _ = set_task (SOME task); |
|
108 |
val _ = set_group group; |
|
109 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
|
110 |
val ok = run (); |
|
111 |
val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
|
112 |
val _ = set_task NONE; |
|
113 |
val _ = set_group NONE; |
|
114 |
val _ = SYNCHRONIZED (fn () => (change queue (TaskQueue.finished task); notify_all ())); |
|
28170 | 115 |
val _ = (case (ok, group) of (false, SOME g) => Mailbox.send requests (CancelGroup g) | _ => ()); |
28167 | 116 |
in () end; |
117 |
||
118 |
||
119 |
(* worker threads *) |
|
120 |
||
121 |
val excessive = ref 0; |
|
122 |
val active = ref 0; |
|
28162 | 123 |
|
124 |
fun change_active b = (*requires SYNCHRONIZED*) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
125 |
(change active (fn n => if b then n + 1 else n - 1); |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
126 |
Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ string_of_int (! active) ^ " active")); |
28162 | 127 |
|
28167 | 128 |
fun worker_next name = (*requires SYNCHRONIZED*) |
129 |
if ! excessive > 0 then |
|
130 |
(dec excessive; |
|
131 |
change_active false; |
|
132 |
change workers (remove Thread.equal (Thread.self ())); |
|
133 |
NONE) |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
134 |
else |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
135 |
(case change_result queue (TaskQueue.dequeue (Thread.self ())) of |
28167 | 136 |
NONE => (change_active false; wait name; change_active true; worker_next name) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
137 |
| some => some); |
28156 | 138 |
|
28167 | 139 |
fun worker_loop name = |
140 |
(case SYNCHRONIZED (fn () => worker_next name) of |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
141 |
NONE => () |
28167 | 142 |
| SOME work => (execute name work; worker_loop name)); |
28156 | 143 |
|
28167 | 144 |
fun worker_start name = (*requires SYNCHRONIZED*) |
28156 | 145 |
(change_active true; |
28167 | 146 |
change workers (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts)))); |
28156 | 147 |
|
148 |
||
149 |
(* scheduler *) |
|
150 |
||
28167 | 151 |
fun scheduler_fork () = SYNCHRONIZED (fn () => |
28156 | 152 |
let |
153 |
val m = Multithreading.max_threads_value (); |
|
28167 | 154 |
val l = length (! workers); |
155 |
val _ = excessive := l - m; |
|
156 |
in List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1) end); |
|
157 |
||
158 |
fun scheduler_loop () = |
|
159 |
(scheduler_fork (); |
|
160 |
(case Mailbox.receive_timeout (Time.fromMilliseconds 300) requests of |
|
161 |
SOME Shutdown => () (* FIXME *) |
|
162 |
| SOME (CancelGroup group) => (interrupt_group group; scheduler_loop ()) (* FIXME *) |
|
163 |
| NONE => scheduler_loop ())); |
|
28156 | 164 |
|
165 |
fun check_scheduler () = SYNCHRONIZED (fn () => |
|
28167 | 166 |
if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () |
167 |
else scheduler := SOME (Thread.fork (scheduler_loop, Multithreading.no_interrupts))); |
|
28156 | 168 |
|
169 |
||
170 |
(* future values *) |
|
171 |
||
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
172 |
fun future group deps (e: unit -> 'a) = |
28156 | 173 |
let |
174 |
val _ = check_scheduler (); |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
175 |
val result = ref (NONE: 'a Exn.result option); |
28167 | 176 |
val run = Multithreading.with_attributes (Thread.getAttributes ()) (fn _ => fn () => |
177 |
let val res = Exn.capture e () in result := SOME res; is_some (Exn.get_result res) end); |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
178 |
val task = SYNCHRONIZED (fn () => |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
179 |
change_result queue (TaskQueue.enqueue group deps run) before notify_all ()); |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
180 |
in Future {task = task, group = group, result = result} end; |
28162 | 181 |
|
28167 | 182 |
fun fork e = future (get_group ()) [] e; |
28162 | 183 |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
184 |
fun join (Future {result, ...}) = |
28156 | 185 |
let |
186 |
val _ = check_scheduler (); |
|
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
187 |
fun loop () = |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
188 |
(case ! result of |
28167 | 189 |
NONE => (wait "join"; loop ()) |
28166
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
190 |
| SOME res => res); |
43087721a66e
moved task, thread_data, group, queue to task_queue.ML;
wenzelm
parents:
28163
diff
changeset
|
191 |
in Exn.release (SYNCHRONIZED loop) end; |
28156 | 192 |
|
193 |
end; |