more uniform multi-language operations;
authorwenzelm
Wed, 12 Dec 2018 00:01:11 +0100 (2018-12-11)
changeset 69451 387894c2fb2c
parent 69450 b28b001e7ee8
child 69452 704915cf59fa
more uniform multi-language operations; clarified modules and signature;
src/Pure/PIDE/byte_message.ML
src/Pure/PIDE/byte_message.scala
src/Pure/System/message_channel.ML
src/Pure/Tools/server.scala
--- a/src/Pure/PIDE/byte_message.ML	Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/PIDE/byte_message.ML	Wed Dec 12 00:01:11 2018 +0100
@@ -6,15 +6,38 @@
 
 signature BYTE_MESSAGE =
 sig
-  val read_line: BinIO.instream -> string option
+  val write: BinIO.outstream -> string -> unit
+  val newline: BinIO.outstream -> unit
+  val flush: BinIO.outstream -> unit
   val read: BinIO.instream -> int -> string
   val read_block: BinIO.instream -> int -> string option
+  val read_line: BinIO.instream -> string option
+
+  val write_header: BinIO.outstream -> int list -> unit
+  val read_header: BinIO.instream -> int list option
+  val read_header1: BinIO.instream -> int option
+
+  val write_message: BinIO.outstream -> string list -> unit
   val read_message: BinIO.instream -> string list option
+
+  val write_line_message: BinIO.outstream -> string -> unit
+  val read_line_message: BinIO.instream -> string option
 end;
 
 structure Byte_Message: BYTE_MESSAGE =
 struct
 
+(* output operations *)
+
+fun write stream s = BinIO.output (stream, Byte.stringToBytes s);
+
+fun newline stream = write stream "\n";
+
+fun flush stream = ignore (try BinIO.flushOut stream);
+
+
+(* input operations *)
+
 fun read stream n = Byte.bytesToString (BinIO.inputN (stream, n));
 
 fun read_block stream n =
@@ -34,10 +57,40 @@
   in read [] end;
 
 
+(* header with chunk lengths *)
+
+fun write_header stream ns =
+ (write stream (space_implode "," (map string_of_int ns));
+  newline stream);
+
+fun err_header line = error ("Malformed message header: " ^ quote line)
+
+fun parse_header line =
+  map Value.parse_nat (space_explode "," line)
+    handle Fail _ => err_header line
+
+fun read_header stream =
+  read_line stream |> Option.map parse_header;
+
+fun read_header1 stream =
+  read_line stream |> Option.map (fn line =>
+    (case parse_header line of
+      [n] => n
+    | _ => err_header line));
+
+
 (* messages with multiple chunks (arbitrary content) *)
 
+fun write_message stream chunks =
+ (write_header stream (map size chunks);
+  List.app (write stream) chunks;
+  flush stream);
+
 fun read_chunk stream n =
