src/Pure/Tools/build_cluster.scala
changeset 78430 0461fc9d43e8
parent 78427 5b7d1cb073db
child 78433 872f10c80810
equal deleted inserted replaced
78429:103a81e60126 78430:0461fc9d43e8
   106       val rest = (params ::: options.map(Host.print_option)).mkString(",")
   106       val rest = (params ::: options.map(Host.print_option)).mkString(",")
   107 
   107 
   108       SSH.print_local(name) + if_proper(rest, ":" + rest)
   108       SSH.print_local(name) + if_proper(rest, ":" + rest)
   109     }
   109     }
   110 
   110 
   111     def open_ssh_session(options: Options): SSH.Session =
   111     def message(msg: String): String = "Host " + quote(name) + if_proper(msg, ": " + msg)
   112       SSH.open_session(options, name, port = port, user = user)
       
   113   }
   112   }
   114 
   113 
   115 
   114 
   116   /* remote sessions */
   115   /* remote sessions */
   117 
   116 
   118   class Session(host: Host) extends AutoCloseable {
   117   def capture_open_session(
   119     override def close(): Unit = ()
   118     options: Options,
       
   119     host: Host,
       
   120     progress: Progress = new Progress
       
   121   ): Exn.Result[Session] = {
       
   122     progress.echo(host.message("connect ..."))
       
   123     try {
       
   124       val ssh_options = options ++ host.options
       
   125       val ssh = SSH.open_session(ssh_options, host.name, port = host.port, user = host.user)
       
   126       Exn.Res[Session](new Session(host, ssh))
       
   127     }
       
   128     catch {
       
   129       case exn: Throwable =>
       
   130         progress.echo_error_message(host.message("failed to connect\n" + Exn.message(exn)))
       
   131         Exn.Exn[Session](exn)
       
   132     }
       
   133   }
       
   134 
       
   135   final class Session private[Build_Cluster](val host: Host, val ssh: SSH.Session)
       
   136   extends AutoCloseable {
       
   137     override def toString: String = ssh.toString
       
   138 
       
   139     def start(): Result = {
       
   140       val res = Process_Result.ok     // FIXME
       
   141       Result(host, res)
       
   142     }
       
   143 
       
   144     override def close(): Unit = ssh.close()
       
   145   }
       
   146 
       
   147   sealed case class Result(host: Host, process_result: Process_Result) {
       
   148     def ok: Boolean = process_result.ok
   120   }
   149   }
   121 }
   150 }
   122 
   151 
   123 // class extensible via Build.Engine.build_process() and Build_Process.init_cluster()
   152 // class extensible via Build.Engine.build_process() and Build_Process.init_cluster()
   124 class Build_Cluster(
   153 class Build_Cluster(
   128 ) extends AutoCloseable {
   157 ) extends AutoCloseable {
   129   require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required")
   158   require(remote_hosts.nonEmpty && !remote_hosts.exists(_.is_local), "remote hosts required")
   130 
   159 
   131   override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
   160   override def toString: String = remote_hosts.mkString("Build_Cluster(", ", ", ")")
   132 
   161 
   133   progress.echo("Remote hosts:\n" + cat_lines(remote_hosts.map("  " + _)))
   162 
   134 
   163   /* SSH sessions */
   135   def start(): Unit = ()
   164 
   136   def stop(): Unit = ()
   165   private var _sessions = List.empty[Build_Cluster.Session]
   137 
   166 
   138   override def close(): Unit = ()
   167   def open(): Unit = synchronized {
       
   168     require(_sessions.isEmpty)
       
   169 
       
   170     val attempts =
       
   171       Par_List.map(
       
   172         Build_Cluster.capture_open_session(build_context.build_options, _, progress = progress),
       
   173         remote_hosts, thread = true)
       
   174 
       
   175     if (attempts.forall(Exn.the_res.isDefinedAt)) {
       
   176       _sessions = attempts.map(Exn.the_res)
       
   177     }
       
   178     else {
       
   179       for (Exn.Res(session) <- attempts) session.close()
       
   180       error("Failed to connect build cluster")
       
   181     }
       
   182   }
       
   183 
       
   184   override def close(): Unit = synchronized {
       
   185     join
       
   186     _sessions.foreach(_.close())
       
   187     _sessions = Nil
       
   188   }
       
   189 
       
   190 
       
   191   /* workers */
       
   192 
       
   193   private var _workers = List.empty[Future[Build_Cluster.Result]]
       
   194 
       
   195   def start(): Unit = synchronized {
       
   196     require(_sessions.nonEmpty && _workers.isEmpty)
       
   197     _workers = _sessions.map(session =>
       
   198       Future.thread(session.host.message("session")) { session.start() })
       
   199   }
       
   200 
       
   201   def join: List[Exn.Result[Build_Cluster.Result]] = synchronized {
       
   202     val res = _workers.map(_.join_result)
       
   203     _workers = Nil
       
   204     res
       
   205   }
       
   206 
       
   207 
       
   208   /* init */
       
   209 
       
   210   open()
   139 }
   211 }