more uniform use of Byte_Message;
authorwenzelm
Sun, 11 Apr 2021 22:47:55 +0200
changeset 73815 22b5ecb53dd9
parent 73814 a5d1d1e2f109
child 73816 a578ebf5b78d
more uniform use of Byte_Message; support protocol_message with multiple chunks;
src/Pure/General/bytes.scala
src/Pure/General/output_primitives.ML
src/Pure/PIDE/byte_message.ML
src/Pure/PIDE/protocol.ML
src/Pure/PIDE/prover.scala
src/Pure/PIDE/session.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/message_channel.ML
src/Pure/System/scala.ML
src/Pure/Thy/export.ML
src/Pure/Thy/thy_info.ML
src/Pure/Tools/build.ML
src/Pure/Tools/build_job.scala
src/Pure/Tools/debugger.ML
src/Pure/Tools/debugger.scala
src/Pure/Tools/print_operation.ML
--- a/src/Pure/General/bytes.scala	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/General/bytes.scala	Sun Apr 11 22:47:55 2021 +0200
@@ -135,7 +135,10 @@
   }
 
   def text: String =
-    UTF8.decode_chars(s => s, bytes, offset, offset + length).toString
+    UTF8.decode_chars(identity, bytes, offset, offset + length).toString
+
+  def symbols: String =
+    UTF8.decode_chars(Symbol.decode, bytes, offset, offset + length).toString
 
   def base64: String =
   {
--- a/src/Pure/General/output_primitives.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/General/output_primitives.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -29,7 +29,7 @@
   val status_fn: output list -> unit
   val report_fn: output list -> unit
   val result_fn: properties -> output list -> unit
-  type protocol_message_fn = properties -> XML.body -> unit
+  type protocol_message_fn = properties -> XML.body list -> unit
   val protocol_message_fn: protocol_message_fn
   val markup_fn: string * properties -> output * output
 end;
@@ -73,7 +73,7 @@
 val report_fn = ignore_outputs;
 fun result_fn (_: properties) = ignore_outputs;
 
-type protocol_message_fn = properties -> XML.body -> unit;
+type protocol_message_fn = properties -> XML.body list -> unit;
 val protocol_message_fn: protocol_message_fn = fn _ => fn _ => ();
 
 fun markup_fn (_: string * properties) = ("", "");
--- a/src/Pure/PIDE/byte_message.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/PIDE/byte_message.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -14,6 +14,7 @@
   val read_block: BinIO.instream -> int -> string option * int
   val read_line: BinIO.instream -> string option
   val write_message: BinIO.outstream -> string list -> unit
+  val write_message_yxml: BinIO.outstream -> XML.body list -> unit
   val read_message: BinIO.instream -> string list option
   val write_line_message: BinIO.outstream -> string -> unit
   val read_line_message: BinIO.instream -> string option
@@ -64,6 +65,11 @@
 fun write_message stream chunks =
   (write stream (make_header (map size chunks) @ chunks); flush stream);
 
+fun write_message_yxml stream chunks =
+  (write stream (make_header (map YXML.body_size chunks));
+   (List.app o List.app) (write_yxml stream) chunks;
+   flush stream);
+
 fun parse_header line =
   map Value.parse_nat (space_explode "," line)
     handle Fail _ => error ("Malformed message header: " ^ quote line);
--- a/src/Pure/PIDE/protocol.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/PIDE/protocol.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -60,7 +60,7 @@
   end;
 
 fun commands_accepted ids =
-  Output.protocol_message Markup.commands_accepted [XML.Text (space_implode "," ids)];
+  Output.protocol_message Markup.commands_accepted [[XML.Text (space_implode "," ids)]];
 
 val _ =
   Protocol_Command.define "Document.define_command"
@@ -141,12 +141,12 @@
 
             val _ =
               Output.protocol_message Markup.assign_update
-                ((new_id, edited, assign_update) |>
+                [(new_id, edited, assign_update) |>
                   let
                     open XML.Encode;
                     fun encode_upd (a, bs) =
                       string (space_implode "," (map Value.print_int (a :: bs)));
-                  in triple int (list string) (list encode_upd) end);
+                  in triple int (list string) (list encode_upd) end];
           in Document.start_execution state' end)));
 
 val _ =
@@ -157,7 +157,7 @@
           YXML.parse_body versions_yxml |>
             let open XML.Decode in list int end;
         val state1 = Document.remove_versions versions state;
-        val _ = Output.protocol_message Markup.removed_versions [XML.Text (versions_yxml)];
+        val _ = Output.protocol_message Markup.removed_versions [[XML.Text (versions_yxml)]];
       in state1 end));
 
 val _ =
