fifo: raw byte stream;
authorwenzelm
Thu, 17 Dec 2009 20:14:00 +0100
changeset 34100 ea24958c2af5
parent 34099 2541de190d92
child 34107 9996f47a1310
fifo: raw byte stream; Result: fully decoded symbols and tree structure; adapted to simplified message format; tuned;
src/Pure/System/isabelle_process.scala
src/Pure/System/isabelle_system.scala
--- a/src/Pure/System/isabelle_process.scala	Thu Dec 17 20:09:19 2009 +0100
+++ b/src/Pure/System/isabelle_process.scala	Thu Dec 17 20:14:00 2009 +0100
@@ -82,12 +82,15 @@
       kind == STATUS
   }
 
-  class Result(val kind: Kind.Value, val props: List[(String, String)], val result: String) {
-    override def toString = {
-      val trees = YXML.parse_body_failsafe(result)
+  class Result(val kind: Kind.Value, val props: List[(String, String)], val body: List[XML.Tree])
+  {
+    def message = XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(kind)) :: props, body)
+
+    override def toString: String =
+    {
       val res =
-        if (kind == Kind.STATUS) trees.map(_.toString).mkString
-        else trees.flatMap(XML.content(_).mkString).mkString
+        if (kind == Kind.STATUS) body.map(_.toString).mkString
+        else body.map(XML.content(_).mkString).mkString
       if (props.isEmpty)
         kind.toString + " [[" + res + "]]"
       else
@@ -98,16 +101,10 @@
     def is_control = Kind.is_control(kind)
     def is_system = Kind.is_system(kind)
   }
-
-  def parse_message(isabelle_system: Isabelle_System, result: Result) =
-  {
-    XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(result.kind)) :: result.props,
-      YXML.parse_body_failsafe(isabelle_system.symbols.decode(result.result)))
-  }
 }
 
 
-class Isabelle_Process(isabelle_system: Isabelle_System, receiver: Actor, args: String*)
+class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
 {
   import Isabelle_Process._
 
@@ -130,14 +127,19 @@
 
   /* results */
 
-  private def put_result(kind: Kind.Value, props: List[(String, String)], result: String)
+  private def put_result(kind: Kind.Value, props: List[(String, String)], body: List[XML.Tree])
   {
     if (kind == Kind.INIT) {
       val map = Map(props: _*)
       if (map.isDefinedAt(Markup.PID)) pid = map(Markup.PID)
       if (map.isDefinedAt(Markup.SESSION)) the_session = map(Markup.SESSION)
     }
-    receiver ! new Result(kind, props, result)
+    receiver ! new Result(kind, props, body)
+  }
+
+  private def put_result(kind: Kind.Value, text: String)
+  {
+    put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
   }
 
 
@@ -145,13 +147,13 @@
 
   def interrupt() = synchronized {
     if (proc == null) error("Cannot interrupt Isabelle: no process")
-    if (pid == null) put_result(Kind.SYSTEM, Nil, "Cannot interrupt: unknown pid")
+    if (pid == null) put_result(Kind.SYSTEM, "Cannot interrupt: unknown pid")
     else {
       try {
-        if (isabelle_system.execute(true, "kill", "-INT", pid).waitFor == 0)
-          put_result(Kind.SIGNAL, Nil, "INT")
+        if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
+          put_result(Kind.SIGNAL, "INT")
         else
-          put_result(Kind.SYSTEM, Nil, "Cannot interrupt: kill command failed")
+          put_result(Kind.SYSTEM, "Cannot interrupt: kill command failed")
       }
       catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
     }
@@ -162,7 +164,7 @@
     else {
       try_close()
       Thread.sleep(500)
-      put_result(Kind.SIGNAL, Nil, "KILL")
+      put_result(Kind.SIGNAL, "KILL")
       proc.destroy
       proc = null
       pid = null
@@ -222,17 +224,17 @@
             finished = true
           }
           else {
-            put_result(Kind.STDIN, Nil, s)
+            put_result(Kind.STDIN, s)
             writer.write(s)
             writer.flush
           }
           //}}}
         }
         catch {
-          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdin thread: " + e.getMessage)
+          case e: IOException => put_result(Kind.SYSTEM, "Stdin thread: " + e.getMessage)
         }
       }
