(* Local synchronization state: one mutex `lock` and one condition variable `cond`,
   referenced by SYNCHRONIZED, wait and notify_all.
   NOTE(review): each line occurs twice with different leading numbers; this looks like
   two revisions of the file rendered side by side. Confirm against the real file. *)
77 local |
80 local |
78 val lock = Mutex.mutex (); |
81 val lock = Mutex.mutex (); |
79 val cond = ConditionVar.conditionVar (); |
82 val cond = ConditionVar.conditionVar (); |
80 in |
83 in |
81 |
84 |
(* SYNCHRONIZED: run the thunk `e` while holding `lock`.
   Steps: lock the mutex, evaluate `restore_attributes e ()` with the outcome captured by
   Exn.capture, unlock the mutex, then hand the captured outcome to Exn.release. The unlock
   therefore happens before the outcome is released (presumably Exn.release re-raises a
   captured exception; confirm).
   The variant taking `name` additionally emits level-4 tracing messages
   (name ^ ": locking" / ": locked" / ": unlocked") around the mutex operations.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
82 fun SYNCHRONIZED e = uninterruptible (fn restore_attributes => fn () => |
85 fun SYNCHRONIZED name e = uninterruptible (fn restore_attributes => fn () => |
83 let |
86 let |

87 val _ = Multithreading.tracing 4 (fn () => name ^ ": locking"); |
84 val _ = Mutex.lock lock; |
88 val _ = Mutex.lock lock; |

89 val _ = Multithreading.tracing 4 (fn () => name ^ ": locked"); |
85 val result = Exn.capture (restore_attributes e) (); |
90 val result = Exn.capture (restore_attributes e) (); |
86 val _ = Mutex.unlock lock; |
91 val _ = Mutex.unlock lock; |

92 val _ = Multithreading.tracing 4 (fn () => name ^ ": unlocked"); |
87 in Exn.release result end) (); |
93 in Exn.release result end) (); |
88 |
94 |
(* wait: block on `cond` together with `lock` via ConditionVar.wait, tracing at level 4
   before ("waiting") and after ("notified") the wait. `name` is used only as the prefix of
   the trace messages. Marked as requiring SYNCHRONIZED by the inline comment.
   The two variants differ only in the spacing of the message text (" : " versus ": "). *)
89 fun wait name = (*requires SYNCHRONIZED*) |
95 fun wait name = (*requires SYNCHRONIZED*) |
90 let |
96 let |
91 val _ = Multithreading.tracing 4 (fn () => name ^ " : waiting"); |
97 val _ = Multithreading.tracing 4 (fn () => name ^ ": waiting"); |
92 val _ = ConditionVar.wait (cond, lock); |
98 val _ = ConditionVar.wait (cond, lock); |
93 val _ = Multithreading.tracing 4 (fn () => name ^ " : notified"); |
99 val _ = Multithreading.tracing 4 (fn () => name ^ ": notified"); |
94 in () end; |
100 in () end; |
95 |
101 |
(* notify_all: broadcast on the shared condition variable `cond`, i.e. signal every thread
   currently blocked in `wait`. Marked as requiring SYNCHRONIZED by the inline comment. *)
96 fun notify_all () = (*requires SYNCHRONIZED*) |
102 fun notify_all () = (*requires SYNCHRONIZED*) |
97 ConditionVar.broadcast cond; |
103 ConditionVar.broadcast cond; |
98 |
104 |
106 val _ = set_thread_data (SOME (task, group)); |
112 val _ = set_thread_data (SOME (task, group)); |
107 val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
113 val _ = Multithreading.tracing 4 (fn () => name ^ ": running"); |
108 val ok = run (); |
114 val ok = run (); |
109 val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
115 val _ = Multithreading.tracing 4 (fn () => name ^ ": finished"); |
110 val _ = set_thread_data NONE; |
116 val _ = set_thread_data NONE; |
111 val _ = SYNCHRONIZED (fn () => |
117 val _ = SYNCHRONIZED "execute" (fn () => |
112 (change queue (TaskQueue.finish task); |
118 (change queue (TaskQueue.finish task); |
113 if ok then () |
119 if ok then () |
114 else if TaskQueue.cancel (! queue) group then () |
120 else if TaskQueue.cancel (! queue) group then () |
115 else cancel_request group; |
121 else cancel_request group; |
116 notify_all ())); |
122 notify_all ())); |
117 in () end; |
123 in () end; |
118 |
124 |
119 |
125 |
120 (* worker threads *) |
126 (* worker threads *) |
121 |
127 |
(* change_active: record whether the calling worker counts as active, then call trace_active.
   Variant with parameter `b`: adds 1 to the `active` counter when b is true, subtracts 1
   otherwise.
   Variant with parameter `active`: stores the flag for the calling thread (Thread.self ())
   in the `workers` association list via AList.update keyed by Thread.equal.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
122 fun change_active b = (*requires SYNCHRONIZED*) |
128 fun change_active active = (*requires SYNCHRONIZED*) |
123 (change active (fn n => if b then n + 1 else n - 1); trace_active ()); |
129 (change workers (AList.update Thread.equal (Thread.self (), active)); trace_active ()); |
124 |
130 |
(* worker_wait: mark the calling worker inactive, block in `wait name`, and mark it active
   again once the wait returns. Marked as requiring SYNCHRONIZED by the inline comment. *)
125 fun worker_wait name = (*requires SYNCHRONIZED*) |
131 fun worker_wait name = (*requires SYNCHRONIZED*) |
126 (change_active false; wait name; change_active true); |
132 (change_active false; wait name; change_active true); |
127 |
133 |
(* worker_next: decide what the calling worker does next.
   - If `excessive` is positive: decrement it, remove the calling thread from `workers`,
     and return NONE (worker_loop treats NONE as the signal to stop).
   - Otherwise apply TaskQueue.dequeue to `queue`; when that yields NONE, call worker_wait
     and retry recursively; any other result is returned unchanged.
   The variants differ in how the thread is removed: one calls `change_active false` and
   `remove Thread.equal`, the other uses `filter_out` on (thread, _) pairs.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
128 fun worker_next name = (*requires SYNCHRONIZED*) |
134 fun worker_next name = (*requires SYNCHRONIZED*) |
129 if ! excessive > 0 then |
135 if ! excessive > 0 then |
130 (dec excessive; |
136 (dec excessive; |
131 change_active false; |
137 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ()))); |
132 change workers (remove Thread.equal (Thread.self ())); |

133 NONE) |
138 NONE) |
134 else |
139 else |
135 (case change_result queue TaskQueue.dequeue of |
140 (case change_result queue TaskQueue.dequeue of |
136 NONE => (worker_wait name; worker_next name) |
141 NONE => (worker_wait name; worker_next name) |
137 | some => some); |
142 | some => some); |
138 |
143 |
(* worker_loop: main loop of a worker thread. Calls worker_next inside SYNCHRONIZED; NONE
   ends the loop, SOME work is passed to `execute name work` and the loop repeats.
   Note that `execute` is called outside the SYNCHRONIZED section.
   One variant passes `name` as the first argument of SYNCHRONIZED. *)
