132 def build_heap(session: String): Boolean = |
132 def build_heap(session: String): Boolean = |
133 Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) |
133 Sessions.is_pure(session) || !sessions_structure.build_graph.is_maximal(session) |
134 } |
134 } |
135 |
135 |
136 |
136 |
137 /* queue with scheduling information */ |
|
138 |
|
139 private object Queue { |
|
140 def apply(build_context: Build_Process.Context): Queue = { |
|
141 val build_graph = build_context.sessions_structure.build_graph |
|
142 val build_order = SortedSet.from(build_graph.keys)(build_context.ordering) |
|
143 new Queue(build_graph, build_order) |
|
144 } |
|
145 } |
|
146 |
|
147 private class Queue( |
|
148 build_graph: Graph[String, Sessions.Info], |
|
149 build_order: SortedSet[String] |
|
150 ) { |
|
151 def is_empty: Boolean = build_graph.is_empty |
|
152 |
|
153 def - (name: String): Queue = |
|
154 new Queue(build_graph.del_node(name), build_order - name) |
|
155 |
|
156 def dequeue(skip: String => Boolean): Option[String] = |
|
157 build_order.iterator.dropWhile(name => skip(name) || !build_graph.is_minimal(name)) |
|
158 .nextOption() |
|
159 } |
|
160 |
|
161 |
|
162 /* main */ |
137 /* main */ |
163 |
138 |
164 private def session_finished(session_name: String, process_result: Process_Result): String = |
139 private def session_finished(session_name: String, process_result: Process_Result): String = |
165 "Finished " + session_name + " (" + process_result.timing.message_resources + ")" |
140 "Finished " + session_name + " (" + process_result.timing.message_resources + ")" |
166 |
141 |
204 case "" => No_Logger |
179 case "" => No_Logger |
205 case "-" => Logger.make(progress) |
180 case "-" => Logger.make(progress) |
206 case log_file => Logger.make(Some(Path.explode(log_file))) |
181 case log_file => Logger.make(Some(Path.explode(log_file))) |
207 } |
182 } |
208 |
183 |
|
184 // global state |
209 val numa_nodes = new NUMA.Nodes(numa_shuffling) |
185 val numa_nodes = new NUMA.Nodes(numa_shuffling) |
210 |
186 var build_graph = build_context.sessions_structure.build_graph |
211 @tailrec def loop( |
187 var build_order = SortedSet.from(build_graph.keys)(build_context.ordering) |
212 pending: Queue, |
188 var running = Map.empty[String, (SHA1.Shasum, Build_Job)] |
213 running: Map[String, (SHA1.Shasum, Build_Job)], |
189 var results = Map.empty[String, Result] |
214 results: Map[String, Result] |
190 |
215 ): Map[String, Result] = { |
191 def remove_pending(name: String): Unit = { |
216 def used_node(i: Int): Boolean = |
192 build_graph = build_graph.del_node(name) |
217 running.iterator.exists( |
193 build_order = build_order - name |
218 { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) |
194 } |
219 |
195 |
220 if (pending.is_empty) results |
196 def next_pending(): Option[String] = |
221 else { |
197 build_order.iterator |
|
198 .dropWhile(name => running.isDefinedAt(name) || !build_graph.is_minimal(name)) |
|
199 .nextOption() |
|
200 |
|
201 def used_node(i: Int): Boolean = |
|
202 running.iterator.exists( |
|
203 { case (_, (_, job)) => job.numa_node.isDefined && job.numa_node.get == i }) |
|
204 |
|
205 @tailrec def loop(): Unit = { |
|
206 if (!build_graph.is_empty) { |
222 if (progress.stopped) { |
207 if (progress.stopped) { |
223 for ((_, (_, job)) <- running) job.terminate() |
208 for ((_, (_, job)) <- running) job.terminate() |
224 } |
209 } |
225 |
210 |
226 running.find({ case (_, (_, job)) => job.is_finished }) match { |
211 running.find({ case (_, (_, job)) => job.is_finished }) match { |
277 else { |
262 else { |
278 progress.echo(session_name + " FAILED") |
263 progress.echo(session_name + " FAILED") |
279 if (!process_result.interrupted) progress.echo(process_result_tail.out) |
264 if (!process_result.interrupted) progress.echo(process_result_tail.out) |
280 } |
265 } |
281 |
266 |
282 loop(pending - session_name, running - session_name, |
267 remove_pending(session_name) |
283 results + (session_name -> Result(false, output_heap, process_result_tail))) |
268 running -= session_name |
|
269 results += (session_name -> Result(false, output_heap, process_result_tail)) |
|
270 loop() |
284 //}}} |
271 //}}} |
285 case None if running.size < (max_jobs max 1) => |
272 case None if running.size < (max_jobs max 1) => |
286 //{{{ check/start next job |
273 //{{{ check/start next job |
287 pending.dequeue(running.isDefinedAt) match { |
274 next_pending() match { |
288 case Some(session_name) => |
275 case Some(session_name) => |
289 val ancestor_results = |
276 val ancestor_results = |
290 build_deps.sessions_structure.build_requirements(List(session_name)). |
277 build_deps.sessions_structure.build_requirements(List(session_name)). |
291 filterNot(_ == session_name).map(results(_)) |
278 filterNot(_ == session_name).map(results(_)) |
292 val input_heaps = |
279 val input_heaps = |
316 } |
303 } |
317 } |
304 } |
318 val all_current = current && ancestor_results.forall(_.current) |
305 val all_current = current && ancestor_results.forall(_.current) |
319 |
306 |
320 if (all_current) { |
307 if (all_current) { |
321 loop(pending - session_name, running, |
308 remove_pending(session_name) |
322 results + (session_name -> Result(true, output_heap, Process_Result.ok))) |
309 results += (session_name -> Result(true, output_heap, Process_Result.ok)) |
|
310 loop() |
323 } |
311 } |
324 else if (no_build) { |
312 else if (no_build) { |
325 progress.echo_if(verbose, "Skipping " + session_name + " ...") |
313 progress.echo_if(verbose, "Skipping " + session_name + " ...") |
326 loop(pending - session_name, running, |
314 remove_pending(session_name) |
327 results + (session_name -> Result(false, output_heap, Process_Result.error))) |
315 results += (session_name -> Result(false, output_heap, Process_Result.error)) |
|
316 loop() |
328 } |
317 } |
329 else if (ancestor_results.forall(_.ok) && !progress.stopped) { |
318 else if (ancestor_results.forall(_.ok) && !progress.stopped) { |
330 progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") |
319 progress.echo((if (do_store) "Building " else "Running ") + session_name + " ...") |
331 |
320 |
332 store.clean_output(session_name) |
321 store.clean_output(session_name) |
340 |
329 |
341 val numa_node = numa_nodes.next(used_node) |
330 val numa_node = numa_nodes.next(used_node) |
342 val job = |
331 val job = |
343 new Build_Job(progress, session_background, store, do_store, |
332 new Build_Job(progress, session_background, store, do_store, |
344 resources, session_setup, numa_node) |
333 resources, session_setup, numa_node) |
345 loop(pending, running + (session_name -> (input_heaps, job)), results) |
334 running += (session_name -> (input_heaps, job)) |
|
335 loop() |
346 } |
336 } |
347 else { |
337 else { |
348 progress.echo(session_name + " CANCELLED") |
338 progress.echo(session_name + " CANCELLED") |
349 loop(pending - session_name, running, |
339 remove_pending(session_name) |
350 results + (session_name -> Result(false, output_heap, Process_Result.undefined))) |
340 results += (session_name -> Result(false, output_heap, Process_Result.undefined)) |
351 } |
341 loop() |
352 case None => sleep(); loop(pending, running, results) |
342 } |
|
343 case None => sleep(); loop() |
353 } |
344 } |
354 ///}}} |
345 ///}}} |
355 case None => sleep(); loop(pending, running, results) |
346 case None => sleep(); loop() |
356 } |
347 } |
357 } |
348 } |
358 } |
349 } |
359 |
350 |
360 loop(Queue(build_context), Map.empty, Map.empty) |
351 loop() |
|
352 results |
361 } |
353 } |
362 } |
354 } |