304 } |
304 } |
305 |
305 |
306 def finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty |
306 def finished: Boolean = build_state.pending.isEmpty && build_state.running.isEmpty |
307 } |
307 } |
308 |
308 |
309 abstract class Scheduler { |
309 abstract class Scheduler(timing_data: Timing_Data) { |
310 def ready_jobs(state: Build_Process.State): Build_Process.State.Pending = |
310 def ready_jobs(state: Build_Process.State): Build_Process.State.Pending = |
311 state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name)) |
311 state.pending.filter(entry => entry.is_ready && !state.is_running(entry.name)) |
312 |
312 |
313 def next(timing: Timing_Data, state: Build_Process.State): List[Config] |
313 def next(state: Build_Process.State): List[Config] |
314 |
314 |
315 def build_duration(timing_data: Timing_Data, build_state: Build_Process.State): Time = { |
315 def build_duration(build_state: Build_Process.State): Time = { |
316 @tailrec |
316 @tailrec |
317 def simulate(state: State): State = |
317 def simulate(state: State): State = |
318 if (state.finished) state |
318 if (state.finished) state |
319 else { |
319 else { |
320 val state1 = |
320 val state1 = |
321 next(timing_data, state.build_state).foldLeft(state)(_.start(_)).step(timing_data) |
321 next(state.build_state).foldLeft(state)(_.start(_)).step(timing_data) |
322 simulate(state1) |
322 simulate(state1) |
323 } |
323 } |
324 |
324 |
325 val start = Time.now() |
325 val start = Time.now() |
326 simulate(State(build_state, start)).current_time - start |
326 simulate(State(build_state, start)).current_time - start |
328 } |
328 } |
329 |
329 |
330 |
330 |
331 /* heuristics */ |
331 /* heuristics */ |
332 |
332 |
333 class Timing_Heuristic(threshold: Time) extends Scheduler { |
333 class Timing_Heuristic(threshold: Time, timing_data: Timing_Data) extends Scheduler(timing_data) { |
334 def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = { |
334 def next(state: Build_Process.State): List[Config] = { |
335 val host_infos = timing_data.host_infos |
335 val host_infos = timing_data.host_infos |
336 val resources = host_infos.available(state) |
336 val resources = host_infos.available(state) |
337 |
337 |
338 def best_threads(task: Build_Process.Task): Int = |
338 def best_threads(task: Build_Process.Task): Int = |
339 timing_data.best_threads(task.name).getOrElse( |
339 timing_data.best_threads(task.name).getOrElse( |
376 configs1 ::: configs2 |
376 configs1 ::: configs2 |
377 } |
377 } |
378 } |
378 } |
379 } |
379 } |
380 |
380 |
381 class Meta_Heuristic(schedulers: List[Scheduler]) extends Scheduler { |
381 class Meta_Heuristic(schedulers: List[Scheduler], timing_data: Timing_Data) |
|
382 extends Scheduler(timing_data) { |
382 require(schedulers.nonEmpty) |
383 require(schedulers.nonEmpty) |
383 |
384 |
384 def next(timing_data: Timing_Data, state: Build_Process.State): List[Config] = { |
385 def next(state: Build_Process.State): List[Config] = { |
385 val (best, _) = schedulers.map(h => h -> h.build_duration(timing_data, state)).minBy(_._2.ms) |
386 val (best, _) = schedulers.map(h => h -> h.build_duration(state)).minBy(_._2.ms) |
386 best.next(timing_data, state) |
387 best.next(state) |
387 } |
388 } |
388 } |
389 } |
389 |
390 |
390 |
391 |
391 /* process for scheduled build */ |
392 /* process for scheduled build */ |
392 |
393 |
393 class Scheduled_Build_Process( |
394 abstract class Scheduled_Build_Process( |
394 scheduler: Scheduler, |
|
395 build_context: Build.Context, |
395 build_context: Build.Context, |
396 build_progress: Progress, |
396 build_progress: Progress, |
397 server: SSH.Server, |
397 server: SSH.Server, |
398 ) extends Build_Process(build_context, build_progress, server) { |
398 ) extends Build_Process(build_context, build_progress, server) { |
399 protected val start_date = Date.now() |
399 protected val start_date = Date.now() |
400 |
400 |
|
401 def init_scheduler(timing_data: Timing_Data): Scheduler |
401 |
402 |
402 /* global resources with common close() operation */ |
403 /* global resources with common close() operation */ |
403 |
404 |
404 private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options) |
405 private final lazy val _log_store: Build_Log.Store = Build_Log.store(build_options) |
405 private final lazy val _log_database: SQL.Database = |
406 private final lazy val _log_database: SQL.Database = |
450 build_info = _log_store.read_build_info(_log_database, log_name) |
451 build_info = _log_store.read_build_info(_log_database, log_name) |
451 } yield (meta_info, build_info) |
452 } yield (meta_info, build_info) |
452 |
453 |
453 Timing_Data.make(host_infos, build_history) |
454 Timing_Data.make(host_infos, build_history) |
454 } |
455 } |
|
456 private val scheduler = init_scheduler(timing_data) |
455 |
457 |
456 def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = { |
458 def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = { |
457 val sessions = |
459 val sessions = |
458 for { |
460 for { |
459 (session_name, result) <- state.toList |
461 (session_name, result) <- state.toList |
513 private var cache = Cache(Build_Process.State(), Nil, Date.now()) |
515 private var cache = Cache(Build_Process.State(), Nil, Date.now()) |
514 |
516 |
515 override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = { |
517 override def next_node_info(state: Build_Process.State, session_name: String): Node_Info = { |
516 val configs = |
518 val configs = |
517 if (cache.is_current(state)) cache.configs |
519 if (cache.is_current(state)) cache.configs |
518 else scheduler.next(timing_data, state) |
520 else scheduler.next(state) |
519 configs.find(_.job_name == session_name).get.node_info |
521 configs.find(_.job_name == session_name).get.node_info |
520 } |
522 } |
521 |
523 |
522 override def next_jobs(state: Build_Process.State): List[String] = |
524 override def next_jobs(state: Build_Process.State): List[String] = |
523 if (cache.is_current(state)) cache.configs.map(_.job_name) |
525 if (cache.is_current(state)) cache.configs.map(_.job_name) |
524 else { |
526 else { |
525 val start = Time.now() |
527 val start = Time.now() |
526 val next = scheduler.next(timing_data, state) |
528 val next = scheduler.next(state) |
527 val estimate = Date(Time.now() + scheduler.build_duration(timing_data, state)) |
529 val estimate = Date(Time.now() + scheduler.build_duration(state)) |
528 val elapsed = Time.now() - start |
530 val elapsed = Time.now() - start |
529 |
531 |
530 val timing_msg = if (elapsed.is_relevant) " (in " + elapsed.message + ")" else "" |
532 val timing_msg = if (elapsed.is_relevant) " (in " + elapsed.message + ")" else "" |
531 progress.echo_if(build_context.master && !cache.is_current_estimate(estimate), |
533 progress.echo_if(build_context.master && !cache.is_current_estimate(estimate), |
532 "Estimated completion: " + estimate + timing_msg) |
534 "Estimated completion: " + estimate + timing_msg) |
546 class Engine extends Build.Engine(engine_name) { |
548 class Engine extends Build.Engine(engine_name) { |
547 override def open_build_process( |
549 override def open_build_process( |
548 context: Build.Context, |
550 context: Build.Context, |
549 progress: Progress, |
551 progress: Progress, |
550 server: SSH.Server |
552 server: SSH.Server |
551 ): Build_Process = { |
553 ): Build_Process = |
552 val heuristics = List(5, 10, 20).map(minutes => Timing_Heuristic(Time.minutes(minutes))) |
554 new Scheduled_Build_Process(context, progress, server) { |
553 val scheduler = new Meta_Heuristic(heuristics) |
555 def init_scheduler(timing_data: Timing_Data): Scheduler = { |
554 new Scheduled_Build_Process(scheduler, context, progress, server) |
556 val heuristics = |
555 } |
557 List(5, 10, 20).map(minutes => Timing_Heuristic(Time.minutes(minutes), timing_data)) |
|
558 new Meta_Heuristic(heuristics, timing_data) |
|
559 } |
|
560 } |
556 } |
561 } |
557 } |
562 } |