106 val rest = (params ::: options.map(Host.print_option)).mkString(",") |
106 val rest = (params ::: options.map(Host.print_option)).mkString(",") |
107 |
107 |
108 SSH.print_local(name) + if_proper(rest, ":" + rest) |
108 SSH.print_local(name) + if_proper(rest, ":" + rest) |
109 } |
109 } |
110 |
110 |
111 def open_ssh_session(options: Options): SSH.Session = |
// Message text for this host: "Host " and the quoted name, followed by the
// suffix built by if_proper from msg (presumably empty for an empty msg -- confirm).
def message(msg: String): String = {
  val prefix = "Host " + quote(name)
  prefix + if_proper(msg, ": " + msg)
}
112 SSH.open_session(options, name, port = port, user = user) |
|
113 } |
112 } |
114 |
113 |
115 |
114 |
116 /* remote sessions */ |
115 /* remote sessions */ |
117 |
116 |
118 class Session(host: Host) extends AutoCloseable { |
117 def capture_open_session( |
119 override def close(): Unit = () |
118 options: Options, |
|
119 host: Host, |
|
120 progress: Progress = new Progress |
|
121 ): Exn.Result[Session] = { |
|
122 progress.echo(host.message("connect ...")) |
|
123 try { |
|
124 val ssh_options = options ++ host.options |
|
125 val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user) |
|
126 Exn.Res[Session](new Session(host, ssh)) |
|
127 } |
|
128 catch { |
|
129 case exn: Throwable => |
|
130 progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn))) |
|
131 Exn.Exn[Session](exn) |
|
132 } |
|
133 } |
|
134 |
|
/* Remote session: a host paired with its open SSH session.
   The constructor is restricted to Build_Cluster. */
final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session)
extends AutoCloseable {
  // printed form is delegated to the underlying SSH session
  override def toString: String = ssh.toString

  // placeholder: always reports Process_Result.ok for this host
  def start(): Result = Result(host, Process_Result.ok) // FIXME

  // closing the session closes the underlying SSH session
  override def close(): Unit = { ssh.close() }
}
|
146 |
|
147 sealed case class Result(host: Host, process_result: Process_Result) { |
|
148 def ok: Boolean = process_result.ok |
120 } |
149 } |
121 } |
150 } |
122 |
151 |
123 // class extensible via Build.Engine.build_process() and Build_Process.init_cluster() |
152 // class extensible via Build.Engine.build_process() and Build_Process.init_cluster() |
124 class Build_Cluster( |
153 class Build_Cluster( |
128 ) extends AutoCloseable { |
157 ) extends AutoCloseable { |
129 require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required") |
158 require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required") |
130 |
159 |
131 override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")") |
160 override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")") |
132 |
161 |
133 progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map(" " + _))) |
162 |
134 |
163 /* SSH sessions */ |
135 def start(): Unit = () |
164 |
136 def stop(): Unit = () |
165 private var _sessions = List.empty[Build_Cluster.Session] |
137 |
166 |
138 override def close(): Unit = () |
// Open one session per remote host (via Par_List.map, thread = true).
// Requires that no sessions are currently held. If any attempt failed,
// the sessions that did open are closed again and an error is raised;
// otherwise all sessions are stored in _sessions.
def open(): Unit = synchronized {
  require(_sessions.isEmpty)

  val results =
    Par_List.map(
      host =>
        Build_Cluster.capture_open_session(build_context.build_options, host, progress = progress),
      remote_hosts, thread = true)

  val all_connected = results.forall(Exn.the_res.isDefinedAt)

  if (!all_connected) {
    // release whatever was opened successfully before reporting the failure
    results.foreach {
      case Exn.Res(session) => session.close()
      case _ =>
    }
    error("Failed to connect build cluster")
  }
  else {
    _sessions = results.map(Exn.the_res)
  }
}
|
183 |
|
// Shut down the cluster: first join outstanding workers (results are
// discarded here), then close every session and forget them.
override def close(): Unit = synchronized {
  join
  for (session <- _sessions) session.close()
  _sessions = List.empty
}
|
189 |
|
190 |
|
191 /* workers */ |
|
192 |
|
193 private var _workers = List.empty[Future[Build_Cluster.Result]] |
|
194 |
|
// Start one worker future per open session. Requires that sessions exist
// and that no workers are currently registered.
def start(): Unit = synchronized {
  require(_sessions.nonEmpty && _workers.isEmpty)
  _workers =
    for (session <- _sessions) yield {
      val label = session.host.message("session")
      Future.thread(label) { session.start() }
    }
}
|
200 |
|
// Collect join_result of every registered worker, then clear the worker list.
// Returns the collected results in worker order.
def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
  val finished = for (worker <- _workers) yield worker.join_result
  _workers = List.empty
  finished
}
|
206 |
|
207 |
|
208 /* init */ |
|
209 |
|
210 open() |
139 } |
211 } |