# HG changeset patch # User wenzelm # Date 1261077240 -3600 # Node ID ea24958c2af517259171fa8e519475b864ba6af6 # Parent 2541de190d921e25c5d29250f455081b285dcf2d fifo: raw byte stream; Result: fully decoded symbols and tree structure; adapted to simplified message format; tuned; diff -r 2541de190d92 -r ea24958c2af5 src/Pure/System/isabelle_process.scala --- a/src/Pure/System/isabelle_process.scala Thu Dec 17 20:09:19 2009 +0100 +++ b/src/Pure/System/isabelle_process.scala Thu Dec 17 20:14:00 2009 +0100 @@ -82,12 +82,15 @@ kind == STATUS } - class Result(val kind: Kind.Value, val props: List[(String, String)], val result: String) { - override def toString = { - val trees = YXML.parse_body_failsafe(result) + class Result(val kind: Kind.Value, val props: List[(String, String)], val body: List[XML.Tree]) + { + def message = XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(kind)) :: props, body) + + override def toString: String = + { val res = - if (kind == Kind.STATUS) trees.map(_.toString).mkString - else trees.flatMap(XML.content(_).mkString).mkString + if (kind == Kind.STATUS) body.map(_.toString).mkString + else body.map(XML.content(_).mkString).mkString if (props.isEmpty) kind.toString + " [[" + res + "]]" else @@ -98,16 +101,10 @@ def is_control = Kind.is_control(kind) def is_system = Kind.is_system(kind) } - - def parse_message(isabelle_system: Isabelle_System, result: Result) = - { - XML.Elem(Markup.MESSAGE, (Markup.CLASS, Kind.markup(result.kind)) :: result.props, - YXML.parse_body_failsafe(isabelle_system.symbols.decode(result.result))) - } } -class Isabelle_Process(isabelle_system: Isabelle_System, receiver: Actor, args: String*) +class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*) { import Isabelle_Process._ @@ -130,14 +127,19 @@ /* results */ - private def put_result(kind: Kind.Value, props: List[(String, String)], result: String) + private def put_result(kind: Kind.Value, props: List[(String, String)], body: List[XML.Tree]) { if (kind == Kind.INIT) { val map = Map(props: _*) if (map.isDefinedAt(Markup.PID)) pid = map(Markup.PID) if (map.isDefinedAt(Markup.SESSION)) the_session = map(Markup.SESSION) } - receiver ! new Result(kind, props, result) + receiver ! new Result(kind, props, body) + } + + private def put_result(kind: Kind.Value, text: String) + { + put_result(kind, Nil, List(XML.Text(system.symbols.decode(text)))) } @@ -145,13 +147,13 @@ def interrupt() = synchronized { if (proc == null) error("Cannot interrupt Isabelle: no process") - if (pid == null) put_result(Kind.SYSTEM, Nil, "Cannot interrupt: unknown pid") + if (pid == null) put_result(Kind.SYSTEM, "Cannot interrupt: unknown pid") else { try { - if (isabelle_system.execute(true, "kill", "-INT", pid).waitFor == 0) - put_result(Kind.SIGNAL, Nil, "INT") + if (system.execute(true, "kill", "-INT", pid).waitFor == 0) + put_result(Kind.SIGNAL, "INT") else - put_result(Kind.SYSTEM, Nil, "Cannot interrupt: kill command failed") + put_result(Kind.SYSTEM, "Cannot interrupt: kill command failed") } catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) } } @@ -162,7 +164,7 @@ else { try_close() Thread.sleep(500) - put_result(Kind.SIGNAL, Nil, "KILL") + put_result(Kind.SIGNAL, "KILL") proc.destroy proc = null pid = null @@ -222,17 +224,17 @@ finished = true } else { - put_result(Kind.STDIN, Nil, s) + put_result(Kind.STDIN, s) writer.write(s) writer.flush } //}}} } catch { - case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdin thread: " + e.getMessage) + case e: IOException => put_result(Kind.SYSTEM, "Stdin thread: " + e.getMessage) } } - put_result(Kind.SYSTEM, Nil, "Stdin thread terminated") + put_result(Kind.SYSTEM, "Stdin thread terminated") } } @@ -256,7 +258,7 @@ else done = true } if (result.length > 0) { - put_result(Kind.STDOUT, Nil, result.toString) + put_result(Kind.STDOUT, result.toString) result.length = 0 } else { @@ -267,91 +269,89 @@ //}}} } catch { - case e: IOException => put_result(Kind.SYSTEM, Nil, "Stdout thread: " + e.getMessage) + case e: IOException => put_result(Kind.SYSTEM, "Stdout thread: " + e.getMessage) } } - put_result(Kind.SYSTEM, Nil, "Stdout thread terminated") + put_result(Kind.SYSTEM, "Stdout thread terminated") } } /* messages */ - private class MessageThread(fifo: String) extends Thread("isabelle: messages") { - override def run() = { - val reader = isabelle_system.fifo_reader(fifo) - var kind: Kind.Value = null - var props: List[(String, String)] = Nil - var result = new StringBuilder + private class MessageThread(fifo: String) extends Thread("isabelle: messages") + { + private class Protocol_Error(msg: String) extends Exception(msg) + override def run() + { + val stream = system.fifo_stream(fifo) + val default_buffer = new Array[Byte](65536) + var c = -1 - var finished = false - while (!finished) { + def read_chunk(): List[XML.Tree] = + { + //{{{ + // chunk size + var n = 0 + c = stream.read + while (48 <= c && c <= 57) { + n = 10 * n + (c - 48) + c = stream.read + } + if (c != 10) throw new Protocol_Error("bad message chunk header") + + // chunk content + val buf = + if (n <= default_buffer.size) default_buffer + else new Array[Byte](n) + + var i = 0 + var m = 0 + do { + m = stream.read(buf, i, n - i) + i += m + } while (m > 0 && n > i) + + if (i != n) throw new Protocol_Error("bad message chunk content") + + YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n)) + //}}} + } + + do { try { - if (kind == null) { - //{{{ Char mode -- resync - var c = -1 - do { - c = reader.read - if (c >= 0 && c != 2) result.append(c.asInstanceOf[Char]) - } while (c >= 0 && c != 2) - - if (result.length > 0) { - put_result(Kind.SYSTEM, Nil, "Malformed message:\n" + result.toString) - result.length = 0 - } - if (c < 0) { - reader.close - finished = true - try_close() - } - else { - c = reader.read - if (Kind.code.isDefinedAt(c)) kind = Kind.code(c) - else kind = null - } - //}}} + //{{{ + c = stream.read + var non_sync = 0 + while (c >= 0 && c != 2) { + non_sync += 1 + c = stream.read } - else { - //{{{ Line mode - val line = reader.readLine - if (line == null) { - reader.close - finished = true - try_close() + if (non_sync > 0) + throw new Protocol_Error("lost synchronization -- skipping " + non_sync + " bytes") + if (c == 2) { + val header = read_chunk() + val body = read_chunk() + header match { + case List(XML.Elem(name, props, Nil)) + if name.size == 1 && Kind.code.isDefinedAt(name(0)) => + put_result(Kind.code(name(0)), props, body) + case _ => throw new Protocol_Error("bad header: " + header.toString) } - else { - val len = line.length - // property - if (line.endsWith("\u0002,")) { - val i = line.indexOf('=') - if (i > 0) { - val name = line.substring(0, i) - val value = line.substring(i + 1, len - 2) - props = (name, value) :: props - } - } - // last text line - else if (line.endsWith("\u0002.")) { - result.append(line.substring(0, len - 2)) - put_result(kind, props.reverse, result.toString) - kind = null - props = Nil - result.length = 0 - } - // text line - else { - result.append(line) - result.append('\n') - } - } - //}}} } + //}}} } catch { - case e: IOException => put_result(Kind.SYSTEM, Nil, "Message thread: " + e.getMessage) + case e: IOException => + put_result(Kind.SYSTEM, "Cannot read message:\n" + e.getMessage) + case e: Protocol_Error => + put_result(Kind.SYSTEM, "Malformed message:\n" + e.getMessage) } - } - put_result(Kind.SYSTEM, Nil, "Message thread terminated") + } while (c != -1) + stream.close + try_close() + + put_result(Kind.SYSTEM, "Message thread terminated") } } @@ -363,16 +363,16 @@ /* isabelle version */ { - val (msg, rc) = isabelle_system.isabelle_tool("version") + val (msg, rc) = system.isabelle_tool("version") if (rc != 0) error("Version check failed -- bad Isabelle installation:\n" + msg) - put_result(Kind.SYSTEM, Nil, msg) + put_result(Kind.SYSTEM, msg) } /* messages */ - val message_fifo = isabelle_system.mk_fifo() - def rm_fifo() = isabelle_system.rm_fifo(message_fifo) + val message_fifo = system.mk_fifo() + def rm_fifo() = system.rm_fifo(message_fifo) val message_thread = new MessageThread(message_fifo) message_thread.start @@ -381,9 +381,8 @@ /* exec process */ try { - val cmdline = - List(isabelle_system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args - proc = isabelle_system.execute(true, cmdline: _*) + val cmdline = List(system.getenv_strict("ISABELLE_PROCESS"), "-W", message_fifo) ++ args + proc = system.execute(true, cmdline: _*) } catch { case e: IOException => @@ -404,8 +403,8 @@ override def run() = { val rc = proc.waitFor() Thread.sleep(300) - put_result(Kind.SYSTEM, Nil, "Exit thread terminated") - put_result(Kind.EXIT, Nil, Integer.toString(rc)) + put_result(Kind.SYSTEM, "Exit thread terminated") + put_result(Kind.EXIT, rc.toString) rm_fifo() } }.start diff -r 2541de190d92 -r ea24958c2af5 src/Pure/System/isabelle_system.scala --- a/src/Pure/System/isabelle_system.scala Thu Dec 17 20:09:19 2009 +0100 +++ b/src/Pure/System/isabelle_system.scala Thu Dec 17 20:14:00 2009 +0100 @@ -8,7 +8,7 @@ import java.util.regex.Pattern import java.util.Locale -import java.io.{BufferedReader, InputStreamReader, FileInputStream, File, IOException} +import java.io.{BufferedInputStream, FileInputStream, File, IOException} import java.awt.{GraphicsEnvironment, Font} import scala.io.Source @@ -279,13 +279,13 @@ if (rc != 0) error(result) } - def fifo_reader(fifo: String): BufferedReader = + def fifo_stream(fifo: String): BufferedInputStream = { // blocks until writer is ready val stream = if (Platform.is_windows) execute(false, "cat", fifo).getInputStream else new FileInputStream(fifo) - new BufferedReader(new InputStreamReader(stream, Isabelle_System.charset)) + new BufferedInputStream(stream) }