# HG changeset patch # User wenzelm # Date 1544569271 -3600 # Node ID 387894c2fb2c21738e771d94cf9cdf384d3ee9f8 # Parent b28b001e7ee815b205cc6ca28d8784051b540f4d more uniform multi-language operations; clarified modules and signature; diff -r b28b001e7ee8 -r 387894c2fb2c src/Pure/PIDE/byte_message.ML --- a/src/Pure/PIDE/byte_message.ML Tue Dec 11 23:59:41 2018 +0100 +++ b/src/Pure/PIDE/byte_message.ML Wed Dec 12 00:01:11 2018 +0100 @@ -6,15 +6,38 @@ signature BYTE_MESSAGE = sig - val read_line: BinIO.instream -> string option + val write: BinIO.outstream -> string -> unit + val newline: BinIO.outstream -> unit + val flush: BinIO.outstream -> unit val read: BinIO.instream -> int -> string val read_block: BinIO.instream -> int -> string option + val read_line: BinIO.instream -> string option + + val write_header: BinIO.outstream -> int list -> unit + val read_header: BinIO.instream -> int list option + val read_header1: BinIO.instream -> int option + + val write_message: BinIO.outstream -> string list -> unit val read_message: BinIO.instream -> string list option + + val write_line_message: BinIO.outstream -> string -> unit + val read_line_message: BinIO.instream -> string option end; structure Byte_Message: BYTE_MESSAGE = struct +(* output operations *) + +fun write stream s = BinIO.output (stream, Byte.stringToBytes s); + +fun newline stream = write stream "\n"; + +fun flush stream = ignore (try BinIO.flushOut stream); + + +(* input operations *) + fun read stream n = Byte.bytesToString (BinIO.inputN (stream, n)); fun read_block stream n = @@ -34,10 +57,40 @@ in read [] end; +(* header with chunk lengths *) + +fun write_header stream ns = + (write stream (space_implode "," (map string_of_int ns)); + newline stream); + +fun err_header line = error ("Malformed message header: " ^ quote line) + +fun parse_header line = + map Value.parse_nat (space_explode "," line) + handle Fail _ => err_header line + +fun read_header stream = + read_line stream |> Option.map parse_header; + +fun read_header1 stream = + read_line stream |> Option.map (fn line => + (case parse_header line of + [n] => n + | _ => err_header line)); + + (* messages with multiple chunks (arbitrary content) *) +fun write_message stream chunks = + (write_header stream (map size chunks); + List.app (write stream) chunks; + flush stream); + fun read_chunk stream n = - let val (len, chunk) = `size (read stream n) in + let + val chunk = read stream n; + val len = size chunk; + in if len = n then chunk else error ("Malformed message chunk: unexpected EOF after " ^ @@ -45,10 +98,36 @@ end; fun read_message stream = - read_line stream |> Option.map (fn line => - let - val ns = map Value.parse_nat (space_explode "," line) - handle Fail _ => error ("Malformed message header: " ^ quote line); - in map (read_chunk stream) ns end); + read_header stream |> (Option.map o map) (read_chunk stream); + + +(* hybrid messages: line or length+block (with content restriction) *) + +fun is_length s = + s <> "" andalso forall_string Symbol.is_ascii_digit s; + +fun has_line_terminator s = + String.isSuffix "\r" s orelse String.isSuffix "\n" s; + +fun write_line_message stream msg = + if is_length msg orelse has_line_terminator msg then + error ("Bad content for line message:\n" ^ implode (take 100 (Symbol.explode msg))) + else + let val n = size msg in + if n > 100 orelse exists_string (fn s => s = "\n") msg then + write_header stream [n + 1] + else (); + write stream msg; + newline stream; + flush stream + end; + +fun read_line_message stream = + (case read_line stream of + SOME line => + (case try Value.parse_nat line of + NONE => SOME line + | SOME n => Option.map trim_line (read_block stream n)) + | NONE => NONE) handle IO.Io _ => NONE; end; diff -r b28b001e7ee8 -r 387894c2fb2c src/Pure/PIDE/byte_message.scala --- a/src/Pure/PIDE/byte_message.scala Tue Dec 11 23:59:41 2018 +0100 +++ b/src/Pure/PIDE/byte_message.scala Wed Dec 12 00:01:11 2018 +0100 @@ -11,6 +11,19 @@ object Byte_Message { + /* output operations */ + + def write(stream: OutputStream, bytes: Bytes) { bytes.write_stream(stream) } + + def newline(stream: OutputStream) { stream.write(10) } + + def flush(stream: OutputStream): Unit = + try { stream.flush() } + catch { case _: IOException => } + + + /* input operations */ + def read(stream: InputStream, length: Int): Bytes = Bytes.read_stream(stream, limit = length) @@ -36,7 +49,54 @@ } - /* hybrid messages: line or length+block (with content restriction) */ + /* header with chunk lengths */ + + private def err_header(line: String): Nothing = + error("Malformed message header: " + quote(line)) + + private def parse_header(line: String): List[Int] = + try { space_explode(',', line).map(Value.Int.parse_nat) } + catch { case ERROR(_) => err_header(line) } + + def read_header(stream: InputStream): Option[List[Int]] = + read_line(stream).map(_.text).map(parse_header(_)) + + def read_header1(stream: InputStream): Option[Int] = + read_line(stream).map(_.text).map(line => + parse_header(line) match { + case List(n) => n + case _ => err_header(line) + }) + + def write_header(stream: OutputStream, ns: List[Int]) + { + stream.write(UTF8.bytes(ns.mkString(","))) + newline(stream) + } + + + /* messages with multiple chunks (arbitrary content) */ + + def write_message(stream: OutputStream, chunks: List[Bytes]) + { + write_header(stream, chunks.map(_.length)) + chunks.foreach(write(stream, _)) + flush(stream) + } + + private def read_chunk(stream: InputStream, n: Int): Bytes = + { + val chunk = read(stream, n) + val len = chunk.length + if (len == n) chunk + else error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes") + } + + def read_message(stream: InputStream): Option[List[Bytes]] = + read_header(stream).map(ns => ns.map(n => read_chunk(stream, n))) + + + /* hybrid messages: line or length+block (restricted content) */ private def is_length(msg: Bytes): Boolean = !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar)) @@ -53,25 +113,18 @@ error ("Bad content for line message:\n" ++ msg.text.take(100)) if (msg.length > 100 || msg.iterator.contains(10)) { - stream.write(UTF8.bytes((msg.length + 1).toString)) - stream.write(10) + write_header(stream, List(msg.length + 1)) } - msg.write_stream(stream) - stream.write(10) - - try { stream.flush() } catch { case _: IOException => } + write(stream, msg) + newline(stream) + flush(stream) } def read_line_message(stream: InputStream): Option[Bytes] = - { - try { - read_line(stream) match { - case Some(line) => - if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line) - else Some(line) - case None => None - } + read_line(stream) match { + case Some(line) => + if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line) + else Some(line) + case None => None } - catch { case _: IOException => None } - } } diff -r b28b001e7ee8 -r 387894c2fb2c src/Pure/System/message_channel.ML --- a/src/Pure/System/message_channel.ML Tue Dec 11 23:59:41 2018 +0100 +++ b/src/Pure/System/message_channel.ML Wed Dec 12 00:01:11 2018 +0100 @@ -41,16 +41,15 @@ fun send (Message_Channel {send, ...}) = send; fun shutdown (Message_Channel {shutdown, ...}) = shutdown (); -fun flush stream = ignore (try BinIO.flushOut stream); val flush_timeout = SOME (seconds 0.02); fun message_output mbox stream = let fun continue timeout = (case Mailbox.receive timeout mbox of - [] => (flush stream; continue NONE) + [] => (Byte_Message.flush stream; continue NONE) | msgs => received timeout msgs) - and received _ (NONE :: _) = flush stream + and received _ (NONE :: _) = Byte_Message.flush stream | received _ (SOME msg :: rest) = (output_message stream msg; received flush_timeout rest) | received timeout [] = continue timeout; in fn () => continue NONE end; diff -r b28b001e7ee8 -r 387894c2fb2c src/Pure/Tools/server.scala --- a/src/Pure/Tools/server.scala Tue Dec 11 23:59:41 2018 +0100 +++ b/src/Pure/Tools/server.scala Wed Dec 12 00:01:11 2018 +0100 @@ -181,7 +181,8 @@ interrupt = interrupt) def read_message(): Option[String] = - Byte_Message.read_line_message(in).map(_.text) + try { Byte_Message.read_line_message(in).map(_.text) } + catch { case _: IOException => None } def write_message(msg: String): Unit = out_lock.synchronized { Byte_Message.write_line_message(out, Bytes(UTF8.bytes(msg))) }