--- a/src/Pure/PIDE/prover.scala	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/PIDE/prover.scala	Sun Apr 11 22:47:55 2021 +0200
@@ -47,17 +47,28 @@
         if (is_status || is_report) message.body.map(_.toString).mkString
         else Pretty.string_of(message.body, metric = Symbol.Metric)
       if (properties.isEmpty)
-        kind.toString + " [[" + res + "]]"
+        kind + " [[" + res + "]]"
       else
-        kind.toString + " " +
+        kind + " " +
           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
     }
   }
 
-  class Protocol_Output(props: Properties.T, val bytes: Bytes)
+  class Protocol_Error(msg: String) extends Exception(msg)
+  def bad_header(print: String): Nothing = throw new Protocol_Error("bad message header: " + print)
+  def bad_chunks(): Nothing = throw new Protocol_Error("bad message chunks")
+
+  def the_chunk(chunks: List[Bytes], print: => String): Bytes =
+    chunks match {
+      case List(chunk) => chunk
+      case _ => throw new Protocol_Error("single chunk expected: " + print)
+    }
+
+  class Protocol_Output(props: Properties.T, val chunks: List[Bytes])
     extends Output(XML.Elem(Markup(Markup.PROTOCOL, props), Nil))
   {
-    lazy val text: String = bytes.text
+    def chunk: Bytes = the_chunk(chunks, toString)
+    lazy val text: String = chunk.text
   }
 }
 
@@ -75,9 +86,9 @@
     receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text)))))
   }
 
-  private def protocol_output(props: Properties.T, bytes: Bytes): Unit =
+  private def protocol_output(props: Properties.T, chunks: List[Bytes]): Unit =
   {
-    receiver(new Prover.Protocol_Output(cache.props(props), bytes))
+    receiver(new Prover.Protocol_Output(cache.props(props), chunks))
   }
 
   private def output(kind: String, props: Properties.T, body: XML.Body): Unit =
