# HG changeset patch # User wenzelm # Date 1544559782 -3600 # Node ID b516fdf8005ca6cc726193e05dd2e7a1a73d8a82 # Parent 51e696887b813c4cb626fc94d752e128126106be more uniform multi-language operations; clarified modules and signature; diff -r 51e696887b81 -r b516fdf8005c src/Pure/PIDE/byte_message.ML --- a/src/Pure/PIDE/byte_message.ML Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Pure/PIDE/byte_message.ML Tue Dec 11 21:23:02 2018 +0100 @@ -7,12 +7,20 @@ signature BYTE_MESSAGE = sig val read_line: BinIO.instream -> string option - val read_block: BinIO.instream -> int -> string + val read: BinIO.instream -> int -> string + val read_block: BinIO.instream -> int -> string option + val read_message: BinIO.instream -> string list option end; structure Byte_Message: BYTE_MESSAGE = struct +fun read stream n = Byte.bytesToString (BinIO.inputN (stream, n)); + +fun read_block stream n = + let val msg = read stream n + in if size msg = n then SOME msg else NONE end; + fun read_line stream = let val result = trim_line o String.implode o rev; @@ -25,7 +33,22 @@ | c => read (c :: cs))); in read [] end; -fun read_block stream n = - Byte.bytesToString (BinIO.inputN (stream, n)); + +(* messages with multiple chunks (arbitrary content) *) + +fun read_chunk stream n = + let val (len, chunk) = `size (read stream n) in + if len = n then chunk + else + error ("Malformed message chunk: unexpected EOF after " ^ + string_of_int len ^ " of " ^ string_of_int n ^ " bytes") + end; + +fun read_message stream = + read_line stream |> Option.map (fn line => + let + val ns = map Value.parse_nat (space_explode "," line) + handle Fail _ => error ("Malformed message header: " ^ quote line); + in map (read_chunk stream) ns end); end; diff -r 51e696887b81 -r b516fdf8005c src/Pure/PIDE/byte_message.scala --- a/src/Pure/PIDE/byte_message.scala Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Pure/PIDE/byte_message.scala Tue Dec 11 21:23:02 2018 +0100 @@ -11,6 +11,15 @@ object Byte_Message { + def read(stream: InputStream, length: Int): Bytes = + Bytes.read_stream(stream, limit = length) + + def read_block(stream: InputStream, length: Int): Option[Bytes] = + { + val msg = read(stream, length) + if (msg.length == length) Some(msg) else None + } + def read_line(stream: InputStream): Option[Bytes] = { val line = new ByteArrayOutputStream(100) @@ -26,14 +35,8 @@ } } - def read_block(stream: InputStream, length: Int): Option[Bytes] = - { - val msg = Bytes.read_stream(stream, limit = length) - if (msg.length == length) Some(msg) else None - } - - /* hybrid messages: line or length+block, with content restriction */ + /* hybrid messages: line or length+block (with content restriction) */ private def is_length(msg: Bytes): Boolean = !msg.is_empty && msg.iterator.forall(b => Symbol.is_ascii_digit(b.toChar)) diff -r 51e696887b81 -r b516fdf8005c src/Pure/ROOT.ML --- a/src/Pure/ROOT.ML Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Pure/ROOT.ML Tue Dec 11 21:23:02 2018 +0100 @@ -318,7 +318,6 @@ subsection "Isabelle/Isar system"; ML_file "System/command_line.ML"; -ML_file "System/system_channel.ML"; ML_file "System/message_channel.ML"; ML_file "System/isabelle_process.ML"; ML_file "System/invoke_scala.ML"; diff -r 51e696887b81 -r b516fdf8005c src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Pure/System/isabelle_process.ML Tue Dec 11 21:23:02 2018 +0100 @@ -95,12 +95,13 @@ val serial_props = Markup.serial_properties o serial; -fun init_channels channel = +fun init_channels out_stream = let val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF); val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF); + val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF); - val msg_channel = Message_Channel.make channel; + val msg_channel = Message_Channel.make out_stream; fun message name props body = Message_Channel.send msg_channel (Message_Channel.message name props body); @@ -149,37 +150,18 @@ Output.physical_stderr "Recovered from Isabelle process crash -- see also Isabelle_Process.crashes\n"); -fun read_chunk channel len = - let - val n = - (case Int.fromString len of - SOME n => n - | NONE => error ("Isabelle process: malformed header " ^ quote len)); - val chunk = System_Channel.inputN channel n; - val i = size chunk; - in - if i <> n then - error ("Isabelle process: bad chunk (unexpected EOF after " ^ - string_of_int i ^ " of " ^ string_of_int n ^ " bytes)") - else chunk - end; - -fun read_command channel = - System_Channel.input_line channel - |> Option.map (fn line => map (read_chunk channel) (space_explode "," line)); - in -fun loop channel = +fun loop stream = let val continue = - (case read_command channel of + (case Byte_Message.read_message stream of NONE => false | SOME [] => (Output.system_message "Isabelle process: no input"; true) | SOME (name :: args) => (run_command name args; true)) handle exn => (Runtime.exn_system_message exn handle crash => recover crash; true); in - if continue then loop channel + if continue then loop stream else (Future.shutdown (); Execution.reset (); ()) end; @@ -202,9 +184,9 @@ Unsynchronized.change print_mode (fn mode => (mode @ default_modes1) |> fold (update op =) default_modes2); - val channel = System_Channel.rendezvous socket; - val msg_channel = init_channels channel; - val _ = loop channel; + val (in_stream, out_stream) = Socket_IO.open_streams socket; + val msg_channel = init_channels out_stream; + val _ = loop in_stream; val _ = Message_Channel.shutdown msg_channel; val _ = Private_Output.init_channels (); diff -r 51e696887b81 -r b516fdf8005c src/Pure/System/message_channel.ML --- a/src/Pure/System/message_channel.ML Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Pure/System/message_channel.ML Tue Dec 11 21:23:02 2018 +0100 @@ -11,7 +11,7 @@ type T val send: T -> message -> unit val shutdown: T -> unit - val make: System_Channel.T -> T + val make: BinIO.outstream -> T end; structure Message_Channel: MESSAGE_CHANNEL = @@ -30,8 +30,8 @@ val header = YXML.string_of (XML.Elem ((name, robust_props), [])); in Message (chunk [header] @ chunk body) end; -fun output_message channel (Message ss) = - List.app (System_Channel.output channel) ss; +fun output_message stream (Message ss) = + List.app (File.output stream) ss; (* channel *) @@ -41,26 +41,26 @@ fun send (Message_Channel {send, ...}) = send; fun shutdown (Message_Channel {shutdown, ...}) = shutdown (); -fun flush channel = ignore (try System_Channel.flush channel); +fun flush stream = ignore (try BinIO.flushOut stream); val flush_timeout = SOME (seconds 0.02); -fun message_output mbox channel = +fun message_output mbox stream = let fun continue timeout = (case Mailbox.receive timeout mbox of - [] => (flush channel; continue NONE) + [] => (flush stream; continue NONE) | msgs => received timeout msgs) - and received _ (NONE :: _) = flush channel - | received _ (SOME msg :: rest) = (output_message channel msg; received flush_timeout rest) + and received _ (NONE :: _) = flush stream + | received _ (SOME msg :: rest) = (output_message stream msg; received flush_timeout rest) | received timeout [] = continue timeout; in fn () => continue NONE end; -fun make channel = +fun make stream = let val mbox = Mailbox.create (); val thread = Standard_Thread.fork {name = "channel", stack_limit = NONE, interrupts = false} - (message_output mbox channel); + (message_output mbox stream); fun send msg = Mailbox.send mbox (SOME msg); fun shutdown () = (Mailbox.send mbox NONE; Mailbox.await_empty mbox; Standard_Thread.join thread); diff -r 51e696887b81 -r b516fdf8005c src/Pure/System/system_channel.ML --- a/src/Pure/System/system_channel.ML Tue Dec 11 19:25:35 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,34 +0,0 @@ -(* Title: Pure/System/system_channel.ML - Author: Makarius - -Socket-based system channel for inter-process communication. -*) - -signature SYSTEM_CHANNEL = -sig - type T - val input_line: T -> string option - val inputN: T -> int -> string - val output: T -> string -> unit - val flush: T -> unit - val rendezvous: string -> T -end; - -structure System_Channel: SYSTEM_CHANNEL = -struct - -datatype T = System_Channel of BinIO.instream * BinIO.outstream; - -fun input_line (System_Channel (stream, _)) = Byte_Message.read_line stream; -fun inputN (System_Channel (stream, _)) n = Byte_Message.read_block stream n; - -fun output (System_Channel (_, stream)) s = File.output stream s; -fun flush (System_Channel (_, stream)) = BinIO.flushOut stream; - -fun rendezvous name = - let - val (in_stream, out_stream) = Socket_IO.open_streams name; - val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF); - in System_Channel (in_stream, out_stream) end; - -end; diff -r 51e696887b81 -r b516fdf8005c src/Tools/Haskell/Haskell.thy --- a/src/Tools/Haskell/Haskell.thy Tue Dec 11 19:25:35 2018 +0100 +++ b/src/Tools/Haskell/Haskell.thy Tue Dec 11 21:23:02 2018 +0100 @@ -1379,9 +1379,12 @@ and \<^file>\$ISABELLE_HOME/src/Pure/PIDE/byte_message.scala\. -} -module Isabelle.Byte_Message (read_line, read_block, trim_line, read_line_message, write_line_message) +module Isabelle.Byte_Message + (read, read_block, read_line, trim_line, + read_line_message, write_line_message) where +import Prelude hiding (read) import Data.ByteString (ByteString) import qualified Data.ByteString as ByteString import qualified Data.ByteString.UTF8 as UTF8 @@ -1395,6 +1398,27 @@ import qualified Isabelle.Value as Value +read :: Socket -> Int -> IO ByteString +read socket n = read_bytes 0 [] + where + result :: [ByteString] -> ByteString + result = ByteString.concat . reverse + + read_bytes :: Int -> [ByteString] -> IO ByteString + read_bytes len ss = + if len >= n then return (result ss) + else + (do + s <- ByteString.recv socket (min (n - len) 8192) + case ByteString.length s of + 0 -> return (result ss) + m -> read_bytes (len + m) (s : ss)) + +read_block :: Socket -> Int -> IO (Maybe ByteString) +read_block socket n = do + s <- read socket n + return (if ByteString.length s == n then Just s else Nothing) + read_line :: Socket -> IO (Maybe ByteString) read_line socket = read [] where @@ -1413,24 +1437,6 @@ 10 -> return (Just (result bs)) b -> read (b : bs) -read_block :: Socket -> Int -> IO (Maybe ByteString) -read_block socket n = read 0 [] - where - result :: [ByteString] -> Maybe ByteString - result ss = - if ByteString.length s == n then Just s else Nothing - where s = ByteString.concat (reverse ss) - - read :: Int -> [ByteString] -> IO (Maybe ByteString) - read len ss = - if len >= n then return (result ss) - else - (do - s <- ByteString.recv socket (min (n - len) 8192) - case ByteString.length s of - 0 -> return (result ss) - m -> read (len + m) (s : ss)) - trim_line :: ByteString -> ByteString trim_line s = if n >= 2 && at (n - 2) == 13 && at (n - 1) == 10 then ByteString.take (n - 2) s @@ -1441,7 +1447,6 @@ at = ByteString.index s - -- hybrid messages: line or length+block (with content restriction) is_length :: ByteString -> Bool