# HG changeset patch # User wenzelm # Date 1316716388 -7200 # Node ID 63144ea111f791ba28a93e48d5e25d887c5ea589 # Parent d608dd8cd409a0397706b8254354874a749c8f7e abstract System_Channel in ML (cf. Scala version); back to TextIO for fifo, which is more stable in Poly/ML 5.4.x; explicit block buffering -- BinIO might be subject to old Poly/ML defaults; diff -r d608dd8cd409 -r 63144ea111f7 src/Pure/IsaMakefile --- a/src/Pure/IsaMakefile Wed Sep 21 22:18:17 2011 +0200 +++ b/src/Pure/IsaMakefile Thu Sep 22 20:33:08 2011 +0200 @@ -195,6 +195,7 @@ System/isabelle_system.ML \ System/isar.ML \ System/session.ML \ + System/system_channel.ML \ Thy/html.ML \ Thy/latex.ML \ Thy/present.ML \ diff -r d608dd8cd409 -r 63144ea111f7 src/Pure/ROOT.ML --- a/src/Pure/ROOT.ML Wed Sep 21 22:18:17 2011 +0200 +++ b/src/Pure/ROOT.ML Thu Sep 22 20:33:08 2011 +0200 @@ -267,6 +267,7 @@ (* Isabelle/Isar system *) use "System/session.ML"; +use "System/system_channel.ML"; use "System/isabelle_process.ML"; use "System/invoke_scala.ML"; use "PIDE/isar_document.ML"; diff -r d608dd8cd409 -r 63144ea111f7 src/Pure/System/isabelle_process.ML --- a/src/Pure/System/isabelle_process.ML Wed Sep 21 22:18:17 2011 +0200 +++ b/src/Pure/System/isabelle_process.ML Thu Sep 22 20:33:08 2011 +0200 @@ -9,7 +9,7 @@ . stdout \002: ML running .. stdin/stdout/stderr freely available (raw ML loop) .. protocol thread initialization - ... switch to in_fifo/out_fifo channels (rendezvous via open) + ... rendezvous on system channel ... out_fifo INIT(pid): channels ready ... out_fifo STATUS(keywords) ... out_fifo READY: main loop ready @@ -21,8 +21,8 @@ val add_command: string -> (string list -> unit) -> unit val command: string -> string list -> unit val crashes: exn list Synchronized.var + val init_fifos: string -> string -> unit val init_socket: string -> unit - val init_fifos: string -> string -> unit end; structure Isabelle_Process: ISABELLE_PROCESS = @@ -80,13 +80,13 @@ ((case opt_serial of SOME i => cons (Markup.serialN, string_of_int i) | _ => I) (Position.properties_of (Position.thread_data ()))) body; -fun message_output mbox out_stream = +fun message_output mbox channel = let - fun flush () = ignore (try BinIO.flushOut out_stream); + fun flush () = ignore (try System_Channel.flush channel); fun loop receive = (case receive mbox of SOME (msg, do_flush) => - (List.app (fn s => BinIO.output (out_stream, Byte.stringToBytes s)) msg; + (List.app (fn s => System_Channel.output channel s) msg; if do_flush then flush () else (); loop (Mailbox.receive_timeout (seconds 0.02))) | NONE => (flush (); loop (SOME o Mailbox.receive))); @@ -94,13 +94,13 @@ in -fun setup_channels out_stream = +fun setup_channels channel = let val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF); val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF); val mbox = Mailbox.create () : (string list * bool) Mailbox.T; - val _ = Simple_Thread.fork false (message_output mbox out_stream); + val _ = Simple_Thread.fork false (message_output mbox channel); in Output.Private_Hooks.status_fn := standard_message mbox NONE "B"; Output.Private_Hooks.report_fn := standard_message mbox NONE "C"; @@ -127,35 +127,23 @@ (Synchronized.change crashes (cons crash); warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes"); -fun read_line stream = - let - val content = String.implode o rev; - fun read cs = - (case BinIO.input1 stream of - NONE => (content cs, null cs) - | SOME b => - (case Byte.byteToChar b of - #"\n" => (content cs, false) - | c => read (c :: cs))); - in case read [] of ("", true) => NONE | (s, _) => SOME s end; - -fun read_chunk stream len = +fun read_chunk channel len = let val n = (case Int.fromString len of SOME n => n | NONE => error ("Isabelle process: malformed chunk header " ^ quote len)); - val chunk = Byte.bytesToString (BinIO.inputN (stream, n)); + val chunk = System_Channel.inputN channel n; val m = size chunk; in if m = n then chunk else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")") end; -fun read_command stream = - (case read_line stream of +fun read_command channel = + (case System_Channel.input_line channel of NONE => raise Runtime.TERMINATE - | SOME line => map (read_chunk stream) (space_explode "," line)); + | SOME line => map (read_chunk channel) (space_explode "," line)); fun run_command name args = Runtime.debugging (command name) args @@ -164,21 +152,21 @@ in -fun loop stream = +fun loop channel = let val continue = - (case read_command stream of + (case read_command channel of [] => (Output.error_msg "Isabelle process: no input"; true) | name :: args => (run_command name args; true)) handle Runtime.TERMINATE => false | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true); - in if continue then loop stream else () end; + in if continue then loop channel else () end; end; (* init *) -fun init make_streams = ignore (Simple_Thread.fork false (fn () => +fun init rendezvous = ignore (Simple_Thread.fork false (fn () => let val _ = OS.Process.sleep (seconds 0.5); (*yield to raw ML toplevel*) val _ = Output.physical_stdout Symbol.STX; @@ -194,18 +182,16 @@ (fold (update op =) [Symbol.xsymbolsN, isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]); - val (in_stream, out_stream) = make_streams (); - val _ = setup_channels out_stream; + val channel = rendezvous (); + val _ = setup_channels channel; val _ = Keyword.status (); val _ = Thy_Info.status (); val _ = Output.status (Markup.markup Markup.ready "process ready"); - in loop in_stream end)); + in loop channel end)); -fun rendezvous fifo1 fifo2 = (BinIO.openIn fifo1, BinIO.openOut fifo2); -fun init_fifos fifo1 fifo2 = init (fn () => rendezvous fifo1 fifo2); - -fun init_socket socket_name = init (fn () => Socket_IO.open_streams socket_name); +fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2); +fun init_socket name = init (fn () => System_Channel.socket_rendezvous name); end; diff -r d608dd8cd409 -r 63144ea111f7 src/Pure/System/system_channel.ML --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Pure/System/system_channel.ML Thu Sep 22 20:33:08 2011 +0200 @@ -0,0 +1,77 @@ +(* Title: Pure/System/system_channel.ML + Author: Makarius + +Portable system channel for inter-process communication, based on +named pipes or sockets. +*) + +signature SYSTEM_CHANNEL = +sig + type T + val input_line: T -> string option + val inputN: T -> int -> string + val output: T -> string -> unit + val flush: T -> unit + val fifo_rendezvous: string -> string -> T + val socket_rendezvous: string -> T +end; + +structure System_Channel: SYSTEM_CHANNEL = +struct + +datatype T = System_Channel of + {input_line: unit -> string option, + inputN: int -> string, + output: string -> unit, + flush: unit -> unit}; + +fun input_line (System_Channel {input_line = f, ...}) = f (); +fun inputN (System_Channel {inputN = f, ...}) n = f n; +fun output (System_Channel {output = f, ...}) s = f s; +fun flush (System_Channel {flush = f, ...}) = f (); + + +(* named pipes *) + +fun fifo_rendezvous fifo1 fifo2 = + let + val in_stream = TextIO.openIn fifo1; + val out_stream = TextIO.openOut fifo2; + val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream out_stream, IO.BLOCK_BUF); + in + System_Channel + {input_line = fn () => TextIO.inputLine in_stream, + inputN = fn n => TextIO.inputN (in_stream, n), + output = fn s => TextIO.output (out_stream, s), + flush = fn () => TextIO.flushOut out_stream} + end; + + +(* sockets *) + +fun read_line in_stream = + let + fun result cs = String.implode (rev (#"\n" :: cs)); + fun read cs = + (case BinIO.input1 in_stream of + NONE => if null cs then NONE else SOME (result cs) + | SOME b => + (case Byte.byteToChar b of + #"\n" => SOME (result cs) + | c => read (c :: cs))); + in read [] end; + +fun socket_rendezvous name = + let + val (in_stream, out_stream) = Socket_IO.open_streams name; + val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF); + in + System_Channel + {input_line = fn () => read_line in_stream, + inputN = fn n => Byte.bytesToString (BinIO.inputN (in_stream, n)), + output = fn s => BinIO.output (out_stream, Byte.stringToBytes s), + flush = fn () => BinIO.flushOut out_stream} + end; + +end; +