@@ -252,90 +263,37 @@
 
   private def message_output(stream: InputStream): Thread =
   {
-    class EOF extends Exception
-    class Protocol_Error(msg: String) extends Exception(msg)
-
-    val name = "message_output"
-    Isabelle_Thread.fork(name = name) {
-      val default_buffer = new Array[Byte](65536)
-      var c = -1
+    def decode_chunk(chunk: Bytes): XML.Body = YXML.parse_body_failsafe(chunk.symbols)
 
-      def read_int(): Int =
-      //{{{
-      {
-        var n = 0
-        c = stream.read
-        if (c == -1) throw new EOF
-        while (48 <= c && c <= 57) {
-          n = 10 * n + (c - 48)
-          c = stream.read
-        }
-        if (c != 10)
-          throw new Protocol_Error("malformed header: expected integer followed by newline")
-        else n
-      }
-      //}}}
-
-      def read_chunk_bytes(): (Array[Byte], Int) =
-      //{{{
-      {
-        val n = read_int()
-        val buf =
-          if (n <= default_buffer.length) default_buffer
-          else new Array[Byte](n)
-
-        var i = 0
-        var m = 0
-        do {
-          m = stream.read(buf, i, n - i)
-          if (m != -1) i += m
+    val thread_name = "message_output"
+    Isabelle_Thread.fork(name = thread_name) {
+      try {
+        var finished = false
+        while (!finished) {
+          Byte_Message.read_message(stream) match {
+            case None => finished = true
+            case Some(header :: chunks) =>
+              decode_chunk(header) match {
+                case List(XML.Elem(Markup(name, props), Nil)) =>
+                  val kind = name.intern
+                  if (kind == Markup.PROTOCOL) protocol_output(props, chunks)
+                  else {
+                    val body = decode_chunk(Prover.the_chunk(chunks, name))
+                    output(kind, props, body)
+                  }
+                case _ => Prover.bad_header(header.toString)
+              }
+            case Some(_) => Prover.bad_chunks()
+          }
         }
-        while (m != -1 && n > i)
-
-        if (i != n)
-          throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)")
-
-        (buf, n)
-      }
-      //}}}
-
-      def read_chunk(): XML.Body =
-      {
-        val (buf, n) = read_chunk_bytes()
-        YXML.parse_body_failsafe(UTF8.decode_chars(Symbol.decode, buf, 0, n))
-      }
-
-      try {
-        do {
-          try {
-            val header = read_chunk()
-            header match {
-              case List(XML.Elem(Markup(name, props), Nil)) =>
-                val kind = name.intern
-                if (kind == Markup.PROTOCOL) {
-                  val (buf, n) = read_chunk_bytes()
-                  protocol_output(props, Bytes(buf, 0, n))
-                }
-                else {
-                  val body = read_chunk()
-                  output(kind, props, body)
-                }
-              case _ =>
-                read_chunk()
-                throw new Protocol_Error("bad header: " + header.toString)
-            }
-          }
-          catch { case _: EOF => }
-        }
-        while (c != -1)
       }
       catch {
         case e: IOException => system_output("Cannot read message:\n" + e.getMessage)
-        case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
+        case e: Prover.Protocol_Error => system_output("Malformed message:\n" + e.getMessage)
       }
       stream.close()
 
-      system_output(name + " terminated")
+      system_output(thread_name + " terminated")
     }
   }
 
--- a/src/Pure/PIDE/session.scala	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/PIDE/session.scala	Sun Apr 11 22:47:55 2021 +0200
@@ -506,7 +506,7 @@
               case Protocol.Export(args)
               if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
                 val id = Value.Long.unapply(args.id.get).get
-                val export = Export.make_entry("", args, msg.bytes, cache)
+                val export = Export.make_entry("", args, msg.chunk, cache)
                 change_command(_.add_export(id, (args.serial, export)))
 
               case Protocol.Loading_Theory(node_name, id) =>
--- a/src/Pure/System/isabelle_process.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/System/isabelle_process.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -107,8 +107,8 @@
 
     val msg_channel = Message_Channel.make out_stream;
 
-    fun message name props body =
-      Message_Channel.send msg_channel (Message_Channel.message name props body);
+    fun message name props chunks =
+      Message_Channel.send msg_channel (Message_Channel.message name props chunks);
 
     fun standard_message props name ss =
       if forall (fn s => s = "") ss then ()
@@ -117,7 +117,7 @@
           val pos_props =
             if exists Markup.position_property props then props
             else props @ Position.properties_of (Position.thread_data ());
-        in message name pos_props (XML.blob ss) end;
+        in message name pos_props [XML.blob ss] end;
 
     fun report_message ss =
       if Context_Position.pide_reports ()
@@ -145,9 +145,9 @@
       Unsynchronized.setmp Private_Output.error_message_fn
         (fn (i, s) => standard_message (Markup.serial_properties i) Markup.errorN s) #>
       Unsynchronized.setmp Private_Output.system_message_fn
-        (fn ss => message Markup.systemN [] (XML.blob ss)) #>
+        (fn ss => message Markup.systemN [] [XML.blob ss]) #>
       Unsynchronized.setmp Private_Output.protocol_message_fn
-        (fn props => fn body => message Markup.protocolN props body) #>
+        (fn props => fn chunks => message Markup.protocolN props chunks) #>
       Unsynchronized.setmp print_mode
         ((! print_mode @ #1 modes) |> fold (update op =) (#2 modes));
 
@@ -167,7 +167,7 @@
       in protocol_loop () end;
 
     fun protocol () =
-     (message Markup.initN [] [XML.Text (Session.welcome ())];
+     (message Markup.initN [] [[XML.Text (Session.welcome ())]];
       ml_statistics ();
       protocol_loop ());
 
--- a/src/Pure/System/message_channel.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/System/message_channel.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -7,7 +7,7 @@
 signature MESSAGE_CHANNEL =
 sig
   type message
-  val message: string -> Properties.T -> XML.body -> message
+  val message: string -> Properties.T -> XML.body list -> message
   type T
   val send: T -> message -> unit
   val shutdown: T -> unit
@@ -19,15 +19,13 @@
 
 (* message *)
 
-datatype message = Message of {body: XML.body, flush: bool};
+datatype message = Message of {chunks: XML.body list, flush: bool};
 
-fun chunk body = XML.Text (string_of_int (YXML.body_size body) ^ "\n") :: body;
-
-fun message name raw_props body =
+fun message name raw_props chunks =
   let
     val robust_props = map (apply2 YXML.embed_controls) raw_props;
-    val header = XML.Elem ((name, robust_props), []);
-  in Message {body = chunk [header] @ chunk body, flush = name = Markup.protocolN} end;
+    val header = [XML.Elem ((name, robust_props), [])];
+  in Message {chunks = header :: chunks, flush = name = Markup.protocolN} end;
 
 
 (* channel *)
@@ -46,9 +44,9 @@
         [] => (Byte_Message.flush stream; continue NONE)
       | msgs => received timeout msgs)
     and received _ (NONE :: _) = Byte_Message.flush stream
-      | received _ (SOME (Message {body, flush}) :: rest) =
+      | received _ (SOME (Message {chunks, flush}) :: rest) =
           let
-            val _ = List.app (Byte_Message.write_yxml stream) body;
+            val _ = Byte_Message.write_message_yxml stream chunks;
             val timeout = if flush then (Byte_Message.flush stream; NONE) else flush_timeout;
           in received timeout rest end
       | received timeout [] = continue timeout;
--- a/src/Pure/System/scala.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/System/scala.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -44,7 +44,7 @@
       val id = new_id ();
       fun invoke () =
        (Synchronized.change results (Symtab.update (id, Exn.Exn Match));
-        Output.protocol_message (Markup.invoke_scala name id) [XML.Text arg]);
+        Output.protocol_message (Markup.invoke_scala name id) [[XML.Text arg]]);
       fun cancel () =
        (Synchronized.change results (Symtab.delete_safe id);
         Output.protocol_message (Markup.cancel_scala id) []);
--- a/src/Pure/Thy/export.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Thy/export.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -42,7 +42,7 @@
     name = Path.implode_binding (tap Path.proper_binding binding),
     executable = executable,
     compress = compress,
-    strict = strict} body);
+    strict = strict} [body]);
 
 fun export thy binding body =
   export_params
