proper support for exec channel (see also bash.scala);
authorwenzelm
Mon, 10 Oct 2016 21:44:54 +0200
changeset 64134 57581e4026fe
parent 64133 e8407039b572
child 64135 865dda40e1cc
proper support for exec channel (see also bash.scala);
src/Pure/General/ssh.scala
--- a/src/Pure/General/ssh.scala	Mon Oct 10 18:10:03 2016 +0200
+++ b/src/Pure/General/ssh.scala	Mon Oct 10 21:44:54 2016 +0200
@@ -7,9 +7,9 @@
 package isabelle
 
 
-import java.io.{InputStream, OutputStream}
+import java.io.{InputStream, OutputStream, ByteArrayOutputStream}
 
-import scala.collection.JavaConversions
+import scala.collection.{mutable, JavaConversions}
 
 import com.jcraft.jsch.{JSch, Logger => JSch_Logger, Session => JSch_Session,
   OpenSSHConfig, UserInfo, Channel => JSch_Channel, ChannelExec, ChannelSftp, SftpATTRS}
@@ -83,8 +83,8 @@
 
   /* channel */
 
-  class Channel[C <: JSch_Channel] private[SSH](val session: Session,
-    val kind: String, val channel_options: Options, val channel: C)
+  class Channel[C <: JSch_Channel] private[SSH](
+    val session: Session, val kind: String, val options: Options, val channel: C)
   {
     override def toString: String = kind + " " + session.toString
 
@@ -94,11 +94,80 @@
 
   /* exec channel */
 
+  private val exec_wait_delay = Time.seconds(0.3)
+
   class Exec private[SSH](
-    session: Session, kind: String, channel_options: Options, channel: ChannelExec)
-    extends Channel[ChannelExec](session, kind, channel_options, channel)
+    session: Session, kind: String, options: Options, channel: ChannelExec)
+    extends Channel[ChannelExec](session, kind, options, channel)
   {
     def kill(signal: String) { channel.sendSignal(signal) }
+
+    val exit_status: Future[Int] =
+      Future.thread("ssh_wait") {
+        while (!channel.isClosed) Thread.sleep(exec_wait_delay.ms)
+        channel.getExitStatus
+      }
+
+    val stdin: OutputStream = channel.getOutputStream
+    val stdout: InputStream = channel.getInputStream
+    val stderr: InputStream = channel.getErrStream
+
+    // after preparing streams
+    channel.connect(connect_timeout(options))
+
+    def result(
+      progress_stdout: String => Unit = (_: String) => (),
+      progress_stderr: String => Unit = (_: String) => (),
+      strict: Boolean = true): Process_Result =
+    {
+      stdin.close
+
+      def read_lines(stream: InputStream, progress: String => Unit): List[String] =
+      {
+        val result = new mutable.ListBuffer[String]
+        val line_buffer = new ByteArrayOutputStream(100)
+        def line_flush()
+        {
+          val line = line_buffer.toString(UTF8.charset_name)
+          progress(line)
+          result += line
+          line_buffer.reset
+        }
+
+        var c = 0
+        var finished = false
+        while (!finished) {
+          while ({ c = stream.read; c != -1 && c != 10 }) line_buffer.write(c)
+          if (c == 10) line_flush()
+          else if (channel.isClosed) {
+            if (line_buffer.size > 0) line_flush()
+            finished = true
+          }
+          else Thread.sleep(exec_wait_delay.ms)
+        }
+
+        result.toList
+      }
+
+      val out_lines = Future.thread("ssh_stdout") { read_lines(stdout, progress_stdout) }
+      val err_lines = Future.thread("ssh_stderr") { read_lines(stderr, progress_stderr) }
+
+      def terminate()
+      {
+        channel.disconnect
+        out_lines.join
+        err_lines.join
+        exit_status.join
+      }
+
+      val rc =
+        try { exit_status.join }
+        catch { case Exn.Interrupt() => terminate(); Exn.Interrupt.return_code }
+
+      if (strict && rc == Exn.Interrupt.return_code) throw Exn.Interrupt()
+
+      Process_Result(rc, out_lines.join, err_lines.join)
+    }
   }
 
 
@@ -113,9 +182,11 @@
   }
 
   class Sftp private[SSH](
-    session: Session, kind: String, channel_options: Options, channel: ChannelSftp)
-    extends Channel[ChannelSftp](session, kind, channel_options, channel)
+    session: Session, kind: String, options: Options, channel: ChannelSftp)
+    extends Channel[ChannelSftp](session, kind, options, channel)
   {
+    channel.connect(connect_timeout(options))
+
     def home: String = channel.getHome()
 
     def chmod(permissions: Int, remote_path: String) { channel.chmod(permissions, remote_path) }
@@ -189,8 +260,6 @@
       val kind = "exec"
       val channel = session.openChannel(kind).asInstanceOf[ChannelExec]
       channel.setCommand(command)
-
-      channel.connect(connect_timeout(options))
       new Exec(this, kind, options, channel)
     }
 
@@ -198,8 +267,6 @@
     {
       val kind = "sftp"
       val channel = session.openChannel(kind).asInstanceOf[ChannelSftp]
-
-      channel.connect(connect_timeout(options))
       new Sftp(this, kind, options, channel)
     }
   }