--- a/src/Pure/General/ssh.scala Mon Oct 10 18:10:03 2016 +0200
+++ b/src/Pure/General/ssh.scala Mon Oct 10 21:44:54 2016 +0200
@@ -7,9 +7,9 @@
package isabelle
-import java.io.{InputStream, OutputStream}
+import java.io.{InputStream, OutputStream, ByteArrayOutputStream}
-import scala.collection.JavaConversions
+import scala.collection.{mutable, JavaConversions}
import com.jcraft.jsch.{JSch, Logger => JSch_Logger, Session => JSch_Session,
OpenSSHConfig, UserInfo, Channel => JSch_Channel, ChannelExec, ChannelSftp, SftpATTRS}
@@ -83,8 +83,8 @@
/* channel */
- class Channel[C <: JSch_Channel] private[SSH](val session: Session,
- val kind: String, val channel_options: Options, val channel: C)
+ class Channel[C <: JSch_Channel] private[SSH](
+ val session: Session, val kind: String, val options: Options, val channel: C)
{
override def toString: String = kind + " " + session.toString
@@ -94,11 +94,80 @@
/* exec channel */
+ private val exec_wait_delay = Time.seconds(0.3)
+
class Exec private[SSH](
- session: Session, kind: String, channel_options: Options, channel: ChannelExec)
- extends Channel[ChannelExec](session, kind, channel_options, channel)
+ session: Session, kind: String, options: Options, channel: ChannelExec)
+ extends Channel[ChannelExec](session, kind, options, channel)
{
def kill(signal: String) { channel.sendSignal(signal) }
+
+ val exit_status: Future[Int] =
+ Future.thread("ssh_wait") {
+ while (!channel.isClosed) Thread.sleep(exec_wait_delay.ms)
+ channel.getExitStatus
+ }
+
+ val stdin: OutputStream = channel.getOutputStream
+ val stdout: InputStream = channel.getInputStream
+ val stderr: InputStream = channel.getErrStream
+
+ // after preparing streams
+ channel.connect(connect_timeout(options))
+
+ def result(
+ progress_stdout: String => Unit = (_: String) => (),
+ progress_stderr: String => Unit = (_: String) => (),
+ strict: Boolean = true): Process_Result =
+ {
+ stdin.close
+
+ def read_lines(stream: InputStream, progress: String => Unit): List[String] =
+ {
+ val result = new mutable.ListBuffer[String]
+ val line_buffer = new ByteArrayOutputStream(100)
+ def line_flush()
+ {
+ val line = line_buffer.toString(UTF8.charset_name)
+ progress(line)
+ result += line
+ line_buffer.reset
+ }
+
+ var c = 0
+ var finished = false
+ while (!finished) {
+ while ({ c = stream.read; c != -1 && c != 10 }) line_buffer.write(c)
+ if (c == 10) line_flush()
+ else if (channel.isClosed) {
+ if (line_buffer.size > 0) line_flush()
+ finished = true
+ }
+ else Thread.sleep(exec_wait_delay.ms)
+ }
+
+ result.toList
+ }
+
+ val out_lines = Future.thread("ssh_stdout") { read_lines(stdout, progress_stdout) }
+ val err_lines = Future.thread("ssh_stderr") { read_lines(stderr, progress_stderr) }
+
+ def terminate()
+ {
+ channel.disconnect
+ out_lines.join
+ err_lines.join
+ exit_status.join
+ }
+
+ val rc =
+ try { exit_status.join }
+ catch { case Exn.Interrupt() => terminate(); Exn.Interrupt.return_code }
+
+ if (strict && rc == Exn.Interrupt.return_code) throw Exn.Interrupt()
+
+ Process_Result(rc, out_lines.join, err_lines.join)
+ }
}
@@ -113,9 +182,11 @@
}
class Sftp private[SSH](
- session: Session, kind: String, channel_options: Options, channel: ChannelSftp)
- extends Channel[ChannelSftp](session, kind, channel_options, channel)
+ session: Session, kind: String, options: Options, channel: ChannelSftp)
+ extends Channel[ChannelSftp](session, kind, options, channel)
{
+ channel.connect(connect_timeout(options))
+
def home: String = channel.getHome()
def chmod(permissions: Int, remote_path: String) { channel.chmod(permissions, remote_path) }
@@ -189,8 +260,6 @@
val kind = "exec"
val channel = session.openChannel(kind).asInstanceOf[ChannelExec]
channel.setCommand(command)
-
- channel.connect(connect_timeout(options))
new Exec(this, kind, options, channel)
}
@@ -198,8 +267,6 @@
{
val kind = "sftp"
val channel = session.openChannel(kind).asInstanceOf[ChannelSftp]
-
- channel.connect(connect_timeout(options))
new Sftp(this, kind, options, channel)
}
}