--- a/src/Pure/Thy/thy_info.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Thy/thy_info.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -344,7 +344,7 @@
 
     val _ = remove_thy name;
     val _ = writeln ("Loading theory " ^ quote name ^ required_by " " initiators);
-    val _ = Output.try_protocol_message (Markup.loading_theory name @ text_props) (XML.blob [text]);
+    val _ = Output.try_protocol_message (Markup.loading_theory name @ text_props) [XML.blob [text]];
 
     val _ =
       Position.setmp_thread_data (Position.id_only id) (fn () =>
@@ -360,7 +360,7 @@
 
     val timing_result = Timing.result timing_start;
     val timing_props = [Markup.theory_timing, (Markup.nameN, name)];
-    val _  = Output.try_protocol_message (timing_props @ Markup.timing_properties timing_result) []
+    val _ = Output.try_protocol_message (timing_props @ Markup.timing_properties timing_result) []
 
     fun commit () = update_thy deps theory;
   in
--- a/src/Pure/Tools/build.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Tools/build.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -91,6 +91,7 @@
             (Future.interruptible_task (fn () => (build (); (0, []))) () handle exn =>
               ((1, Runtime.exn_message_list exn) handle _ (*sic!*) => (2, ["CRASHED"])))
           |> let open XML.Encode in pair int (list string) end
+          |> single
           |> Output.protocol_message Markup.build_session_finished)
         end
       | _ => raise Match);
