src/Pure/Tools/build_process.scala
changeset 77657 a2a4adc268b8
parent 77656 fd553b54fce1
child 77658 4240e9528586
equal deleted inserted replaced
77656:fd553b54fce1 77657:a2a4adc268b8
   188     def ok: Boolean = process_result.ok
   188     def ok: Boolean = process_result.ok
   189   }
   189   }
   190 
   190 
   191   object State {
   191   object State {
   192     type Sessions = Map[String, Build_Job.Session_Context]
   192     type Sessions = Map[String, Build_Job.Session_Context]
   193     type Workers = List[Worker]
       
   194     type Pending = List[Task]
   193     type Pending = List[Task]
   195     type Running = Map[String, Job]
   194     type Running = Map[String, Job]
   196     type Results = Map[String, Result]
   195     type Results = Map[String, Result]
   197 
   196 
   198     def inc_serial(serial: Long): Long = {
   197     def inc_serial(serial: Long): Long = {
   204   sealed case class State(
   203   sealed case class State(
   205     serial: Long = 0,
   204     serial: Long = 0,
   206     progress_seen: Long = 0,
   205     progress_seen: Long = 0,
   207     numa_next: Int = 0,
   206     numa_next: Int = 0,
   208     sessions: State.Sessions = Map.empty,   // static build targets
   207     sessions: State.Sessions = Map.empty,   // static build targets
   209     workers: State.Workers = Nil,           // available worker processes
       
   210     pending: State.Pending = Nil,           // dynamic build "queue"
   208     pending: State.Pending = Nil,           // dynamic build "queue"
   211     running: State.Running = Map.empty,     // presently running jobs
   209     running: State.Running = Map.empty,     // presently running jobs
   212     results: State.Results = Map.empty      // finished results
   210     results: State.Results = Map.empty      // finished results
   213   ) {
   211   ) {
   214     require(serial >= 0, "serial underflow")
   212     require(serial >= 0, "serial underflow")
   219     }
   217     }
   220 
   218 
   221     def progress_serial(message_serial: Long = serial): State =
   219     def progress_serial(message_serial: Long = serial): State =
   222       if (message_serial > progress_seen) copy(progress_seen = message_serial)
   220       if (message_serial > progress_seen) copy(progress_seen = message_serial)
   223       else error("Bad serial " + message_serial + " for progress output (already seen)")
   221       else error("Bad serial " + message_serial + " for progress output (already seen)")
   224 
       
   225     def set_workers(new_workers: State.Workers): State = copy(workers = new_workers)
       
   226 
   222 
   227     def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
   223     def next_numa_node(numa_nodes: List[Int]): (Option[Int], State) =
   228       if (numa_nodes.isEmpty) (None, this)
   224       if (numa_nodes.isEmpty) (None, this)
   229       else {
   225       else {
   230         val available = numa_nodes.zipWithIndex
   226         val available = numa_nodes.zipWithIndex
   554 
   550 
   555     def read_workers(
   551     def read_workers(
   556       db: SQL.Database,
   552       db: SQL.Database,
   557       build_uuid: String = "",
   553       build_uuid: String = "",
   558       worker_uuid: String = ""
   554       worker_uuid: String = ""
   559     ): State.Workers = {
   555     ): List[Worker] = {
   560       db.execute_query_statement(
   556       db.execute_query_statement(
   561         Workers.table.select(sql =
   557         Workers.table.select(sql =
   562           SQL.where(Generic.sql(build_uuid = build_uuid, worker_uuid = worker_uuid))),
   558           SQL.where(Generic.sql(build_uuid = build_uuid, worker_uuid = worker_uuid))),
   563           List.from[Worker],
   559           List.from[Worker],
   564           { res =>
   560           { res =>
   842         val serial = serial_db max state.serial
   838         val serial = serial_db max state.serial
   843         stamp_worker(db, worker_uuid, serial)
   839         stamp_worker(db, worker_uuid, serial)
   844 
   840 
   845         val numa_next = Host.Data.read_numa_next(db, hostname)
   841         val numa_next = Host.Data.read_numa_next(db, hostname)
   846         val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
   842         val sessions = pull1(read_sessions_domain(db), read_sessions(db, _), state.sessions)
   847         val workers = read_workers(db)
       
   848         val pending = read_pending(db)
   843         val pending = read_pending(db)
   849         val running = pull0(read_running(db), state.running)
   844         val running = pull0(read_running(db), state.running)
   850         val results = pull1(read_results_domain(db), read_results(db, _), state.results)
   845         val results = pull1(read_results_domain(db), read_results(db, _), state.results)
   851 
   846 
   852         state.copy(serial = serial, numa_next = numa_next, sessions = sessions,
   847         state.copy(serial = serial, numa_next = numa_next, sessions = sessions,
   853           workers = workers, pending = pending, running = running, results = results)
   848           pending = pending, running = running, results = results)
   854       }
   849       }
   855     }
   850     }
   856 
   851 
   857     def update_database(
   852     def update_database(
   858       db: SQL.Database,
   853       db: SQL.Database,
   871 
   866 
   872       val serial0 = state.serial
   867       val serial0 = state.serial
   873       val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
   868       val serial = if (changed.exists(identity)) State.inc_serial(serial0) else serial0
   874 
   869 
   875       stamp_worker(db, worker_uuid, serial)
   870       stamp_worker(db, worker_uuid, serial)
   876       state.set_serial(serial).set_workers(read_workers(db))
   871       state.set_serial(serial)
   877     }
   872     }
   878   }
   873   }
   879 }
   874 }
   880 
   875 
   881 
   876 
   953     synchronized_database {
   948     synchronized_database {
   954       _state = _state.inc_serial.progress_serial()
   949       _state = _state.inc_serial.progress_serial()
   955       for (db <- _database) {
   950       for (db <- _database) {
   956         Build_Process.Data.write_progress(db, _state.serial, message, build_uuid)
   951         Build_Process.Data.write_progress(db, _state.serial, message, build_uuid)
   957         Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial)
   952         Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial)
   958         _state = _state.set_workers(Build_Process.Data.read_workers(db))
       
   959       }
   953       }
   960       build_progress_output
   954       build_progress_output
   961     }
   955     }
   962   }
   956   }
   963 
   957 
  1092       val java_pid = java.pid
  1086       val java_pid = java.pid
  1093       val java_start = Date.instant(java.info.startInstant.get)
  1087       val java_start = Date.instant(java.info.startInstant.get)
  1094       _state = _state.inc_serial
  1088       _state = _state.inc_serial
  1095       Build_Process.Data.start_worker(
  1089       Build_Process.Data.start_worker(
  1096         db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial)
  1090         db, worker_uuid, build_uuid, hostname, java_pid, java_start, _state.serial)
  1097       _state = _state.set_workers(Build_Process.Data.read_workers(db))
       
  1098     }
  1091     }
  1099   }
  1092   }
  1100 
  1093 
  1101   protected final def stop_worker(): Unit = synchronized_database {
  1094   protected final def stop_worker(): Unit = synchronized_database {
  1102     for (db <- _database) {
  1095     for (db <- _database) {
  1103       Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
  1096       Build_Process.Data.stamp_worker(db, worker_uuid, _state.serial, stop = true)
  1104       _state = _state.set_workers(Build_Process.Data.read_workers(db))
       
  1105     }
  1097     }
  1106   }
  1098   }
  1107 
  1099 
  1108 
  1100 
  1109   /* run */
  1101   /* run */