# HG changeset patch # User wenzelm # Date 1655842631 -7200 # Node ID c51e1cef1eae48117e1129c0ced3ea60175243d7 # Parent 8c5eedb6c9839380e8981926935b0128d1849ec1 more scalable byte messages, notably for Scala functions in ML; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/General/bytes.ML --- a/src/Pure/General/bytes.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/General/bytes.ML Tue Jun 21 22:17:11 2022 +0200 @@ -14,17 +14,24 @@ type T val length: T -> int val contents: T -> string list + val contents_blob: T -> XML.body val content: T -> string val is_empty: T -> bool val empty: T + val build: (T -> T) -> T val add_substring: substring -> T -> T val add: string -> T -> T + val beginning: int -> T -> string + val exists_string: (string -> bool) -> T -> bool + val forall_string: (string -> bool) -> T -> bool + val last_string: T -> string option + val trim_line: T -> T val append: T -> T -> T - val build: (T -> T) -> T val string: string -> T + val newline: T val buffer: Buffer.T -> T - val read_chunk: BinIO.instream -> string - val read_stream: BinIO.instream -> T + val read_block: int -> BinIO.instream -> string + val read_stream: int -> BinIO.instream -> T val write_stream: BinIO.outstream -> T -> unit val read: Path.T -> T val write: Path.T -> T -> unit @@ -48,12 +55,16 @@ fun contents (Bytes {buffer, chunks, ...}) = rev (chunks |> not (null buffer) ? cons (compact buffer)); +val contents_blob = contents #> XML.blob; + val content = implode o contents; fun is_empty bytes = length bytes = 0; val empty = Bytes {buffer = [], chunks = [], m = 0, n = 0}; +fun build (f: T -> T) = f empty; + fun add_substring s (bytes as Bytes {buffer, chunks, m, n}) = if Substring.isEmpty s then bytes else @@ -71,35 +82,76 @@ val add = add_substring o Substring.full; +fun exists_string pred (Bytes {buffer, chunks, ...}) = + let val ex = (exists o Library.exists_string) pred + in ex buffer orelse ex chunks end; + +fun forall_string pred = not o exists_string (not o pred); + +fun last_string (Bytes {buffer, chunks, ...}) = + (case buffer of + s :: _ => Library.last_string s + | [] => + (case chunks of + s :: _ => Library.last_string s + | [] => NONE)); + +fun trim_line (bytes as Bytes {buffer, chunks, ...}) = + let + val is_line = + (case last_string bytes of + SOME s => Symbol.is_ascii_line_terminator s + | NONE => false); + in + if is_line then + let + val (last_chunk, chunks') = + (case chunks of + [] => ("", []) + | c :: cs => (c, cs)); + val trimed = Library.trim_line (last_chunk ^ compact buffer); + in build (fold_rev add chunks' #> add trimed) end + else bytes + end; + end; (* derived operations *) +fun beginning n bytes = + let + val dots = " ..."; + val m = (String.maxSize - size dots) div chunk_size; + val a = implode (take m (contents bytes)); + val b = String.substring (a, 0, Int.min (n, size a)); + in if size b < length bytes then b ^ dots else b end; + fun append bytes1 bytes2 = (*left-associative*) if is_empty bytes1 then bytes2 else if is_empty bytes2 then bytes1 else bytes1 |> fold add (contents bytes2); -fun build (f: T -> T) = f empty; +val string = build o add; -val string = build o add; +val newline = string "\n"; val buffer = build o fold add o Buffer.contents; -val read_chunk = File.input_size chunk_size; +fun read_block limit = + File.input_size (if limit < 0 then chunk_size else Int.min (chunk_size, limit)); -fun read_stream stream = +fun read_stream limit stream = let - fun read_bytes bytes = - (case read_chunk stream of + fun read bytes = + (case read_block (limit - length bytes) stream of "" => bytes - | s => read_bytes (add s bytes)) - in read_bytes empty end; + | s => read (add s bytes)) + in read empty end; fun write_stream stream = File.outputs stream o contents; -val read = File.open_input read_stream; +val read = File.open_input (read_stream ~1); val write = File.open_output write_stream; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/General/socket_io.ML --- a/src/Pure/General/socket_io.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/General/socket_io.ML Tue Jun 21 22:17:11 2022 +0200 @@ -96,6 +96,6 @@ fun with_streams' f socket_name password = with_streams (fn streams => - (Byte_Message.write_line (#2 streams) password; f streams)) socket_name; + (Byte_Message.write_line (#2 streams) (Bytes.string password); f streams)) socket_name; end; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/PIDE/byte_message.ML --- a/src/Pure/PIDE/byte_message.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/PIDE/byte_message.ML Tue Jun 21 22:17:11 2022 +0200 @@ -6,18 +6,20 @@ signature BYTE_MESSAGE = sig - val write: BinIO.outstream -> string list -> unit + val write: BinIO.outstream -> Bytes.T list -> unit val write_yxml: BinIO.outstream -> XML.tree -> unit val flush: BinIO.outstream -> unit - val write_line: BinIO.outstream -> string -> unit - val read: BinIO.instream -> int -> string - val read_block: BinIO.instream -> int -> string option * int - val read_line: BinIO.instream -> string option - val write_message: BinIO.outstream -> string list -> unit + val write_line: BinIO.outstream -> Bytes.T -> unit + val read: BinIO.instream -> int -> Bytes.T + val read_block: BinIO.instream -> int -> Bytes.T option * int + val read_line: BinIO.instream -> Bytes.T option + val write_message: BinIO.outstream -> Bytes.T list -> unit + val write_message_string: BinIO.outstream -> string list -> unit val write_message_yxml: BinIO.outstream -> XML.body list -> unit - val read_message: BinIO.instream -> string list option - val write_line_message: BinIO.outstream -> string -> unit - val read_line_message: BinIO.instream -> string option + val read_message: BinIO.instream -> Bytes.T list option + val read_message_string: BinIO.instream -> string list option + val write_line_message: BinIO.outstream -> Bytes.T -> unit + val read_line_message: BinIO.instream -> Bytes.T option end; structure Byte_Message: BYTE_MESSAGE = @@ -25,45 +27,48 @@ (* output operations *) -val write = File.outputs; +val write = List.app o Bytes.write_stream; fun write_yxml stream tree = YXML.traverse (fn s => fn () => File.output stream s) tree (); fun flush stream = ignore (try BinIO.flushOut stream); -fun write_line stream s = (write stream [s, "\n"]; flush stream); +fun write_line stream bs = (write stream [bs, Bytes.newline]; flush stream); (* input operations *) -fun read stream n = File.input_size n stream; +fun read stream n = Bytes.read_stream n stream; fun read_block stream n = let val msg = read stream n; - val len = size msg; + val len = Bytes.length msg; in (if len = n then SOME msg else NONE, len) end; fun read_line stream = let - val result = trim_line o String.implode o rev; - fun read_body cs = + val result = SOME o Bytes.trim_line; + fun read_body bs = (case BinIO.input1 stream of - NONE => if null cs then NONE else SOME (result cs) + NONE => if Bytes.is_empty bs then NONE else result bs | SOME b => (case Byte.byteToChar b of - #"\n" => SOME (result cs) - | c => read_body (c :: cs))); - in read_body [] end; + #"\n" => result bs + | c => read_body (Bytes.add (str c) bs))); + in read_body Bytes.empty end; (* messages with multiple chunks (arbitrary content) *) fun make_header ns = - [space_implode "," (map Value.print_int ns), "\n"]; + [Bytes.string (space_implode "," (map Value.print_int ns)), Bytes.newline]; fun write_message stream chunks = - (write stream (make_header (map size chunks) @ chunks); flush stream); + (write stream (make_header (map Bytes.length chunks) @ chunks); flush stream); + +fun write_message_string stream = + write_message stream o map Bytes.string; fun write_message_yxml stream chunks = (write stream (make_header (map YXML.body_size chunks)); @@ -82,26 +87,32 @@ string_of_int len ^ " of " ^ string_of_int n ^ " bytes")); fun read_message stream = - read_line stream |> Option.map (parse_header #> map (read_chunk stream)); + read_line stream |> Option.map (Bytes.content #> parse_header #> map (read_chunk stream)); + +fun read_message_string stream = + read_message stream |> (Option.map o map) Bytes.content; (* hybrid messages: line or length+block (with content restriction) *) +(* line message format *) + fun is_length msg = - msg <> "" andalso forall_string Symbol.is_ascii_digit msg; + not (Bytes.is_empty msg) andalso Bytes.forall_string Symbol.is_ascii_digit msg; fun is_terminated msg = - let val len = size msg - in len > 0 andalso Symbol.is_ascii_line_terminator (str (String.sub (msg, len - 1))) end; + (case Bytes.last_string msg of + NONE => false + | SOME s => Symbol.is_ascii_line_terminator s); fun write_line_message stream msg = if is_length msg orelse is_terminated msg then - error ("Bad content for line message:\n" ^ implode (take 100 (Symbol.explode msg))) + error ("Bad content for line message:\n" ^ Bytes.beginning 100 msg) else - let val n = size msg in + let val n = Bytes.length msg in write stream - ((if n > 100 orelse exists_string (fn s => s = "\n") msg - then make_header [n + 1] else []) @ [msg, "\n"]); + ((if n > 100 orelse Bytes.exists_string (fn s => s = "\n") msg + then make_header [n + 1] else []) @ [msg, Bytes.newline]); flush stream end; @@ -109,8 +120,8 @@ (case read_line stream of NONE => NONE | SOME line => - (case try Value.parse_nat line of + (case try (Value.parse_nat o Bytes.content) line of NONE => SOME line - | SOME n => Option.map trim_line (#1 (read_block stream n)))); + | SOME n => Option.map Bytes.trim_line (#1 (read_block stream n)))); end; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/PIDE/protocol_command.ML --- a/src/Pure/PIDE/protocol_command.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/PIDE/protocol_command.ML Tue Jun 21 22:17:11 2022 +0200 @@ -8,8 +8,9 @@ sig exception STOP of int val is_protocol_exn: exn -> bool + val define_bytes: string -> (Bytes.T list -> unit) -> unit val define: string -> (string list -> unit) -> unit - val run: string -> string list -> unit + val run: string -> Bytes.T list -> unit end; structure Protocol_Command: PROTOCOL_COMMAND = @@ -23,16 +24,18 @@ val commands = Synchronized.var "Protocol_Command.commands" - (Symtab.empty: (string list -> unit) Symtab.table); + (Symtab.empty: (Bytes.T list -> unit) Symtab.table); in -fun define name cmd = +fun define_bytes name cmd = Synchronized.change commands (fn cmds => (if not (Symtab.defined cmds name) then () else warning ("Redefining Isabelle protocol command " ^ quote name); Symtab.update (name, cmd) cmds)); +fun define name cmd = define_bytes name (map Bytes.content #> cmd); + fun run name args = (case Symtab.lookup (Synchronized.value commands) name of NONE => error ("Undefined Isabelle protocol command " ^ quote name) diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/System/isabelle_process.ML Tue Jun 21 22:17:11 2022 +0200 @@ -96,7 +96,7 @@ (* streams *) val (in_stream, out_stream) = Socket_IO.open_streams address; - val _ = Byte_Message.write_line out_stream password; + val _ = Byte_Message.write_line out_stream (Bytes.string password); val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF); val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF); @@ -158,7 +158,7 @@ (case Byte_Message.read_message in_stream of NONE => raise Protocol_Command.STOP 0 | SOME [] => Output.system_message "Isabelle process: no input" - | SOME (name :: args) => Protocol_Command.run name args) + | SOME (name :: args) => Protocol_Command.run (Bytes.content name) args) handle exn => if Protocol_Command.is_protocol_exn exn then Exn.reraise exn else (Runtime.exn_system_message exn handle crash => recover crash); diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/System/isabelle_system.ML --- a/src/Pure/System/isabelle_system.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/System/isabelle_system.ML Tue Jun 21 22:17:11 2022 +0200 @@ -58,14 +58,14 @@ fun with_streams f = Socket_IO.with_streams' f address password; fun kill (SOME uuid) = - with_streams (fn s => Byte_Message.write_message (#2 s) [Bash.server_kill, uuid]) + with_streams (fn s => Byte_Message.write_message_string (#2 s) [Bash.server_kill, uuid]) | kill NONE = (); in Thread_Attributes.uninterruptible (fn restore_attributes => fn () => let fun err () = raise Fail "Malformed result from bash_process server"; fun loop maybe_uuid s = - (case restore_attributes Byte_Message.read_message (#1 s) of + (case restore_attributes Byte_Message.read_message_string (#1 s) of SOME (head :: args) => if head = Bash.server_uuid andalso length args = 1 then loop (SOME (hd args)) s @@ -91,7 +91,7 @@ else err () | _ => err ()) handle exn => (kill maybe_uuid; Exn.reraise exn); - in with_streams (fn s => (Byte_Message.write_message (#2 s) run; loop NONE s)) end) () + in with_streams (fn s => (Byte_Message.write_message_string (#2 s) run; loop NONE s)) end) () end; val bash = Bash.script #> bash_process #> Process_Result.print #> Process_Result.rc; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/System/scala.ML --- a/src/Pure/System/scala.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/System/scala.ML Tue Jun 21 22:17:11 2022 +0200 @@ -7,6 +7,8 @@ signature SCALA = sig exception Null + val function_bytes: string -> Bytes.T list -> Bytes.T list + val function1_bytes: string -> Bytes.T -> Bytes.T val function: string -> string list -> string list val function1: string -> string -> string end; @@ -21,31 +23,31 @@ val new_id = string_of_int o Counter.make (); val results = - Synchronized.var "Scala.results" (Symtab.empty: string list Exn.result Symtab.table); + Synchronized.var "Scala.results" (Symtab.empty: Bytes.T list Exn.result Symtab.table); val _ = - Protocol_Command.define "Scala.result" - (fn id :: args => + Protocol_Command.define_bytes "Scala.result" + (fn id :: tag :: rest => let val result = - (case args of - ["0"] => Exn.Exn Null - | "1" :: rest => Exn.Res rest - | ["2", msg] => Exn.Exn (ERROR msg) - | ["3", msg] => Exn.Exn (Fail msg) - | ["4"] => Exn.Exn Exn.Interrupt + (case (Bytes.content tag, rest) of + ("0", []) => Exn.Exn Null + | ("1", _) => Exn.Res rest + | ("2", [msg]) => Exn.Exn (ERROR (Bytes.content msg)) + | ("3", [msg]) => Exn.Exn (Fail (Bytes.content msg)) + | ("4", []) => Exn.Exn Exn.Interrupt | _ => raise Fail "Malformed Scala.result"); - in Synchronized.change results (Symtab.map_entry id (K result)) end); + in Synchronized.change results (Symtab.map_entry (Bytes.content id) (K result)) end); in -fun function name args = +fun function_bytes name args = Thread_Attributes.uninterruptible (fn restore_attributes => fn () => let val id = new_id (); fun invoke () = (Synchronized.change results (Symtab.update (id, Exn.Exn Match)); - Output.protocol_message (Markup.invoke_scala name id) (map (single o XML.Text) args)); + Output.protocol_message (Markup.invoke_scala name id) (map Bytes.contents_blob args)); fun cancel () = (Synchronized.change results (Symtab.delete_safe id); Output.protocol_message (Markup.cancel_scala id) []); @@ -62,6 +64,10 @@ handle exn => (if Exn.is_interrupt exn then cancel () else (); Exn.reraise exn) end) (); +val function1_bytes = singleton o function_bytes; + +fun function name = map Bytes.string #> function_bytes name #> map Bytes.content; + val function1 = singleton o function; end; diff -r 8c5eedb6c983 -r c51e1cef1eae src/Pure/library.ML --- a/src/Pure/library.ML Tue Jun 21 16:03:00 2022 +0200 +++ b/src/Pure/library.ML Tue Jun 21 22:17:11 2022 +0200 @@ -135,6 +135,7 @@ val exists_string: (string -> bool) -> string -> bool val forall_string: (string -> bool) -> string -> bool val member_string: string -> string -> bool + val last_string: string -> string option val first_field: string -> string -> (string * string) option val enclose: string -> string -> string -> string val unenclose: string -> string @@ -714,6 +715,9 @@ fun member_string str s = exists_string (fn s' => s = s') str; +fun last_string "" = NONE + | last_string s = SOME (str (String.sub (s, size s - 1))); + fun first_field sep str = let val n = size sep;