--- a/src/Pure/Tools/build_job.scala	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Tools/build_job.scala	Sun Apr 11 22:47:55 2021 +0200
@@ -322,7 +322,7 @@
           private def export(msg: Prover.Protocol_Output): Boolean =
             msg.properties match {
               case Protocol.Export(args) =>
-                export_consumer(session_name, args, msg.bytes)
+                export_consumer(session_name, args, msg.chunk)
                 true
               case _ => false
             }
--- a/src/Pure/Tools/debugger.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Tools/debugger.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -23,7 +23,7 @@
   else
     Output.protocol_message
       (Markup.debugger_output (Isabelle_Thread.get_name ()))
-      [XML.Text (Markup.markup (kind, Markup.serial_properties (serial ())) msg)];
+      [[XML.Text (Markup.markup (kind, Markup.serial_properties (serial ())) msg)]];
 
 val writeln_message = output_message Markup.writelnN;
 val warning_message = output_message Markup.warningN;
@@ -201,12 +201,12 @@
 
 fun debugger_state thread_name =
   Output.protocol_message (Markup.debugger_state thread_name)
-   (get_debugging ()
+   [get_debugging ()
     |> map (fn st =>
       (Position.properties_of
         (Exn_Properties.position_of_polyml_location (PolyML.DebuggerInterface.debugLocation st)),
        PolyML.DebuggerInterface.debugFunction st))
-    |> let open XML.Encode in list (pair properties string) end);
+    |> let open XML.Encode in list (pair properties string) end];
 
 fun debugger_command thread_name =
   (case get_input thread_name of
--- a/src/Pure/Tools/debugger.scala	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Tools/debugger.scala	Sun Apr 11 22:47:55 2021 +0200
@@ -112,7 +112,7 @@
     {
       msg.properties match {
         case Markup.Debugger_State(thread_name) =>
-          val msg_body = Symbol.decode_yxml_failsafe(UTF8.decode_permissive(msg.bytes))
+          val msg_body = Symbol.decode_yxml_failsafe(UTF8.decode_permissive(msg.chunk))
           val debug_states =
           {
             import XML.Decode._
@@ -130,7 +130,7 @@
     {
       msg.properties match {
         case Markup.Debugger_Output(thread_name) =>
-          Symbol.decode_yxml_failsafe(UTF8.decode_permissive(msg.bytes)) match {
+          Symbol.decode_yxml_failsafe(UTF8.decode_permissive(msg.chunk)) match {
             case List(XML.Elem(Markup(name, props @ Markup.Serial(i)), body)) =>
               val message = XML.Elem(Markup(Markup.message(name), props), body)
               debugger.add_output(thread_name, i -> session.cache.elem(message))
--- a/src/Pure/Tools/print_operation.ML	Sun Apr 11 21:32:09 2021 +0200
+++ b/src/Pure/Tools/print_operation.ML	Sun Apr 11 22:47:55 2021 +0200
@@ -23,9 +23,9 @@
 
 fun report () =
   Output.try_protocol_message Markup.print_operations
-    (Synchronized.value print_operations
+    [Synchronized.value print_operations
       |> map (fn (x, (y, _)) => (x, y)) |> rev
-      |> let open XML.Encode in list (pair string string) end);
+      |> let open XML.Encode in list (pair string string) end];
 
 val _ = Protocol_Command.define "print_operations" (fn [] => report ());