more scalable byte messages, notably for Scala functions in ML;
author wenzelm
Tue, 21 Jun 2022 22:17:11 +0200
changeset 75577 c51e1cef1eae
parent 75576 8c5eedb6c983
child 75578 d3ba143a7ab8
more scalable byte messages, notably for Scala functions in ML;
src/Pure/General/bytes.ML
src/Pure/General/socket_io.ML
src/Pure/PIDE/byte_message.ML
src/Pure/PIDE/protocol_command.ML
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_system.ML
src/Pure/System/scala.ML
src/Pure/library.ML
--- a/src/Pure/General/bytes.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/General/bytes.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -14,17 +14,24 @@
   type T
   val length: T -> int
   val contents: T -> string list
+  val contents_blob: T -> XML.body
   val content: T -> string
   val is_empty: T -> bool
   val empty: T
+  val build: (T -> T) -> T
   val add_substring: substring -> T -> T
   val add: string -> T -> T
+  val beginning: int -> T -> string
+  val exists_string: (string -> bool) -> T -> bool
+  val forall_string: (string -> bool) -> T -> bool
+  val last_string: T -> string option
+  val trim_line: T -> T
   val append: T -> T -> T
-  val build: (T -> T) -> T
   val string: string -> T
+  val newline: T
   val buffer: Buffer.T -> T
-  val read_chunk: BinIO.instream -> string
-  val read_stream: BinIO.instream -> T
+  val read_block: int -> BinIO.instream -> string
+  val read_stream: int -> BinIO.instream -> T
   val write_stream: BinIO.outstream -> T -> unit
   val read: Path.T -> T
   val write: Path.T -> T -> unit
@@ -48,12 +55,16 @@
 fun contents (Bytes {buffer, chunks, ...}) =
   rev (chunks |> not (null buffer) ? cons (compact buffer));
 
+val contents_blob = contents #> XML.blob;
+
 val content = implode o contents;
 
 fun is_empty bytes = length bytes = 0;
 
 val empty = Bytes {buffer = [], chunks = [], m = 0, n = 0};
 
+fun build (f: T -> T) = f empty;
+
 fun add_substring s (bytes as Bytes {buffer, chunks, m, n}) =
   if Substring.isEmpty s then bytes
   else
@@ -71,35 +82,76 @@
 
 val add = add_substring o Substring.full;
 
