abstract System_Channel in ML (cf. Scala version);
back to TextIO for fifo, which is more stable in Poly/ML 5.4.x;
explicit block buffering -- BinIO might be subject to old Poly/ML defaults;
--- a/src/Pure/IsaMakefile Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/IsaMakefile Thu Sep 22 20:33:08 2011 +0200
@@ -195,6 +195,7 @@
System/isabelle_system.ML \
System/isar.ML \
System/session.ML \
+ System/system_channel.ML \
Thy/html.ML \
Thy/latex.ML \
Thy/present.ML \
--- a/src/Pure/ROOT.ML Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/ROOT.ML Thu Sep 22 20:33:08 2011 +0200
@@ -267,6 +267,7 @@
(* Isabelle/Isar system *)
use "System/session.ML";
+use "System/system_channel.ML";
use "System/isabelle_process.ML";
use "System/invoke_scala.ML";
use "PIDE/isar_document.ML";
--- a/src/Pure/System/isabelle_process.ML Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/System/isabelle_process.ML Thu Sep 22 20:33:08 2011 +0200
@@ -9,7 +9,7 @@
. stdout \002: ML running
.. stdin/stdout/stderr freely available (raw ML loop)
.. protocol thread initialization
- ... switch to in_fifo/out_fifo channels (rendezvous via open)
+ ... rendezvous on system channel
... out_fifo INIT(pid): channels ready
... out_fifo STATUS(keywords)
... out_fifo READY: main loop ready
@@ -21,8 +21,8 @@
val add_command: string -> (string list -> unit) -> unit
val command: string -> string list -> unit
val crashes: exn list Synchronized.var
+ val init_fifos: string -> string -> unit
val init_socket: string -> unit
- val init_fifos: string -> string -> unit
end;
structure Isabelle_Process: ISABELLE_PROCESS =
@@ -80,13 +80,13 @@
((case opt_serial of SOME i => cons (Markup.serialN, string_of_int i) | _ => I)
(Position.properties_of (Position.thread_data ()))) body;
-fun message_output mbox out_stream =
+fun message_output mbox channel =
let
- fun flush () = ignore (try BinIO.flushOut out_stream);
+ fun flush () = ignore (try System_Channel.flush channel);
fun loop receive =
(case receive mbox of
SOME (msg, do_flush) =>
- (List.app (fn s => BinIO.output (out_stream, Byte.stringToBytes s)) msg;
+ (List.app (fn s => System_Channel.output channel s) msg;
if do_flush then flush () else ();
loop (Mailbox.receive_timeout (seconds 0.02)))
| NONE => (flush (); loop (SOME o Mailbox.receive)));
@@ -94,13 +94,13 @@
in
-fun setup_channels out_stream =
+fun setup_channels channel =
let
val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
val mbox = Mailbox.create () : (string list * bool) Mailbox.T;
- val _ = Simple_Thread.fork false (message_output mbox out_stream);
+ val _ = Simple_Thread.fork false (message_output mbox channel);
in
Output.Private_Hooks.status_fn := standard_message mbox NONE "B";
Output.Private_Hooks.report_fn := standard_message mbox NONE "C";
@@ -127,35 +127,23 @@
(Synchronized.change crashes (cons crash);
warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
-fun read_line stream =
- let
- val content = String.implode o rev;
- fun read cs =
- (case BinIO.input1 stream of
- NONE => (content cs, null cs)
- | SOME b =>
- (case Byte.byteToChar b of
- #"\n" => (content cs, false)
- | c => read (c :: cs)));
- in case read [] of ("", true) => NONE | (s, _) => SOME s end;
-
-fun read_chunk stream len =
+fun read_chunk channel len =
let
val n =
(case Int.fromString len of
SOME n => n
| NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
- val chunk = Byte.bytesToString (BinIO.inputN (stream, n));
+ val chunk = System_Channel.inputN channel n;
val m = size chunk;
in
if m = n then chunk
else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
end;
-fun read_command stream =
- (case read_line stream of
+fun read_command channel =
+ (case System_Channel.input_line channel of
NONE => raise Runtime.TERMINATE
- | SOME line => map (read_chunk stream) (space_explode "," line));
+ | SOME line => map (read_chunk channel) (space_explode "," line));
fun run_command name args =
Runtime.debugging (command name) args
@@ -164,21 +152,21 @@
in
-fun loop stream =
+fun loop channel =
let val continue =
- (case read_command stream of
+ (case read_command channel of
[] => (Output.error_msg "Isabelle process: no input"; true)
| name :: args => (run_command name args; true))
handle Runtime.TERMINATE => false
| exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
- in if continue then loop stream else () end;
+ in if continue then loop channel else () end;
end;
(* init *)
-fun init make_streams = ignore (Simple_Thread.fork false (fn () =>
+fun init rendezvous = ignore (Simple_Thread.fork false (fn () =>
let
val _ = OS.Process.sleep (seconds 0.5); (*yield to raw ML toplevel*)
val _ = Output.physical_stdout Symbol.STX;
@@ -194,18 +182,16 @@
(fold (update op =)
[Symbol.xsymbolsN, isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
- val (in_stream, out_stream) = make_streams ();
- val _ = setup_channels out_stream;
+ val channel = rendezvous ();
+ val _ = setup_channels channel;
val _ = Keyword.status ();
val _ = Thy_Info.status ();
val _ = Output.status (Markup.markup Markup.ready "process ready");
- in loop in_stream end));
+ in loop channel end));
-fun rendezvous fifo1 fifo2 = (BinIO.openIn fifo1, BinIO.openOut fifo2);
-fun init_fifos fifo1 fifo2 = init (fn () => rendezvous fifo1 fifo2);
-
-fun init_socket socket_name = init (fn () => Socket_IO.open_streams socket_name);
+fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
+fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);
end;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/System/system_channel.ML Thu Sep 22 20:33:08 2011 +0200
@@ -0,0 +1,77 @@
+(* Title: Pure/System/system_channel.ML
+ Author: Makarius
+
+Portable system channel for inter-process communication, based on
+named pipes or sockets.
+*)
+
+signature SYSTEM_CHANNEL =
+sig
+ type T
+ val input_line: T -> string option
+ val inputN: T -> int -> string
+ val output: T -> string -> unit
+ val flush: T -> unit
+ val fifo_rendezvous: string -> string -> T
+ val socket_rendezvous: string -> T
+end;
+
+structure System_Channel: SYSTEM_CHANNEL =
+struct
+
+datatype T = System_Channel of
+ {input_line: unit -> string option,
+ inputN: int -> string,
+ output: string -> unit,
+ flush: unit -> unit};
+
+fun input_line (System_Channel {input_line = f, ...}) = f ();
+fun inputN (System_Channel {inputN = f, ...}) n = f n;
+fun output (System_Channel {output = f, ...}) s = f s;
+fun flush (System_Channel {flush = f, ...}) = f ();
+
+
+(* named pipes *)
+
+fun fifo_rendezvous fifo1 fifo2 =
+ let
+ val in_stream = TextIO.openIn fifo1;
+ val out_stream = TextIO.openOut fifo2;
+ val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream out_stream, IO.BLOCK_BUF);
+ in
+ System_Channel
+ {input_line = fn () => TextIO.inputLine in_stream,
+ inputN = fn n => TextIO.inputN (in_stream, n),
+ output = fn s => TextIO.output (out_stream, s),
+ flush = fn () => TextIO.flushOut out_stream}
+ end;
+
+
+(* sockets *)
+
+fun read_line in_stream =
+ let
+ fun result cs = String.implode (rev (#"\n" :: cs));
+ fun read cs =
+ (case BinIO.input1 in_stream of
+ NONE => if null cs then NONE else SOME (result cs)
+ | SOME b =>
+ (case Byte.byteToChar b of
+ #"\n" => SOME (result cs)
+ | c => read (c :: cs)));
+ in read [] end;
+
+fun socket_rendezvous name =
+ let
+ val (in_stream, out_stream) = Socket_IO.open_streams name;
+ val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF);
+ in
+ System_Channel
+ {input_line = fn () => read_line in_stream,
+ inputN = fn n => Byte.bytesToString (BinIO.inputN (in_stream, n)),
+ output = fn s => BinIO.output (out_stream, Byte.stringToBytes s),
+ flush = fn () => BinIO.flushOut out_stream}
+ end;
+
+end;
+