22 catch { case _: IOException => } |
22 catch { case _: IOException => } |
23 |
23 |
24 |
24 |
25 /* input operations */ |
25 /* input operations */ |
26 |
26 |
27 def read(stream: InputStream, length: Int): Bytes = |
27 def read(stream: InputStream, n: Int): Bytes = |
28 Bytes.read_stream(stream, limit = length) |
28 Bytes.read_stream(stream, limit = n) |
29 |
29 |
30 def read_block(stream: InputStream, length: Int): Option[Bytes] = |
30 def read_block(stream: InputStream, n: Int): (Option[Bytes], Int) = |
31 { |
31 { |
32 val msg = read(stream, length) |
32 val msg = read(stream, n) |
33 if (msg.length == length) Some(msg) else None |
33 val len = msg.length |
|
34 (if (len == n) Some(msg) else None, len) |
34 } |
35 } |
35 |
36 |
36 def read_line(stream: InputStream): Option[Bytes] = |
37 def read_line(stream: InputStream): Option[Bytes] = |
37 { |
38 { |
38 val line = new ByteArrayOutputStream(100) |
39 val line = new ByteArrayOutputStream(100) |
49 } |
50 } |
50 |
51 |
51 |
52 |
52 /* header with chunk lengths */ |
53 /* header with chunk lengths */ |
53 |
54 |
|
55 def write_header(stream: OutputStream, ns: List[Int]) |
|
56 { |
|
57 stream.write(UTF8.bytes(ns.mkString(","))) |
|
58 newline(stream) |
|
59 } |
|
60 |
54 private def err_header(line: String): Nothing = |
61 private def err_header(line: String): Nothing = |
55 error("Malformed message header: " + quote(line)) |
62 error("Malformed message header: " + quote(line)) |
56 |
63 |
57 private def parse_header(line: String): List[Int] = |
64 private def parse_header(line: String): List[Int] = |
58 try { space_explode(',', line).map(Value.Int.parse_nat) } |
65 try { space_explode(',', line).map(Value.Nat.parse) } |
59 catch { case ERROR(_) => err_header(line) } |
66 catch { case ERROR(_) => err_header(line) } |
60 |
67 |
61 def read_header(stream: InputStream): Option[List[Int]] = |
68 def read_header(stream: InputStream): Option[List[Int]] = |
62 read_line(stream).map(_.text).map(parse_header(_)) |
69 read_line(stream).map(_.text).map(parse_header(_)) |
63 |
70 |
66 parse_header(line) match { |
73 parse_header(line) match { |
67 case List(n) => n |
74 case List(n) => n |
68 case _ => err_header(line) |
75 case _ => err_header(line) |
69 }) |
76 }) |
70 |
77 |
71 def write_header(stream: OutputStream, ns: List[Int]) |
|
72 { |
|
73 stream.write(UTF8.bytes(ns.mkString(","))) |
|
74 newline(stream) |
|
75 } |
|
76 |
|
77 |
78 |
78 /* messages with multiple chunks (arbitrary content) */ |
79 /* messages with multiple chunks (arbitrary content) */ |
79 |
80 |
80 def write_message(stream: OutputStream, chunks: List[Bytes]) |
81 def write_message(stream: OutputStream, chunks: List[Bytes]) |
81 { |
82 { |
83 chunks.foreach(write(stream, _)) |
84 chunks.foreach(write(stream, _)) |
84 flush(stream) |
85 flush(stream) |
85 } |
86 } |
86 |
87 |
87 private def read_chunk(stream: InputStream, n: Int): Bytes = |
88 private def read_chunk(stream: InputStream, n: Int): Bytes = |
88 { |
89 read_block(stream, n) match { |
89 val chunk = read(stream, n) |
90 case (Some(chunk), _) => chunk |
90 val len = chunk.length |
91 case (None, len) => |
91 if (len == n) chunk |
92 error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes") |
92 else error("Malformed message chunk: unexpected EOF after " + len + " of " + n + " bytes") |
93 } |
93 } |
|
94 |
94 |
95 def read_message(stream: InputStream): Option[List[Bytes]] = |
95 def read_message(stream: InputStream): Option[List[Bytes]] = |
96 read_header(stream).map(ns => ns.map(n => read_chunk(stream, n))) |
96 read_header(stream).map(ns => ns.map(n => read_chunk(stream, n))) |
97 |
97 |
98 |
98 |
99 /* hybrid messages: line or length+block (restricted content) */ |
99 /* hybrid messages: line or length+block (restricted content) */ |
100 |
100 |
101 private def is_length(msg: Bytes): Boolean = |
101 private def is_length(msg: Bytes): Boolean = |
102 !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar)) |
102 !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar)) |
103 |
103 |
104 private def has_line_terminator(msg: Bytes): Boolean = |
104 private def is_terminated(msg: Bytes): Boolean = |
105 { |
105 { |
106 val len = msg.length |
106 val len = msg.length |
107 len > 0 && Symbol.is_ascii_line_terminator(msg.charAt(len - 1)) |
107 len > 0 && Symbol.is_ascii_line_terminator(msg.charAt(len - 1)) |
108 } |
108 } |
109 |
109 |
110 def write_line_message(stream: OutputStream, msg: Bytes) |
110 def write_line_message(stream: OutputStream, msg: Bytes) |
111 { |
111 { |
112 if (is_length(msg) || has_line_terminator(msg)) |
112 if (is_length(msg) || is_terminated(msg)) |
113 error ("Bad content for line message:\n" ++ msg.text.take(100)) |
113 error ("Bad content for line message:\n" ++ msg.text.take(100)) |
114 |
114 |
115 if (msg.length > 100 || msg.iterator.contains(10)) { |
115 val n = msg.length |
116 write_header(stream, List(msg.length + 1)) |
116 if (n > 100 || msg.iterator.contains(10)) write_header(stream, List(n + 1)) |
117 } |
117 |
118 write(stream, msg) |
118 write(stream, msg) |
119 newline(stream) |
119 newline(stream) |
120 flush(stream) |
120 flush(stream) |
121 } |
121 } |
122 |
122 |
123 def read_line_message(stream: InputStream): Option[Bytes] = |
123 def read_line_message(stream: InputStream): Option[Bytes] = |
124 read_line(stream) match { |
124 read_line(stream) match { |
|
125 case None => None |
125 case Some(line) => |
126 case Some(line) => |
126 if (is_length(line)) read_block(stream, Value.Int.parse(line.text)).map(_.trim_line) |
127 Value.Nat.unapply(line.text) match { |
127 else Some(line) |
128 case None => Some(line) |
128 case None => None |
129 case Some(n) => read_block(stream, n)._1.map(_.trim_line) |
|
130 } |
129 } |
131 } |
130 } |
132 } |