20 |
20 |
21 private val USER = "user" |
21 private val USER = "user" |
22 private val PORT = "port" |
22 private val PORT = "port" |
23 private val JOBS = "jobs" |
23 private val JOBS = "jobs" |
24 private val NUMA = "numa" |
24 private val NUMA = "numa" |
|
25 private val SHARED = "shared" |
25 |
26 |
26 val parameters: Options = |
27 val parameters: Options = |
27 Options.inline(""" |
28 Options.inline(""" |
28 option user : string = "" -- "explicit SSH user" |
29 option user : string = "" -- "explicit SSH user" |
29 option port : int = 0 -- "explicit SSH port" |
30 option port : int = 0 -- "explicit SSH port" |
30 option jobs : int = 1 -- "maximum number of parallel jobs" |
31 option jobs : int = 1 -- "maximum number of parallel jobs" |
31 option numa : bool = false -- "cyclic shuffling of NUMA CPU nodes" |
32 option numa : bool = false -- "cyclic shuffling of NUMA CPU nodes" |
|
33 option shared : bool = false -- "shared directory: omit sync + init" |
32 """) |
34 """) |
33 |
35 |
34 def is_parameter(spec: Options.Spec): Boolean = parameters.defined(spec.name) |
36 def is_parameter(spec: Options.Spec): Boolean = parameters.defined(spec.name) |
35 |
37 |
36 lazy val test_options: Options = Options.init0() |
38 lazy val test_options: Options = Options.init0() |
39 name: String = "", |
41 name: String = "", |
40 user: String = parameters.string(USER), |
42 user: String = parameters.string(USER), |
41 port: Int = parameters.int(PORT), |
43 port: Int = parameters.int(PORT), |
42 jobs: Int = parameters.int(JOBS), |
44 jobs: Int = parameters.int(JOBS), |
43 numa: Boolean = parameters.bool(NUMA), |
45 numa: Boolean = parameters.bool(NUMA), |
|
46 shared: Boolean = parameters.bool(SHARED), |
44 options: List[Options.Spec] = Nil |
47 options: List[Options.Spec] = Nil |
45 ): Host = new Host(name, user, port, jobs, numa, options) |
48 ): Host = new Host(name, user, port, jobs, numa, shared, options) |
46 |
49 |
47 def parse(str: String): Host = { |
50 def parse(str: String): Host = { |
48 val name = str.takeWhile(c => !rfc822_specials.contains(c)) |
51 val name = str.takeWhile(c => !rfc822_specials.contains(c)) |
49 val (params, options) = |
52 val (params, options) = |
50 try { |
53 try { |
67 apply(name = name, |
70 apply(name = name, |
68 user = params.string(USER), |
71 user = params.string(USER), |
69 port = params.int(PORT), |
72 port = params.int(PORT), |
70 jobs = params.int(JOBS), |
73 jobs = params.int(JOBS), |
71 numa = params.bool(NUMA), |
74 numa = params.bool(NUMA), |
|
75 shared = params.bool(SHARED), |
72 options = options) |
76 options = options) |
73 } |
77 } |
74 } |
78 } |
75 |
79 |
76 class Host( |
80 class Host( |
77 val name: String, |
81 val name: String, |
78 val user: String, |
82 val user: String, |
79 val port: Int, |
83 val port: Int, |
80 val jobs: Int, |
84 val jobs: Int, |
81 val numa: Boolean, |
85 val numa: Boolean, |
|
86 val shared: Boolean, |
82 val options: List[Options.Spec] |
87 val options: List[Options.Spec] |
83 ) { |
88 ) { |
84 host => |
89 host => |
85 |
90 |
86 def is_local: Boolean = SSH.is_local(host.name) |
91 def is_local: Boolean = SSH.is_local(host.name) |
91 val params = |
96 val params = |
92 List( |
97 List( |
93 if (host.user.isEmpty) "" else Options.Spec.print(Host.USER, host.user), |
98 if (host.user.isEmpty) "" else Options.Spec.print(Host.USER, host.user), |
94 if (host.port == 0) "" else Options.Spec.print(Host.PORT, host.port.toString), |
99 if (host.port == 0) "" else Options.Spec.print(Host.PORT, host.port.toString), |
95 if (host.jobs == 1) "" else Options.Spec.print(Host.JOBS, host.jobs.toString), |
100 if (host.jobs == 1) "" else Options.Spec.print(Host.JOBS, host.jobs.toString), |
96 if_proper(host.numa, Host.NUMA) |
101 if_proper(host.numa, Host.NUMA), |
|
102 if_proper(host.shared, Host.SHARED) |
97 ).filter(_.nonEmpty) |
103 ).filter(_.nonEmpty) |
98 val rest = (params ::: host.options.map(_.print)).mkString(",") |
104 val rest = (params ::: host.options.map(_.print)).mkString(",") |
99 |
105 |
100 SSH.print_local(host.name) + if_proper(rest, ":" + rest) |
106 SSH.print_local(host.name) + if_proper(rest, ":" + rest) |
101 } |
107 } |
252 override def init(): Unit = synchronized { |
258 override def init(): Unit = synchronized { |
253 require(_sessions.nonEmpty, "build cluster not yet open") |
259 require(_sessions.nonEmpty, "build cluster not yet open") |
254 |
260 |
255 if (_init.isEmpty) { |
261 if (_init.isEmpty) { |
256 _init = |
262 _init = |
257 for (session <- _sessions) yield { |
263 for (session <- _sessions if !session.host.shared) yield { |
258 Future.thread(session.host.message("session")) { |
264 Future.thread(session.host.message("session")) { |
259 capture(session.host, "sync") { session.sync() } |
265 capture(session.host, "sync") { session.sync() } |
260 capture(session.host, "init") { session.init() } |
266 capture(session.host, "init") { session.init() } |
261 } |
267 } |
262 } |
268 } |