+fun exists_string pred (Bytes {buffer, chunks, ...}) =
+  let val ex = (exists o Library.exists_string) pred
+  in ex buffer orelse ex chunks end;
+
+fun forall_string pred = not o exists_string (not o pred);
+
+fun last_string (Bytes {buffer, chunks, ...}) =
+  (case buffer of
+    s :: _ => Library.last_string s
+  | [] =>
+      (case chunks of
+        s :: _ => Library.last_string s
+      | [] => NONE));
+
+fun trim_line (bytes as Bytes {buffer, chunks, ...}) =
+  let
+    val is_line =
+      (case last_string bytes of
+        SOME s => Symbol.is_ascii_line_terminator s
+      | NONE => false);
+  in
+    if is_line then
+      let
+        val (last_chunk, chunks') =
+          (case chunks of
+            [] => ("", [])
+          | c :: cs => (c, cs));
+        val trimed = Library.trim_line (last_chunk ^ compact buffer);
+      in build (fold_rev add chunks' #> add trimed) end
+    else bytes
+  end;
+
 end;
 
 
 (* derived operations *)
 
+fun beginning n bytes =
+  let
+    val dots = " ...";
+    val m = (String.maxSize - size dots) div chunk_size;
+    val a = implode (take m (contents bytes));
+    val b = String.substring (a, 0, Int.min (n, size a));
+  in if size b < length bytes then b ^ dots else b end;
+
 fun append bytes1 bytes2 =  (*left-associative*)
   if is_empty bytes1 then bytes2
   else if is_empty bytes2 then bytes1
   else bytes1 |> fold add (contents bytes2);
 
-fun build (f: T -> T) = f empty;
+val string = build o add;
 
-val string = build o add;
+val newline = string "\n";
 
 val buffer = build o fold add o Buffer.contents;
 
-val read_chunk = File.input_size chunk_size;
+fun read_block limit =
+  File.input_size (if limit < 0 then chunk_size else Int.min (chunk_size, limit));
 
-fun read_stream stream =
+fun read_stream limit stream =
   let
-    fun read_bytes bytes =
-      (case read_chunk stream of
+    fun read bytes =
+      (case read_block (limit - length bytes) stream of
         "" => bytes
-      | s => read_bytes (add s bytes))
-  in read_bytes empty end;
+      | s => read (add s bytes))
+  in read empty end;
 
 fun write_stream stream = File.outputs stream o contents;
 
-val read = File.open_input read_stream;
+val read = File.open_input (read_stream ~1);
 val write = File.open_output write_stream;
 
 
--- a/src/Pure/General/socket_io.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/General/socket_io.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -96,6 +96,6 @@
 
 fun with_streams' f socket_name password =
   with_streams (fn streams =>
-    (Byte_Message.write_line (#2 streams) password; f streams)) socket_name;
+    (Byte_Message.write_line (#2 streams) (Bytes.string password); f streams)) socket_name;
 
 end;
--- a/src/Pure/PIDE/byte_message.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/PIDE/byte_message.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -6,18 +6,20 @@
 
 signature BYTE_MESSAGE =
 sig
-  val write: BinIO.outstream -> string list -> unit
+  val write: BinIO.outstream -> Bytes.T list -> unit
   val write_yxml: BinIO.outstream -> XML.tree -> unit
   val flush: BinIO.outstream -> unit
-  val write_line: BinIO.outstream -> string -> unit
-  val read: BinIO.instream -> int -> string
-  val read_block: BinIO.instream -> int -> string option * int
-  val read_line: BinIO.instream -> string option
-  val write_message: BinIO.outstream -> string list -> unit
+  val write_line: BinIO.outstream -> Bytes.T -> unit
+  val read: BinIO.instream -> int -> Bytes.T
+  val read_block: BinIO.instream -> int -> Bytes.T option * int
+  val read_line: BinIO.instream -> Bytes.T option
+  val write_message: BinIO.outstream -> Bytes.T list -> unit
+  val write_message_string: BinIO.outstream -> string list -> unit
   val write_message_yxml: BinIO.outstream -> XML.body list -> unit
-  val read_message: BinIO.instream -> string list option
-  val write_line_message: BinIO.outstream -> string -> unit
-  val read_line_message: BinIO.instream -> string option
+  val read_message: BinIO.instream -> Bytes.T list option
+  val read_message_string: BinIO.instream -> string list option
+  val write_line_message: BinIO.outstream -> Bytes.T -> unit
+  val read_line_message: BinIO.instream -> Bytes.T option
 end;
 
 structure Byte_Message: BYTE_MESSAGE =
@@ -25,45 +27,48 @@
 
 (* output operations *)
 
-val write = File.outputs;
+val write = List.app o Bytes.write_stream;
 
 fun write_yxml stream tree = YXML.traverse (fn s => fn () => File.output stream s) tree ();
 
 fun flush stream = ignore (try BinIO.flushOut stream);
 
-fun write_line stream s = (write stream [s, "\n"]; flush stream);
+fun write_line stream bs = (write stream [bs, Bytes.newline]; flush stream);
 
 
 (* input operations *)
 
-fun read stream n = File.input_size n stream;
+fun read stream n = Bytes.read_stream n stream;
 
 fun read_block stream n =
   let
     val msg = read stream n;
-    val len = size msg;
+    val len = Bytes.length msg;
   in (if len = n then SOME msg else NONE, len) end;
 
 fun read_line stream =
   let
-    val result = trim_line o String.implode o rev;
-    fun read_body cs =
+    val result = SOME o Bytes.trim_line;
+    fun read_body bs =
       (case BinIO.input1 stream of
-        NONE => if null cs then NONE else SOME (result cs)
+        NONE => if Bytes.is_empty bs then NONE else result bs
       | SOME b =>
           (case Byte.byteToChar b of
-            #"\n" => SOME (result cs)
-          | c => read_body (c :: cs)));
-  in read_body [] end;
+            #"\n" => result bs
+          | c => read_body (Bytes.add (str c) bs)));
+  in read_body Bytes.empty end;
 
 
 (* messages with multiple chunks (arbitrary content) *)
 
 fun make_header ns =
-  [space_implode "," (map Value.print_int ns), "\n"];
+  [Bytes.string (space_implode "," (map Value.print_int ns)), Bytes.newline];
 
 fun write_message stream chunks =
-  (write stream (make_header (map size chunks) @ chunks); flush stream);
+  (write stream (make_header (map Bytes.length chunks) @ chunks); flush stream);
+
+fun write_message_string stream =
+  write_message stream o map Bytes.string;
 
 fun write_message_yxml stream chunks =
   (write stream (make_header (map YXML.body_size chunks));
@@ -82,26 +87,32 @@
         string_of_int len ^ " of " ^ string_of_int n ^ " bytes"));
 
 fun read_message stream =
-  read_line stream |> Option.map (parse_header #> map (read_chunk stream));
+  read_line stream |> Option.map (Bytes.content #> parse_header #> map (read_chunk stream));
+
+fun read_message_string stream =
+  read_message stream |> (Option.map o map) Bytes.content;
 
 
 (* hybrid messages: line or length+block (with content restriction) *)
 
+(* line message format *)
+
 fun is_length msg =
-  msg <> "" andalso forall_string Symbol.is_ascii_digit msg;
+  not (Bytes.is_empty msg) andalso Bytes.forall_string Symbol.is_ascii_digit msg;
 
 fun is_terminated msg =
-  let val len = size msg
-  in len > 0 andalso Symbol.is_ascii_line_terminator (str (String.sub (msg, len - 1))) end;
+  (case Bytes.last_string msg of
+    NONE => false
+  | SOME s => Symbol.is_ascii_line_terminator s);
 
 fun write_line_message stream msg =
   if is_length msg orelse is_terminated msg then
-    error ("Bad content for line message:\n" ^ implode (take 100 (Symbol.explode msg)))
+    error ("Bad content for line message:\n" ^ Bytes.beginning 100 msg)
   else
-    let val n = size msg in
+    let val n = Bytes.length msg in
       write stream
-        ((if n > 100 orelse exists_string (fn s => s = "\n") msg
-          then make_header [n + 1] else []) @ [msg, "\n"]);
+        ((if n > 100 orelse Bytes.exists_string (fn s => s = "\n") msg
+          then make_header [n + 1] else []) @ [msg, Bytes.newline]);
       flush stream
     end;
 
@@ -109,8 +120,8 @@
   (case read_line stream of
     NONE => NONE
   | SOME line =>
-      (case try Value.parse_nat line of
+      (case try (Value.parse_nat o Bytes.content) line of
         NONE => SOME line
-      | SOME n => Option.map trim_line (#1 (read_block stream n))));
+      | SOME n => Option.map Bytes.trim_line (#1 (read_block stream n))));
 
 end;
--- a/src/Pure/PIDE/protocol_command.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/PIDE/protocol_command.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -8,8 +8,9 @@
 sig
   exception STOP of int
   val is_protocol_exn: exn -> bool
+  val define_bytes: string -> (Bytes.T list -> unit) -> unit
   val define: string -> (string list -> unit) -> unit
-  val run: string -> string list -> unit
+  val run: string -> Bytes.T list -> unit
 end;
 
 structure Protocol_Command: PROTOCOL_COMMAND =
@@ -23,16 +24,18 @@
 
 val commands =
   Synchronized.var "Protocol_Command.commands"
-    (Symtab.empty: (string list -> unit) Symtab.table);
+    (Symtab.empty: (Bytes.T list -> unit) Symtab.table);
 
 in
 
-fun define name cmd =
+fun define_bytes name cmd =
   Synchronized.change commands (fn cmds =>
    (if not (Symtab.defined cmds name) then ()
     else warning ("Redefining Isabelle protocol command " ^ quote name);
     Symtab.update (name, cmd) cmds));
 
+fun define name cmd = define_bytes name (map Bytes.content #> cmd);
+
 fun run name args =
   (case Symtab.lookup (Synchronized.value commands) name of
     NONE => error ("Undefined Isabelle protocol command " ^ quote name)
--- a/src/Pure/System/isabelle_process.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/System/isabelle_process.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -96,7 +96,7 @@
     (* streams *)
 
     val (in_stream, out_stream) = Socket_IO.open_streams address;
-    val _ = Byte_Message.write_line out_stream password;
+    val _ = Byte_Message.write_line out_stream (Bytes.string password);
 
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
@@ -158,7 +158,7 @@
           (case Byte_Message.read_message in_stream of
             NONE => raise Protocol_Command.STOP 0
           | SOME [] => Output.system_message "Isabelle process: no input"
-          | SOME (name :: args) => Protocol_Command.run name args)
+          | SOME (name :: args) => Protocol_Command.run (Bytes.content name) args)
           handle exn =>
             if Protocol_Command.is_protocol_exn exn then Exn.reraise exn
             else (Runtime.exn_system_message exn handle crash => recover crash);
--- a/src/Pure/System/isabelle_system.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/System/isabelle_system.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -58,14 +58,14 @@
     fun with_streams f = Socket_IO.with_streams' f address password;
 
     fun kill (SOME uuid) =
-          with_streams (fn s => Byte_Message.write_message (#2 s) [Bash.server_kill, uuid])
+          with_streams (fn s => Byte_Message.write_message_string (#2 s) [Bash.server_kill, uuid])
       | kill NONE = ();
   in
     Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
       let
         fun err () = raise Fail "Malformed result from bash_process server";
         fun loop maybe_uuid s =
-          (case restore_attributes Byte_Message.read_message (#1 s) of
+          (case restore_attributes Byte_Message.read_message_string (#1 s) of
             SOME (head :: args) =>
               if head = Bash.server_uuid andalso length args = 1 then
                 loop (SOME (hd args)) s
@@ -91,7 +91,7 @@
                else err ()
           | _ => err ())
           handle exn => (kill maybe_uuid; Exn.reraise exn);
-      in with_streams (fn s => (Byte_Message.write_message (#2 s) run; loop NONE s)) end) ()
+      in with_streams (fn s => (Byte_Message.write_message_string (#2 s) run; loop NONE s)) end) ()
   end;
 
 val bash = Bash.script #> bash_process #> Process_Result.print #> Process_Result.rc;
--- a/src/Pure/System/scala.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/System/scala.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -7,6 +7,8 @@
 signature SCALA =
 sig
   exception Null
+  val function_bytes: string -> Bytes.T list -> Bytes.T list
+  val function1_bytes: string -> Bytes.T -> Bytes.T
   val function: string -> string list -> string list
   val function1: string -> string -> string
 end;
@@ -21,31 +23,31 @@
 val new_id = string_of_int o Counter.make ();
 
 val results =
-  Synchronized.var "Scala.results" (Symtab.empty: string list Exn.result Symtab.table);
+  Synchronized.var "Scala.results" (Symtab.empty: Bytes.T list Exn.result Symtab.table);
 
 val _ =
-  Protocol_Command.define "Scala.result"
-    (fn id :: args =>
+  Protocol_Command.define_bytes "Scala.result"
+    (fn id :: tag :: rest =>
       let
         val result =
-          (case args of
-            ["0"] => Exn.Exn Null
-          | "1" :: rest => Exn.Res rest
-          | ["2", msg] => Exn.Exn (ERROR msg)
-          | ["3", msg] => Exn.Exn (Fail msg)
-          | ["4"] => Exn.Exn Exn.Interrupt
+          (case (Bytes.content tag, rest) of
+            ("0", []) => Exn.Exn Null
+          | ("1", _) => Exn.Res rest
+          | ("2", [msg]) => Exn.Exn (ERROR (Bytes.content msg))
+          | ("3", [msg]) => Exn.Exn (Fail (Bytes.content msg))
+          | ("4", []) => Exn.Exn Exn.Interrupt
           | _ => raise Fail "Malformed Scala.result");
-      in Synchronized.change results (Symtab.map_entry id (K result)) end);
+      in Synchronized.change results (Symtab.map_entry (Bytes.content id) (K result)) end);
 
 in
 
-fun function name args =
+fun function_bytes name args =
   Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
     let
       val id = new_id ();
       fun invoke () =
        (Synchronized.change results (Symtab.update (id, Exn.Exn Match));
-        Output.protocol_message (Markup.invoke_scala name id) (map (single o XML.Text) args));
+        Output.protocol_message (Markup.invoke_scala name id) (map Bytes.contents_blob args));
       fun cancel () =
        (Synchronized.change results (Symtab.delete_safe id);
         Output.protocol_message (Markup.cancel_scala id) []);
@@ -62,6 +64,10 @@
         handle exn => (if Exn.is_interrupt exn then cancel () else (); Exn.reraise exn)
     end) ();
 
+val function1_bytes = singleton o function_bytes;
+
+fun function name = map Bytes.string #> function_bytes name #> map Bytes.content;
+
 val function1 = singleton o function;
 
 end;
--- a/src/Pure/library.ML	Tue Jun 21 16:03:00 2022 +0200
+++ b/src/Pure/library.ML	Tue Jun 21 22:17:11 2022 +0200
@@ -135,6 +135,7 @@
   val exists_string: (string -> bool) -> string -> bool
   val forall_string: (string -> bool) -> string -> bool
   val member_string: string -> string -> bool
+  val last_string: string -> string option
   val first_field: string -> string -> (string * string) option
   val enclose: string -> string -> string -> string
   val unenclose: string -> string
@@ -714,6 +715,9 @@
 
 fun member_string str s = exists_string (fn s' => s = s') str;
 
+fun last_string "" = NONE
+  | last_string s = SOME (str (String.sub (s, size s - 1)));
+
 fun first_field sep str =
   let
     val n = size sep;