--- a/src/Pure/PIDE/byte_message.ML Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/PIDE/byte_message.ML Wed Dec 12 00:01:11 2018 +0100
@@ -6,15 +6,38 @@
signature BYTE_MESSAGE =
sig
- val read_line: BinIO.instream -> string option
+ val write: BinIO.outstream -> string -> unit
+ val newline: BinIO.outstream -> unit
+ val flush: BinIO.outstream -> unit
val read: BinIO.instream -> int -> string
val read_block: BinIO.instream -> int -> string option
+ val read_line: BinIO.instream -> string option
+
+ val write_header: BinIO.outstream -> int list -> unit
+ val read_header: BinIO.instream -> int list option
+ val read_header1: BinIO.instream -> int option
+
+ val write_message: BinIO.outstream -> string list -> unit
val read_message: BinIO.instream -> string list option
+
+ val write_line_message: BinIO.outstream -> string -> unit
+ val read_line_message: BinIO.instream -> string option
end;
structure Byte_Message: BYTE_MESSAGE =
struct
+(* output operations *)
+
+fun write stream s = BinIO.output (stream, Byte.stringToBytes s);
+
+fun newline stream = write stream "\n";
+
+fun flush stream = ignore (try BinIO.flushOut stream);
+
+
+(* input operations *)
+
fun read stream n = Byte.bytesToString (BinIO.inputN (stream, n));
fun read_block stream n =
@@ -34,10 +57,40 @@
in read [] end;
+(* header with chunk lengths *)
+
+fun write_header stream ns =
+ (write stream (space_implode "," (map string_of_int ns));
+ newline stream);
+
+fun err_header line = error ("Malformed message header: " ^ quote line)
+
+fun parse_header line =
+ map Value.parse_nat (space_explode "," line)
+ handle Fail _ => err_header line
+
+fun read_header stream =
+ read_line stream |> Option.map parse_header;
+
+fun read_header1 stream =
+ read_line stream |> Option.map (fn line =>
+ (case parse_header line of
+ [n] => n
+ | _ => err_header line));
+
+
(* messages with multiple chunks (arbitrary content) *)
+fun write_message stream chunks =
+ (write_header stream (map size chunks);
+ List.app (write stream) chunks;
+ flush stream);
+
fun read_chunk stream n =
- let val (len, chunk) = `size (read stream n) in
+ let
+ val chunk = read stream n;
+ val len = size chunk;
+ in
if len = n then chunk
else
error ("Malformed message chunk: unexpected EOF after " ^
@@ -45,10 +98,36 @@
end;
fun read_message stream =
- read_line stream |> Option.map (fn line =>
- let
- val ns = map Value.parse_nat (space_explode "," line)
- handle Fail _ => error ("Malformed message header: " ^ quote line);
- in map (read_chunk stream) ns end);
+ read_header stream |> (Option.map o map) (read_chunk stream);
+
+
+(* hybrid messages: line or length+block (with content restriction) *)
+
+fun is_length s =
+ s <> "" andalso forall_string Symbol.is_ascii_digit s;
+
+fun has_line_terminator s =
+ String.isSuffix "\r" s orelse String.isSuffix "\n" s;
+
+fun write_line_message stream msg =
+ if is_length msg orelse has_line_terminator msg then
+ error ("Bad content for line message:\n" ^ implode (take 100 (Symbol.explode msg)))
+ else
+ let val n = size msg in
+ if n > 100 orelse exists_string (fn s => s = "\n") msg then
+ write_header stream [n + 1]
+ else ();
+ write stream msg;
+ newline stream;
+ flush stream
+ end;
+
+fun read_line_message stream =
+ (case read_line stream of
+ SOME line =>
+ (case try Value.parse_nat line of
+ NONE => SOME line
+ | SOME n => Option.map trim_line (read_block stream n))
+ | NONE => NONE) handle IO.Io _ => NONE;
end;
--- a/src/Pure/PIDE/byte_message.scala Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/PIDE/byte_message.scala Wed Dec 12 00:01:11 2018 +0100
@@ -11,6 +11,19 @@
object Byte_Message
{
+ /* output operations */
+
+ def write(stream: OutputStream, bytes: Bytes) { bytes.write_stream(stream) }
+
+ def newline(stream: OutputStream) { stream.write(10) }
+
+ def flush(stream: OutputStream): Unit =
+ try { stream.flush() }
+ catch { case _: IOException => }
+
+
+ /* input operations */
+
def read(stream: InputStream, length: Int): Bytes =
Bytes.read_stream(stream, limit = length)
@@ -36,7 +49,54 @@
}
- /* hybrid messages: line or length+block (with content restriction) */
+ /* header with chunk lengths */
+
+ private def err_header(line: String): Nothing =
+ error("Malformed message header: " + quote(line))
+
+ private def parse_header(line: String): List[Int] =
+ try { space_explode(',', line).map(Value.Int.parse_nat) }
+ catch { case ERROR(_) => err_header(line) }
+
+ def read_header(stream: InputStream): Option[List[Int]] =
+ read_line(stream).map(_.text).map(parse_header(_))
+
+ def read_header1(stream: InputStream): Option[Int] =
+ read_line(stream).map(_.text).map(line =>
+ parse_header(line) match {
+ case List(n) => n
+ case _ => err_header(line)
+ })
+
+ def write_header(stream: OutputStream, ns: List[Int])
+ {
+ stream.write(UTF8.bytes(ns.mkString(",")))
+ newline(stream)
+ }
+
+
+ /* messages with multiple chunks (arbitrary content) */
+
+ def write_message(stream: OutputStream, chunks: List[Bytes])
+ {
+ write_header(stream, chunks.map(_.length))
+ chunks.foreach(write(stream, _))
+ flush(stream)
+ }
+
+ private def read_chunk(stream: InputStream, n: Int): Bytes =
+ {
+ val chunk = read(stream, n)
+ val len = chunk.length
+ if (len == n) chunk
+ else error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes")
+ }
+
+ def read_message(stream: InputStream): Option[List[Bytes]] =
+ read_header(stream).map(ns => ns.map(n => read_chunk(stream, n)))
+
+
+ /* hybrid messages: line or length+block (restricted content) */
private def is_length(msg: Bytes): Boolean =
!msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar))
@@ -53,25 +113,18 @@
error ("Bad content for line message:\n" ++ msg.text.take(100))
if (msg.length > 100 || msg.iterator.contains(10)) {
- stream.write(UTF8.bytes((msg.length + 1).toString))
- stream.write(10)
+ write_header(stream, List(msg.length + 1))
}
- msg.write_stream(stream)
- stream.write(10)
-
- try { stream.flush() } catch { case _: IOException => }
+ write(stream, msg)
+ newline(stream)
+ flush(stream)
}
def read_line_message(stream: InputStream): Option[Bytes] =
- {
- try {
- read_line(stream) match {
- case Some(line) =>
- if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line)
- else Some(line)
- case None => None
- }
+ read_line(stream) match {
+ case Some(line) =>
+ if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line)
+ else Some(line)
+ case None => None
}
- catch { case _: IOException => None }
- }
}
--- a/src/Pure/System/message_channel.ML Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/System/message_channel.ML Wed Dec 12 00:01:11 2018 +0100
@@ -41,16 +41,15 @@
fun send (Message_Channel {send, ...}) = send;
fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
-fun flush stream = ignore (try BinIO.flushOut stream);
val flush_timeout = SOME (seconds 0.02);
fun message_output mbox stream =
let
fun continue timeout =
(case Mailbox.receive timeout mbox of
- [] => (flush stream; continue NONE)
+ [] => (Byte_Message.flush stream; continue NONE)
| msgs => received timeout msgs)
- and received _ (NONE :: _) = flush stream
+ and received _ (NONE :: _) = Byte_Message.flush stream
| received _ (SOME msg :: rest) = (output_message stream msg; received flush_timeout rest)
| received timeout [] = continue timeout;
in fn () => continue NONE end;
--- a/src/Pure/Tools/server.scala Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/Tools/server.scala Wed Dec 12 00:01:11 2018 +0100
@@ -181,7 +181,8 @@
interrupt = interrupt)
def read_message(): Option[String] =
- Byte_Message.read_line_message(in).map(_.text)
+ try { Byte_Message.read_line_message(in).map(_.text) }
+ catch { case _: IOException => None }
def write_message(msg: String): Unit =
out_lock.synchronized { Byte_Message.write_line_message(out, Bytes(UTF8.bytes(msg))) }