# HG changeset patch # User wenzelm # Date 1476128694 -7200 # Node ID 57581e4026feefa5e7203705676d8bcbc576cff5 # Parent e8407039b5725b98bc77686784c421c4fb122b9b proper support for exec channel (see also bash.scala); diff -r e8407039b572 -r 57581e4026fe src/Pure/General/ssh.scala --- a/src/Pure/General/ssh.scala Mon Oct 10 18:10:03 2016 +0200 +++ b/src/Pure/General/ssh.scala Mon Oct 10 21:44:54 2016 +0200 @@ -7,9 +7,9 @@ package isabelle -import java.io.{InputStream, OutputStream} +import java.io.{InputStream, OutputStream, ByteArrayOutputStream} -import scala.collection.JavaConversions +import scala.collection.{mutable, JavaConversions} import com.jcraft.jsch.{JSch, Logger => JSch_Logger, Session => JSch_Session, OpenSSHConfig, UserInfo, Channel => JSch_Channel, ChannelExec, ChannelSftp, SftpATTRS} @@ -83,8 +83,8 @@ /* channel */ - class Channel[C <: JSch_Channel] private[SSH](val session: Session, - val kind: String, val channel_options: Options, val channel: C) + class Channel[C <: JSch_Channel] private[SSH]( + val session: Session, val kind: String, val options: Options, val channel: C) { override def toString: String = kind + " " + session.toString @@ -94,11 +94,80 @@ /* exec channel */ + private val exec_wait_delay = Time.seconds(0.3) + class Exec private[SSH]( - session: Session, kind: String, channel_options: Options, channel: ChannelExec) - extends Channel[ChannelExec](session, kind, channel_options, channel) + session: Session, kind: String, options: Options, channel: ChannelExec) + extends Channel[ChannelExec](session, kind, options, channel) { def kill(signal: String) { channel.sendSignal(signal) } + + val exit_status: Future[Int] = + Future.thread("ssh_wait") { + while (!channel.isClosed) Thread.sleep(exec_wait_delay.ms) + channel.getExitStatus + } + + val stdin: OutputStream = channel.getOutputStream + val stdout: InputStream = channel.getInputStream + val stderr: InputStream = channel.getErrStream + + // after preparing streams + channel.connect(connect_timeout(options)) + + def result( + progress_stdout: String => Unit = (_: String) => (), + progress_stderr: String => Unit = (_: String) => (), + strict: Boolean = true): Process_Result = + { + stdin.close + + def read_lines(stream: InputStream, progress: String => Unit): List[String] = + { + val result = new mutable.ListBuffer[String] + val line_buffer = new ByteArrayOutputStream(100) + def line_flush() + { + val line = line_buffer.toString(UTF8.charset_name) + progress(line) + result += line + line_buffer.reset + } + + var c = 0 + var finished = false + while (!finished) { + while ({ c = stream.read; c != -1 && c != 10 }) line_buffer.write(c) + if (c == 10) line_flush() + else if (channel.isClosed) { + if (line_buffer.size > 0) line_flush() + finished = true + } + else Thread.sleep(exec_wait_delay.ms) + } + + result.toList + } + + val out_lines = Future.thread("ssh_stdout") { read_lines(stdout, progress_stdout) } + val err_lines = Future.thread("ssh_stderr") { read_lines(stderr, progress_stderr) } + + def terminate() + { + channel.disconnect + out_lines.join + err_lines.join + exit_status.join + } + + val rc = + try { exit_status.join } + catch { case Exn.Interrupt() => terminate(); Exn.Interrupt.return_code } + + if (strict && rc == Exn.Interrupt.return_code) throw Exn.Interrupt() + + Process_Result(rc, out_lines.join, err_lines.join) + } } @@ -113,9 +182,11 @@ } class Sftp private[SSH]( - session: Session, kind: String, channel_options: Options, channel: ChannelSftp) - extends Channel[ChannelSftp](session, kind, channel_options, channel) + session: Session, kind: String, options: Options, channel: ChannelSftp) + extends Channel[ChannelSftp](session, kind, options, channel) { + channel.connect(connect_timeout(options)) + def home: String = channel.getHome() def chmod(permissions: Int, remote_path: String) { channel.chmod(permissions, remote_path) } @@ -189,8 +260,6 @@ val kind = "exec" val channel = session.openChannel(kind).asInstanceOf[ChannelExec] channel.setCommand(command) - - channel.connect(connect_timeout(options)) new Exec(this, kind, options, channel) } @@ -198,8 +267,6 @@ { val kind = "sftp" val channel = session.openChannel(kind).asInstanceOf[ChannelSftp] - - channel.connect(connect_timeout(options)) new Sftp(this, kind, options, channel) } }