more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
authorwenzelm
Sat, 20 Feb 2021 21:38:23 +0100
changeset 73262 71b7a5775342
parent 73261 f0446b3e4d17
child 73263 ad60214bef09
more reactive protocol messages, e.g. for Scala.function (relevant for Bash.process);
src/Pure/System/message_channel.ML
--- a/src/Pure/System/message_channel.ML	Sat Feb 20 20:23:30 2021 +0100
+++ b/src/Pure/System/message_channel.ML	Sat Feb 20 21:38:23 2021 +0100
@@ -19,7 +19,7 @@
 
 (* message *)
 
-datatype message = Message of XML.body;
+datatype message = Message of {body: XML.body, flush: bool};
 
 fun body_size body = fold (YXML.traverse (Integer.add o size)) body 0;
 
@@ -29,10 +29,7 @@
   let
     val robust_props = map (apply2 YXML.embed_controls) raw_props;
     val header = XML.Elem ((name, robust_props), []);
-  in Message (chunk [header] @ chunk body) end;
-
-fun output_message stream (Message body) =
-  fold (YXML.traverse (fn s => fn () => File.output stream s)) body ();
+  in Message {body = chunk [header] @ chunk body, flush = name = Markup.protocolN} end;
 
 
 (* channel *)
@@ -51,7 +48,11 @@
         [] => (Byte_Message.flush stream; continue NONE)
       | msgs => received timeout msgs)
     and received _ (NONE :: _) = Byte_Message.flush stream
-      | received _ (SOME msg :: rest) = (output_message stream msg; received flush_timeout rest)
+      | received _ (SOME (Message {body, flush}) :: rest) =
+          let
+            val _ = fold (YXML.traverse (fn s => fn () => File.output stream s)) body ();
+            val timeout = if flush then (Byte_Message.flush stream; NONE) else flush_timeout;
+          in received timeout rest end
       | received timeout [] = continue timeout;
   in fn () => continue NONE end;