139 fun worker_loop name = |
144 fun worker_loop name = |
140 (case SYNCHRONIZED (fn () => worker_next name) of |
145 (case SYNCHRONIZED name (fn () => worker_next name) of |
141 NONE => () |
146 NONE => () |
142 | SOME work => (execute name work; worker_loop name)); |
147 | SOME work => (execute name work; worker_loop name)); |
143 |
148 |
(* worker_start: fork a new thread running `worker_loop name` (forked with
   Multithreading.no_interrupts) and prepend it to `workers`.
   One variant first calls `change_active true` and stores the bare thread; the other
   stores the pair (thread, true) and does not call change_active.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
144 fun worker_start name = (*requires SYNCHRONIZED*) |
149 fun worker_start name = (*requires SYNCHRONIZED*) |
145 (change_active true; |
150 change workers |
146 change workers (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts)))); |
151 (cons (Thread.fork (fn () => worker_loop name, Multithreading.no_interrupts), true)); |
147 |
152 |
148 |
153 |
149 (* scheduler *) |
154 (* scheduler *) |
150 |
155 |
(* scheduler_fork: one maintenance pass over the worker pool, run inside SYNCHRONIZED.
   1. Partition `workers` by Thread.isActive; if any entry fails the test, keep only the
      passing ones and trace how many were disposed (level 0).
   2. m = target worker count: 0 when `shutdown`, else Multithreading.max_threads_value ().
      l = current number of workers. `excessive` is set to l - m.
   3. Start workers named "worker i" for i in (l upto m - 1).
   Result: one variant returns `null (! workers)`; the other first calls notify_all when
   `shutdown` is set and returns `shutdown andalso null (! workers)`.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
151 fun scheduler_fork shutdown = SYNCHRONIZED (fn () => |
156 fun scheduler_fork shutdown = SYNCHRONIZED "scheduler_fork" (fn () => |
152 let |
157 let |
153 val _ = trace_active (); |
158 val _ = trace_active (); |
154 val _ = |
159 val _ = |
155 (case List.partition Thread.isActive (! workers) of |
160 (case List.partition (Thread.isActive o #1) (! workers) of |
156 (_, []) => () |
161 (_, []) => () |
157 | (active, inactive) => |
162 | (active, inactive) => |
158 (workers := active; Multithreading.tracing 0 (fn () => |
163 (workers := active; Multithreading.tracing 0 (fn () => |
159 "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " inactive worker threads"))); |
164 "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads"))); |
160 |
165 |
161 val m = if shutdown then 0 else Multithreading.max_threads_value (); |
166 val m = if shutdown then 0 else Multithreading.max_threads_value (); |
162 val l = length (! workers); |
167 val l = length (! workers); |
163 val _ = excessive := l - m; |
168 val _ = excessive := l - m; |
164 val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1); |
169 val _ = List.app (fn i => worker_start ("worker " ^ string_of_int i)) (l upto m - 1); |
165 in null (! workers) end); |
170 val _ = if shutdown then notify_all () else (); |

171 in shutdown andalso null (! workers) end); |
166 |
172 |
(* scheduler_loop: main loop of the scheduler thread.
   Each iteration calls `scheduler_fork shutdown`; a true result ends the loop. Otherwise:
   - canceled' = the groups in `canceled` for which `TaskQueue.cancel (! queue)` returns
     false (computed inside SYNCHRONIZED), so those groups are retried next iteration;
   - wait up to Time.fromSeconds 1 for a message on `requests`:
       Shutdown     => continue with shutdown = true,
       Cancel group => continue with `group` added to canceled',
       timeout      => continue unchanged.
   NOTE(review): the paired lines look like two revisions shown side by side. *)
167 fun scheduler_loop (shutdown, canceled) = |
173 fun scheduler_loop (shutdown, canceled) = |
168 if scheduler_fork shutdown then () |
174 if scheduler_fork shutdown then () |
169 else |
175 else |
170 let val canceled' = SYNCHRONIZED (fn () => filter_out (TaskQueue.cancel (! queue)) canceled) in |
176 let |

177 val canceled' = SYNCHRONIZED "scheduler" |

178 (fn () => filter_out (TaskQueue.cancel (! queue)) canceled); |

179 in |
171 (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of |
180 (case Mailbox.receive_timeout (Time.fromSeconds 1) requests of |
172 SOME Shutdown => scheduler_loop (true, canceled') |
181 SOME Shutdown => scheduler_loop (true, canceled') |
173 | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled') |
182 | SOME (Cancel group) => scheduler_loop (shutdown, group :: canceled') |
174 | NONE => scheduler_loop (shutdown, canceled')) |
183 | NONE => scheduler_loop (shutdown, canceled')) |
175 end; |
184 end; |
176 |
185 |
(* scheduler_check: ensure a scheduler thread exists, run inside SYNCHRONIZED.
   If `scheduler` holds SOME thread for which Thread.isActive is true, do nothing.
   Otherwise fork a thread running `scheduler_loop (false, [])` (forked with
   Multithreading.no_interrupts) and store it in `scheduler`. *)
177 fun scheduler_check () = SYNCHRONIZED (fn () => |
186 fun scheduler_check () = SYNCHRONIZED "scheduler_check" (fn () => |
178 if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () |
187 if (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread) then () |
179 else scheduler := |
188 else scheduler := |
180 SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))); |
189 SOME (Thread.fork (fn () => scheduler_loop (false, []), Multithreading.no_interrupts))); |
181 |
190 |
182 |
191 |