188 def ok: Boolean = process_result.ok |
188 def ok: Boolean = process_result.ok |
189 } |
189 } |
190 |
190 |
191 object State { |
191 object State { |
192 type Sessions = Map[String, Build_Job.Session_Context] |
192 type Sessions = Map[String, Build_Job.Session_Context] |
193 type Workers = List[Worker] |
|
194 type Pending = List[Task] |
193 type Pending = List[Task] |
195 type Running = Map[String, Job] |
194 type Running = Map[String, Job] |
196 type Results = Map[String, Result] |
195 type Results = Map[String, Result] |
197 |
196 |
198 def inc_serial(serial: Long): Long = { |
197 def inc_serial(serial: Long): Long = { |
204 sealed case class State( |
203 sealed case class State( |
205 serial: Long = 0, |
204 serial: Long = 0, |
206 progress_seen: Long = 0, |
205 progress_seen: Long = 0, |
207 numa_next: Int = 0, |
206 numa_next: Int = 0, |
208 sessions: State.Sessions = Map.empty, // static build targets |
207 sessions: State.Sessions = Map.empty, // static build targets |
209 workers: State.Workers = Nil, // available worker processes |
|
210 pending: State.Pending = Nil, // dynamic build "queue" |
208 pending: State.Pending = Nil, // dynamic build "queue" |
211 running: State.Running = Map.empty, // presently running jobs |
209 running: State.Running = Map.empty, // presently running jobs |
212 results: State.Results = Map.empty // finished results |
210 results: State.Results = Map.empty // finished results |
213 ) { |
211 ) { |
214 require(serial >= 0, "serial underflow") |
212 require(serial >= 0, "serial underflow") |
219 } |
217 } |
220 |
218 |
221 def progress_serial(message_serial: Long = serial): State = |
219 def progress_serial(message_serial: Long = serial): State = |
222 if (message_serial > progress_seen) copy(progress_seen = message_serial) |
220 if (message_serial > progress_seen) copy(progress_seen = message_serial) |
223 else error("Bad serial " + message_serial + " for progress output (already seen)") |
221 else error("Bad serial " + message_serial + " for progress output (already seen)") |
224 |
|
225 def set_workers(new_workers: State.Workers): State = copy(workers = new_workers) |
|
226 |
222 |
227 def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) = |
223 def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) = |
228 if (numa_nodes.isEmpty) (None, this) |
224 if (numa_nodes.isEmpty) (None, this) |
229 else { |
225 else { |
230 val available = numa_nodes.zipWithIndex |
226 val available = numa_nodes.zipWithIndex |
554 |
550 |
555 def read_workers( |
551 def read_workers( |
556 db: SQL.Database, |
552 db: SQL.Database, |
557 build_uuid: String = "", |
553 build_uuid: String = "", |
558 worker_uuid: String = "" |
554 worker_uuid: String = "" |
559 ): State.Workers = { |
555 ): List[Worker] = { |
560 db.execute_query_statement( |
556 db.execute_query_statement( |
561 Workers.table.select(sql = |
557 Workers.table.select(sql = |
562 SQL.where(Generic.sql(build_uuid = build_uuid, worker_uuid = worker_uuid))), |
558 SQL.where(Generic.sql(build_uuid = build_uuid, worker_uuid = worker_uuid))), |
563 List.from[Worker], |
559 List.from[Worker], |
564 { res => |
560 { res => |
842 val serial = serial_db max state.serial |
838 val serial = serial_db max state.serial |
843 stamp_worker(db, worker_uuid, serial) |
839 stamp_worker(db, worker_uuid, serial) |
844 |
840 |
845 val numa_next = Host.Data.read_numa_next(db, hostname) |
841 val numa_next = Host.Data.read_numa_next(db, hostname) |
846 val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions) |
842 val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions) |
847 val workers = read_workers(db) |
|
848 val pending = read_pending(db) |
843 val pending = read_pending(db) |
849 val running = pull0(read_running(db), state.running) |
844 val running = pull0(read_running(db), state.running) |
850 val results = pull1(read_results_domain(db), read_results(db, _), state.results) |
845 val results = pull1(read_results_domain(db), read_results(db, _), state.results) |
851 |
846 |
852 state.copy(serial = serial, numa_next = numa_next, sessions = sessions, |
847 state.copy(serial = serial, numa_next = numa_next, sessions = sessions, |
853 workers = workers, pending = pending, running = running, results = results) |
848 pending = pending, running = running, results = results) |
854 } |
849 } |
855 } |
850 } |
856 |
851 |
857 def update_database( |
852 def update_database( |
858 db: SQL.Database, |
853 db: SQL.Database, |
871 |
866 |
872 val serial0 = state.serial |
867 val serial0 = state.serial |
873 val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 |
868 val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0 |
874 |
869 |
875 stamp_worker(db, worker_uuid, serial) |
870 stamp_worker(db, worker_uuid, serial) |
876 state.set_serial(serial).set_workers(read_workers(db)) |
871 state.set_serial(serial) |
877 } |
872 } |
878 } |
873 } |
879 } |
874 } |
880 |
875 |
881 |
876 |
953 synchronized_database { |
948 synchronized_database { |
954 _state = _state.inc_serial.progress_serial() |
949 _state = _state.inc_serial.progress_serial() |
955 for (db <- _database) { |
950 for (db <- _database) { |
956 Build_Process.Data.write_progress(db, _state.serial, message, build_uuid) |
951 Build_Process.Data.write_progress(db, _state.serial, message, build_uuid) |
957 Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial) |
952 Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial) |
958 _state = _state.set_workers(Build_Process.Data.read_workers(db)) |
|
959 } |
953 } |
960 build_progress_output |
954 build_progress_output |
961 } |
955 } |
962 } |
956 } |
963 |
957 |
1092 val java_pid = java.pid |
1086 val java_pid = java.pid |
1093 val java_start = Date.instant(java.info.startInstant.get) |
1087 val java_start = Date.instant(java.info.startInstant.get) |
1094 _state = _state.inc_serial |
1088 _state = _state.inc_serial |
1095 Build_Process.Data.start_worker( |
1089 Build_Process.Data.start_worker( |
1096 db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial) |
1090 db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial) |
1097 _state = _state.set_workers(Build_Process.Data.read_workers(db)) |
|
1098 } |
1091 } |
1099 } |
1092 } |
1100 |
1093 |
1101 protected final def stop_worker(): Unit = synchronized_database { |
1094 protected final def stop_worker(): Unit = synchronized_database { |
1102 for (db <- _database) { |
1095 for (db <- _database) { |
1103 Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true) |
1096 Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true) |
1104 _state = _state.set_workers(Build_Process.Data.read_workers(db)) |
|
1105 } |
1097 } |
1106 } |
1098 } |
1107 |
1099 |
1108 |
1100 |
1109 /* run */ |
1101 /* run */ |