438 timeout, old_time, old_command_timings_blob, build_uuid) |
438 timeout, old_time, old_command_timings_blob, build_uuid) |
439 } |
439 } |
440 ) |
440 ) |
441 } |
441 } |
442 |
442 |
443 def update_sessions(db: SQL.Database, sessions: Build_Process.Sessions): Boolean = { |
443 def update_sessions( |
444 val old_sessions = read_sessions_domain(db) |
444 db: SQL.Database, |
445 val insert = sessions.iterator.filterNot(s => old_sessions.contains(s.name)).toList |
445 sessions: Build_Process.Sessions, |
|
446 old_sessions: Build_Process.Sessions |
|
447 ): Boolean = { |
|
448 val insert = sessions.iterator.filterNot(s => old_sessions.defined(s.name)).toList |
446 |
449 |
447 if (insert.nonEmpty) { |
450 if (insert.nonEmpty) { |
448 db.execute_batch_statement(Sessions.table.insert(), batch = |
451 db.execute_batch_statement(Sessions.table.insert(), batch = |
449 for (session <- insert) yield { (stmt: SQL.Statement) => |
452 for (session <- insert) yield { (stmt: SQL.Statement) => |
450 stmt.string(1) = session.name |
453 stmt.string(1) = session.name |
578 val info = res.string(Pending.info) |
581 val info = res.string(Pending.info) |
579 val build_uuid = res.string(Pending.build_uuid) |
582 val build_uuid = res.string(Pending.build_uuid) |
580 Task(name, split_lines(deps), JSON.Object.parse(info), build_uuid) |
583 Task(name, split_lines(deps), JSON.Object.parse(info), build_uuid) |
581 }) |
584 }) |
582 |
585 |
583 def update_pending(db: SQL.Database, pending: State.Pending): Boolean = { |
586 def update_pending( |
584 val old_pending = read_pending(db) |
587 db: SQL.Database, |
|
588 pending: State.Pending, |
|
589 old_pending: State.Pending |
|
590 ): Boolean = { |
585 val (delete, insert) = Library.symmetric_difference(old_pending, pending) |
591 val (delete, insert) = Library.symmetric_difference(old_pending, pending) |
586 |
592 |
587 if (delete.nonEmpty) { |
593 if (delete.nonEmpty) { |
588 db.execute_statement( |
594 db.execute_statement( |
589 Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) |
595 Pending.table.delete(sql = Generic.sql_where(names = delete.map(_.name)))) |
629 val numa_node = res.get_int(Running.numa_node) |
635 val numa_node = res.get_int(Running.numa_node) |
630 name -> Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None) |
636 name -> Job(name, worker_uuid, build_uuid, Host.Node_Info(hostname, numa_node), None) |
631 } |
637 } |
632 ) |
638 ) |
633 |
639 |
634 def update_running(db: SQL.Database, running: State.Running): Boolean = { |
640 def update_running( |
635 val running0 = read_running(db).valuesIterator.toList |
641 db: SQL.Database, |
|
642 running: State.Running, |
|
643 old_running: State.Running |
|
644 ): Boolean = { |
|
645 val running0 = old_running.valuesIterator.toList |
636 val running1 = running.valuesIterator.map(_.no_build).toList |
646 val running1 = running.valuesIterator.map(_.no_build).toList |
637 val (delete, insert) = Library.symmetric_difference(running0, running1) |
647 val (delete, insert) = Library.symmetric_difference(running0, running1) |
638 |
648 |
639 if (delete.nonEmpty) { |
649 if (delete.nonEmpty) { |
640 db.execute_statement( |
650 db.execute_statement( |
718 name -> |
728 name -> |
719 Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) |
729 Result(name, worker_uuid, build_uuid, node_info, process_result, output_shasum, current) |
720 } |
730 } |
721 ) |
731 ) |
722 |
732 |
723 def update_results(db: SQL.Database, results: State.Results): Boolean = { |
733 def update_results( |
|
734 db: SQL.Database, |
|
735 results: State.Results, |
|
736 old_results: State.Results |
|
737 ): Boolean = { |
724 val insert = |
738 val insert = |
725 if (results.isEmpty) Nil |
739 results.valuesIterator.filterNot(res => old_results.isDefinedAt(res.name)).toList |
726 else { |
|
727 val old_results = read_results_domain(db) |
|
728 results.valuesIterator.filterNot(res => old_results.contains(res.name)).toList |
|
729 } |
|
730 |
740 |
731 if (insert.nonEmpty) { |
741 if (insert.nonEmpty) { |
732 db.execute_batch_statement(Results.table.insert(), batch = |
742 db.execute_batch_statement(Results.table.insert(), batch = |
733 for (result <- insert) yield { (stmt: SQL.Statement) => |
743 for (result <- insert) yield { (stmt: SQL.Statement) => |
734 val process_result = result.process_result |
744 val process_result = result.process_result |
783 state.copy(serial = serial, sessions = sessions, pending = pending, |
793 state.copy(serial = serial, sessions = sessions, pending = pending, |
784 running = running, results = results) |
794 running = running, results = results) |
785 } |
795 } |
786 } |
796 } |
787 |
797 |
788 def update_database(db: SQL.Database, worker_uuid: String, state: State): State = { |
798 def update_database( |
|
799 db: SQL.Database, |
|
800 worker_uuid: String, |
|
801 state: State, |
|
802 old_state: State |
|
803 ): State = { |
789 val changed = |
804 val changed = |
790 List( |
805 List( |
791 update_sessions(db, state.sessions), |
806 update_sessions(db, state.sessions, old_state.sessions), |
792 update_pending(db, state.pending), |
807 update_pending(db, state.pending, old_state.pending), |
793 update_running(db, state.running), |
808 update_running(db, state.running, old_state.running), |
794 update_results(db, state.results)) |
809 update_results(db, state.results, old_state.results)) |
795 |
810 |
796 val serial0 = state.serial |
811 val serial0 = state.serial |
797 val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 |
812 val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 |
798 |
813 |
799 stamp_worker(db, worker_uuid, serial) |
814 stamp_worker(db, worker_uuid, serial) |
921 synchronized { |
936 synchronized { |
922 _build_database match { |
937 _build_database match { |
923 case None => body |
938 case None => body |
924 case Some(db) => |
939 case Some(db) => |
925 Build_Process.private_data.transaction_lock(db, label = label) { |
940 Build_Process.private_data.transaction_lock(db, label = label) { |
926 _state = Build_Process.private_data.pull_database(db, worker_uuid, _state) |
941 val old_state = Build_Process.private_data.pull_database(db, worker_uuid, _state) |
|
942 _state = old_state |
927 val res = body |
943 val res = body |
928 _state = Build_Process.private_data.update_database(db, worker_uuid, _state) |
944 _state = Build_Process.private_data.update_database(db, worker_uuid, _state, old_state) |
929 res |
945 res |
930 } |
946 } |
931 } |
947 } |
932 } |
948 } |
933 |
949 |