-  let val (len, chunk) = `size (read stream n) in
+  let
+    val chunk = read stream n;
+    val len = size chunk;
+  in
     if len = n then chunk
     else
       error ("Malformed message chunk: unexpected EOF after " ^
@@ -45,10 +98,36 @@
   end;
 
 fun read_message stream =
-  read_line stream |> Option.map (fn line =>
-    let
-      val ns = map Value.parse_nat (space_explode "," line)
-        handle Fail _ => error ("Malformed message header: " ^ quote line);
-    in map (read_chunk stream) ns end);
+  read_header stream |> (Option.map o map) (read_chunk stream);
+
+
+(* hybrid messages: line or length+block (with content restriction) *)
+
+fun is_length s =
+  s <> "" andalso forall_string Symbol.is_ascii_digit s;
+
+fun has_line_terminator s =
+  String.isSuffix "\r" s orelse String.isSuffix "\n" s;
+
+fun write_line_message stream msg =
+  if is_length msg orelse has_line_terminator msg then
+    error ("Bad content for line message:\n" ^ implode (take 100 (Symbol.explode msg)))
+  else
+    let val n = size msg in
+      if n > 100 orelse exists_string (fn s => s = "\n") msg then
+        write_header stream [n + 1]
+      else ();
+      write stream msg;
+      newline stream;
+      flush stream
+    end;
+
+fun read_line_message stream =
+  (case read_line stream of
+    SOME line =>
+      (case try Value.parse_nat line of
+        NONE => SOME line
+      | SOME n => Option.map trim_line (read_block stream n))
+  | NONE => NONE) handle IO.Io _ => NONE;
 
 end;
--- a/src/Pure/PIDE/byte_message.scala	Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/PIDE/byte_message.scala	Wed Dec 12 00:01:11 2018 +0100
@@ -11,6 +11,19 @@
 
 object Byte_Message
 {
+  /* output operations */
+
+  def write(stream: OutputStream, bytes: Bytes) { bytes.write_stream(stream) }
+
+  def newline(stream: OutputStream) { stream.write(10) }
+
+  def flush(stream: OutputStream): Unit =
+    try { stream.flush() }
+    catch { case _: IOException => }
+
+
+  /* input operations */
+
   def read(stream: InputStream, length: Int): Bytes =
     Bytes.read_stream(stream, limit = length)
 
@@ -36,7 +49,54 @@
   }
 
 
-  /* hybrid messages: line or length+block (with content restriction) */
+  /* header with chunk lengths */
+
+  private def err_header(line: String): Nothing =
+    error("Malformed message header: " + quote(line))
+
+  private def parse_header(line: String): List[Int] =
+    try { space_explode(',', line).map(Value.Int.parse_nat) }
+    catch { case ERROR(_) => err_header(line) }
+
+  def read_header(stream: InputStream): Option[List[Int]] =
+    read_line(stream).map(_.text).map(parse_header(_))
+
+  def read_header1(stream: InputStream): Option[Int] =
+    read_line(stream).map(_.text).map(line =>
+      parse_header(line) match {
+        case List(n) => n
+        case _ => err_header(line)
+      })
+
+  def write_header(stream: OutputStream, ns: List[Int])
+  {
+    stream.write(UTF8.bytes(ns.mkString(",")))
+    newline(stream)
+  }
+
+
+  /* messages with multiple chunks (arbitrary content) */
+
+  def write_message(stream: OutputStream, chunks: List[Bytes])
+  {
+    write_header(stream, chunks.map(_.length))
+    chunks.foreach(write(stream, _))
+    flush(stream)
+  }
+
+  private def read_chunk(stream: InputStream, n: Int): Bytes =
+  {
+    val chunk = read(stream, n)
+    val len = chunk.length
+    if (len == n) chunk
+    else error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes")
+  }
+
+  def read_message(stream: InputStream): Option[List[Bytes]] =
+    read_header(stream).map(ns => ns.map(n => read_chunk(stream, n)))
+
+
+  /* hybrid messages: line or length+block (restricted content) */
 
   private def is_length(msg: Bytes): Boolean =
     !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar))
@@ -53,25 +113,18 @@
       error ("Bad content for line message:\n" ++ msg.text.take(100))
 
     if (msg.length > 100 || msg.iterator.contains(10)) {
-      stream.write(UTF8.bytes((msg.length + 1).toString))
-      stream.write(10)
+      write_header(stream, List(msg.length + 1))
     }
-    msg.write_stream(stream)
-    stream.write(10)
-
-    try { stream.flush() } catch { case _: IOException => }
+    write(stream, msg)
+    newline(stream)
+    flush(stream)
   }
 
   def read_line_message(stream: InputStream): Option[Bytes] =
-  {
-    try {
-      read_line(stream) match {
-        case Some(line) =>
-          if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line)
-          else Some(line)
-        case None => None
-      }
+    read_line(stream) match {
+      case Some(line) =>
+        if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line)
+        else Some(line)
+      case None => None
     }
-    catch { case _: IOException => None }
-  }
 }
--- a/src/Pure/System/message_channel.ML	Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/System/message_channel.ML	Wed Dec 12 00:01:11 2018 +0100
@@ -41,16 +41,15 @@
 fun send (Message_Channel {send, ...}) = send;
 fun shutdown (Message_Channel {shutdown, ...}) = shutdown ();
 
-fun flush stream = ignore (try BinIO.flushOut stream);
 val flush_timeout = SOME (seconds 0.02);
 
 fun message_output mbox stream =
   let
     fun continue timeout =
       (case Mailbox.receive timeout mbox of
-        [] => (flush stream; continue NONE)
+        [] => (Byte_Message.flush stream; continue NONE)
       | msgs => received timeout msgs)
-    and received _ (NONE :: _) = flush stream
+    and received _ (NONE :: _) = Byte_Message.flush stream
       | received _ (SOME msg :: rest) = (output_message stream msg; received flush_timeout rest)
       | received timeout [] = continue timeout;
   in fn () => continue NONE end;
--- a/src/Pure/Tools/server.scala	Tue Dec 11 23:59:41 2018 +0100
+++ b/src/Pure/Tools/server.scala	Wed Dec 12 00:01:11 2018 +0100
@@ -181,7 +181,8 @@
         interrupt = interrupt)
 
     def read_message(): Option[String] =
-      Byte_Message.read_line_message(in).map(_.text)
+      try { Byte_Message.read_line_message(in).map(_.text) }
+      catch { case _: IOException => None }
 
     def write_message(msg: String): Unit =
       out_lock.synchronized { Byte_Message.write_line_message(out, Bytes(UTF8.bytes(msg))) }