-      put_result(Kind.SYSTEM, Nil, "Stdin thread terminated")
+      put_result(Kind.SYSTEM, "Stdin thread terminated")
     }
   }
 
@@ -256,7 +258,7 @@
             else done = true
           }
           if (result.length > 0) {
-            put_result(Kind.STDOUT, Nil, result.toString)
+            put_result(Kind.STDOUT, result.toString)
             result.length = 0
           }
           else {
@@ -267,91 +269,89 @@
           //}}}
         }
         catch {
-          case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdout thread: " + e.getMessage)
+          case e: IOException => put_result(Kind.SYSTEM, "Stdout thread: " + e.getMessage)
         }
       }
-      put_result(Kind.SYSTEM, Nil, "Stdout thread terminated")
+      put_result(Kind.SYSTEM, "Stdout thread terminated")
     }
   }
 
 
   /* messages */
 
-  private class MessageThread(fifo: String) extends Thread("isabelle: messages") {
-    override def run() = {
-      val reader = isabelle_system.fifo_reader(fifo)
-      var kind: Kind.Value = null
-      var props: List[(String, String)] = Nil
-      var result = new StringBuilder
+  private class MessageThread(fifo: String) extends Thread("isabelle: messages")
+  {
+    private class Protocol_Error(msg: String) extends Exception(msg)
+    override def run()
+    {
+      val stream = system.fifo_stream(fifo)
+      val default_buffer = new Array[Byte](65536)
+      var c = -1
 
-      var finished = false
-      while (!finished) {
+      def read_chunk(): List[XML.Tree] =
+      {
+        //{{{
+        // chunk size
+        var n = 0
+        c = stream.read
+        while (48 <= c && c <= 57) {
+          n = 10 * n + (c - 48)
+          c = stream.read
+        }
+        if (c != 10) throw new Protocol_Error("bad message chunk header")
+
+        // chunk content
+        val buf =
+          if (n <= default_buffer.size) default_buffer
+          else new Array[Byte](n)
+
+        var i = 0
+        var m = 0
+        do {
+          m = stream.read(buf, i, n - i)
+          i += m
+        } while (m > 0 && n > i)
+
+        if (i != n) throw new Protocol_Error("bad message chunk content")
+
+        YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
+        //}}}
+      }
+
+      do {
         try {
-          if (kind == null) {
-            //{{{ Char mode -- resync
-            var c = -1
-            do {
-              c = reader.read
-              if (c >= 0 && c != 2) result.append(c.asInstanceOf[Char])
-            } while (c >= 0 && c != 2)
-
-            if (result.length > 0) {
-              put_result(Kind.SYSTEM, Nil, "Malformed message:\n" + result.toString)
-              result.length = 0
-            }
-            if (c < 0) {
-              reader.close
-              finished = true
-              try_close()
-            }
-            else {
-              c = reader.read
-              if (Kind.code.isDefinedAt(c)) kind = Kind.code(c)
-              else kind = null
-            }
-            //}}}
+          //{{{
+          c = stream.read
+          var non_sync = 0
+          while (c >= 0 && c != 2) {
+            non_sync += 1
+            c = stream.read
           }
-          else {
-            //{{{ Line mode
-            val line = reader.readLine
-            if (line == null) {
-              reader.close
-              finished = true
-              try_close()
+          if (non_sync > 0)
+            throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes")
+          if (c == 2) {
+            val header = read_chunk()
+            val body = read_chunk()
+            header match {
+              case List(XML.Elem(name, props, Nil))
+                  if name.size == 1 && Kind.code.isDefinedAt(name(0)) =>
+                put_result(Kind.code(name(0)), props, body)
+              case _ => throw new Protocol_Error("bad header: " + header.toString)
             }
-            else {
-              val len = line.length
-              // property
-              if (line.endsWith("\u0002,")) {
-                val i = line.indexOf('=')
-                if (i > 0) {
-                  val name = line.substring(0, i)
-                  val value = line.substring(i + 1, len - 2)
-                  props = (name, value) :: props
-                }
-              }
-              // last text line
-              else if (line.endsWith("\u0002.")) {
-                result.append(line.substring(0, len - 2))
-                put_result(kind, props.reverse, result.toString)
-                kind = null
-                props = Nil
-                result.length = 0
-              }
-              // text line
-              else {
-                result.append(line)
-                result.append('\n')
-              }
-            }
-            //}}}
           }
+          //}}}
         }
         catch {
-          case e: IOException => put_result(Kind.SYSTEM, Nil, "Message thread: " + e.getMessage)
+          case e: IOException =>
+            put_result(Kind.SYSTEM, "Cannot read message:\n" + e.getMessage)
+          case e: Protocol_Error =>
+            put_result(Kind.SYSTEM, "Malformed message:\n" + e.getMessage)
         }
-      }
-      put_result(Kind.SYSTEM, Nil, "Message thread terminated")
+      } while (c != -1)
+      stream.close
+      try_close()
+
+      put_result(Kind.SYSTEM, "Message thread terminated")
     }
   }
 
@@ -363,16 +363,16 @@
     /* isabelle version */
 
     {
-      val (msg, rc) = isabelle_system.isabelle_tool("version")
+      val (msg, rc) = system.isabelle_tool("version")
       if (rc != 0) error("Version check failed -- bad Isabelle installation:\n" + msg)
-      put_result(Kind.SYSTEM, Nil, msg)
+      put_result(Kind.SYSTEM, msg)
     }
 
 
     /* messages */
 
-    val message_fifo = isabelle_system.mk_fifo()
-    def rm_fifo() = isabelle_system.rm_fifo(message_fifo)
+    val message_fifo = system.mk_fifo()
+    def rm_fifo() = system.rm_fifo(message_fifo)
 
     val message_thread = new MessageThread(message_fifo)
     message_thread.start
@@ -381,9 +381,8 @@
     /* exec process */
 
     try {
-      val cmdline =
-        List(isabelle_system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
-      proc = isabelle_system.execute(true, cmdline: _*)
+      val cmdline = List(system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args
+      proc = system.execute(true, cmdline: _*)
     }
     catch {
       case e: IOException =>
@@ -404,8 +403,8 @@
       override def run() = {
         val rc = proc.waitFor()
         Thread.sleep(300)
-        put_result(Kind.SYSTEM, Nil, "Exit thread terminated")
-        put_result(Kind.EXIT, Nil, Integer.toString(rc))
+        put_result(Kind.SYSTEM, "Exit thread terminated")
+        put_result(Kind.EXIT, rc.toString)
         rm_fifo()
       }
     }.start
--- a/src/Pure/System/isabelle_system.scala	Thu Dec 17 20:09:19 2009 +0100
+++ b/src/Pure/System/isabelle_system.scala	Thu Dec 17 20:14:00 2009 +0100
@@ -8,7 +8,7 @@
 
 import java.util.regex.Pattern
 import java.util.Locale
-import java.io.{BufferedReader, InputStreamReader, FileInputStream, File, IOException}
+import java.io.{BufferedInputStream, FileInputStream, File, IOException}
 import java.awt.{GraphicsEnvironment, Font}
 
 import scala.io.Source
@@ -279,13 +279,13 @@
     if (rc != 0) error(result)
   }
 
-  def fifo_reader(fifo: String): BufferedReader =
+  def fifo_stream(fifo: String): BufferedInputStream =
   {
     // blocks until writer is ready
     val stream =
       if (Platform.is_windows) execute(false, "cat", fifo).getInputStream
       else new FileInputStream(fifo)
-    new BufferedReader(new InputStreamReader(stream, Isabelle_System.charset))
+    new BufferedInputStream(stream)
   }