src/Pure/PIDE/byte_message.scala
changeset 69452 704915cf59fa
parent 69451 387894c2fb2c
child 69454 ef051edd4d10
equal deleted inserted replaced
69451:387894c2fb2c 69452:704915cf59fa
    22     catch { case _: IOException => }
    22     catch { case _: IOException => }
    23 
    23 
    24 
    24 
    25   /* input operations */
    25   /* input operations */
    26 
    26 
    27   def read(stream: InputStream, length: Int): Bytes =
    27   def read(stream: InputStream, n: Int): Bytes =
    28     Bytes.read_stream(stream, limit = length)
    28     Bytes.read_stream(stream, limit = n)
    29 
    29 
    30   def read_block(stream: InputStream, length: Int): Option[Bytes] =
    30   def read_block(stream: InputStream, n: Int): (Option[Bytes], Int) =
    31   {
    31   {
    32     val msg = read(stream, length)
    32     val msg = read(stream, n)
    33     if (msg.length == length) Some(msg) else None
    33     val len = msg.length
       
    34     (if (len == n) Some(msg) else None, len)
    34   }
    35   }
    35 
    36 
    36   def read_line(stream: InputStream): Option[Bytes] =
    37   def read_line(stream: InputStream): Option[Bytes] =
    37   {
    38   {
    38     val line = new ByteArrayOutputStream(100)
    39     val line = new ByteArrayOutputStream(100)
    49   }
    50   }
    50 
    51 
    51 
    52 
    52   /* header with chunk lengths */
    53   /* header with chunk lengths */
    53 
    54 
       
    55   def write_header(stream: OutputStream, ns: List[Int])
       
    56   {
       
    57     stream.write(UTF8.bytes(ns.mkString(",")))
       
    58     newline(stream)
       
    59   }
       
    60 
    54   private def err_header(line: String): Nothing =
    61   private def err_header(line: String): Nothing =
    55     error("Malformed message header: " + quote(line))
    62     error("Malformed message header: " + quote(line))
    56 
    63 
    57   private def parse_header(line: String): List[Int] =
    64   private def parse_header(line: String): List[Int] =
    58     try { space_explode(',', line).map(Value.Int.parse_nat) }
    65     try { space_explode(',', line).map(Value.Nat.parse) }
    59     catch { case ERROR(_) => err_header(line) }
    66     catch { case ERROR(_) => err_header(line) }
    60 
    67 
    61   def read_header(stream: InputStream): Option[List[Int]] =
    68   def read_header(stream: InputStream): Option[List[Int]] =
    62     read_line(stream).map(_.text).map(parse_header(_))
    69     read_line(stream).map(_.text).map(parse_header(_))
    63 
    70 
    66       parse_header(line) match {
    73       parse_header(line) match {
    67         case List(n) => n
    74         case List(n) => n
    68         case _ => err_header(line)
    75         case _ => err_header(line)
    69       })
    76       })
    70 
    77 
    71   def write_header(stream: OutputStream, ns: List[Int])
       
    72   {
       
    73     stream.write(UTF8.bytes(ns.mkString(",")))
       
    74     newline(stream)
       
    75   }
       
    76 
       
    77 
    78 
    78   /* messages with multiple chunks (arbitrary content) */
    79   /* messages with multiple chunks (arbitrary content) */
    79 
    80 
    80   def write_message(stream: OutputStream, chunks: List[Bytes])
    81   def write_message(stream: OutputStream, chunks: List[Bytes])
    81   {
    82   {
    83     chunks.foreach(write(stream, _))
    84     chunks.foreach(write(stream, _))
    84     flush(stream)
    85     flush(stream)
    85   }
    86   }
    86 
    87 
    87   private def read_chunk(stream: InputStream, n: Int): Bytes =
    88   private def read_chunk(stream: InputStream, n: Int): Bytes =
    88   {
    89     read_block(stream, n) match {
    89     val chunk = read(stream, n)
    90       case (Some(chunk), _) => chunk
    90     val len = chunk.length
    91       case (None, len) =>
    91     if (len == n) chunk
    92         error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes")
    92     else error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes")
    93     }
    93   }
       
    94 
    94 
    95   def read_message(stream: InputStream): Option[List[Bytes]] =
    95   def read_message(stream: InputStream): Option[List[Bytes]] =
    96     read_header(stream).map(ns => ns.map(n => read_chunk(stream, n)))
    96     read_header(stream).map(ns => ns.map(n => read_chunk(stream, n)))
    97 
    97 
    98 
    98 
    99   /* hybrid messages: line or length+block (restricted content) */
    99   /* hybrid messages: line or length+block (restricted content) */
   100 
   100 
   101   private def is_length(msg: Bytes): Boolean =
   101   private def is_length(msg: Bytes): Boolean =
   102     !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar))
   102     !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar))
   103 
   103 
   104   private def has_line_terminator(msg: Bytes): Boolean =
   104   private def is_terminated(msg: Bytes): Boolean =
   105   {
   105   {
   106     val len = msg.length
   106     val len = msg.length
   107     len > 0 && Symbol.is_ascii_line_terminator(msg.charAt(len - 1))
   107     len > 0 && Symbol.is_ascii_line_terminator(msg.charAt(len - 1))
   108   }
   108   }
   109 
   109 
   110   def write_line_message(stream: OutputStream, msg: Bytes)
   110   def write_line_message(stream: OutputStream, msg: Bytes)
   111   {
   111   {
   112     if (is_length(msg) || has_line_terminator(msg))
   112     if (is_length(msg) || is_terminated(msg))
   113       error ("Bad content for line message:\n" ++ msg.text.take(100))
   113       error ("Bad content for line message:\n" ++ msg.text.take(100))
   114 
   114 
   115     if (msg.length > 100 || msg.iterator.contains(10)) {
   115     val n = msg.length
   116       write_header(stream, List(msg.length + 1))
   116     if (n > 100 || msg.iterator.contains(10)) write_header(stream, List(n + 1))
   117     }
   117 
   118     write(stream, msg)
   118     write(stream, msg)
   119     newline(stream)
   119     newline(stream)
   120     flush(stream)
   120     flush(stream)
   121   }
   121   }
   122 
   122 
   123   def read_line_message(stream: InputStream): Option[Bytes] =
   123   def read_line_message(stream: InputStream): Option[Bytes] =
   124     read_line(stream) match {
   124     read_line(stream) match {
       
   125       case None => None
   125       case Some(line) =>
   126       case Some(line) =>
   126         if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line)
   127         Value.Nat.unapply(line.text) match {
   127         else Some(line)
   128           case None => Some(line)
   128       case None => None
   129           case Some(n) => read_block(stream, n)._1.map(_.trim_line)
       
   130         }
   129     }
   131     }
   130 }
   132 }