abstract System_Channel in ML (cf. Scala version);
authorwenzelm
Thu, 22 Sep 2011 20:33:08 +0200
changeset 45029 63144ea111f7
parent 45028 d608dd8cd409
child 45030 9cf265a192f6
abstract System_Channel in ML (cf. Scala version); back to TextIO for fifo, which is more stable in Poly/ML 5.4.x; explicit block buffering -- BinIO might be subject to old Poly/ML defaults;
src/Pure/IsaMakefile
src/Pure/ROOT.ML
src/Pure/System/isabelle_process.ML
src/Pure/System/system_channel.ML
--- a/src/Pure/IsaMakefile	Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/IsaMakefile	Thu Sep 22 20:33:08 2011 +0200
@@ -195,6 +195,7 @@
   System/isabelle_system.ML				\
   System/isar.ML					\
   System/session.ML					\
+  System/system_channel.ML				\
   Thy/html.ML						\
   Thy/latex.ML						\
   Thy/present.ML					\
--- a/src/Pure/ROOT.ML	Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/ROOT.ML	Thu Sep 22 20:33:08 2011 +0200
@@ -267,6 +267,7 @@
 (* Isabelle/Isar system *)
 
 use "System/session.ML";
+use "System/system_channel.ML";
 use "System/isabelle_process.ML";
 use "System/invoke_scala.ML";
 use "PIDE/isar_document.ML";
--- a/src/Pure/System/isabelle_process.ML	Wed Sep 21 22:18:17 2011 +0200
+++ b/src/Pure/System/isabelle_process.ML	Thu Sep 22 20:33:08 2011 +0200
@@ -9,7 +9,7 @@
   . stdout \002: ML running
   .. stdin/stdout/stderr freely available (raw ML loop)
   .. protocol thread initialization
-  ... switch to in_fifo/out_fifo channels (rendezvous via open)
+  ... rendezvous on system channel
   ... out_fifo INIT(pid): channels ready
   ... out_fifo STATUS(keywords)
   ... out_fifo READY: main loop ready
@@ -21,8 +21,8 @@
   val add_command: string -> (string list -> unit) -> unit
   val command: string -> string list -> unit
   val crashes: exn list Synchronized.var
+  val init_fifos: string -> string -> unit
   val init_socket: string -> unit
-  val init_fifos: string -> string -> unit
 end;
 
 structure Isabelle_Process: ISABELLE_PROCESS =
@@ -80,13 +80,13 @@
       ((case opt_serial of SOME i => cons (Markup.serialN, string_of_int i) | _ => I)
         (Position.properties_of (Position.thread_data ()))) body;
 
-fun message_output mbox out_stream =
+fun message_output mbox channel =
   let
-    fun flush () = ignore (try BinIO.flushOut out_stream);
+    fun flush () = ignore (try System_Channel.flush channel);
     fun loop receive =
       (case receive mbox of
         SOME (msg, do_flush) =>
-         (List.app (fn s => BinIO.output (out_stream, Byte.stringToBytes s)) msg;
+         (List.app (fn s => System_Channel.output channel s) msg;
           if do_flush then flush () else ();
           loop (Mailbox.receive_timeout (seconds 0.02)))
       | NONE => (flush (); loop (SOME o Mailbox.receive)));
@@ -94,13 +94,13 @@
 
 in
 
-fun setup_channels out_stream =
+fun setup_channels channel =
   let
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdOut, IO.LINE_BUF);
     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream TextIO.stdErr, IO.LINE_BUF);
 
     val mbox = Mailbox.create () : (string list * bool) Mailbox.T;
-    val _ = Simple_Thread.fork false (message_output mbox out_stream);
+    val _ = Simple_Thread.fork false (message_output mbox channel);
   in
     Output.Private_Hooks.status_fn := standard_message mbox NONE "B";
     Output.Private_Hooks.report_fn := standard_message mbox NONE "C";
@@ -127,35 +127,23 @@
   (Synchronized.change crashes (cons crash);
     warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
 
-fun read_line stream =
-  let
-    val content = String.implode o rev;
-    fun read cs =
-      (case BinIO.input1 stream of
-        NONE => (content cs, null cs)
-      | SOME b =>
-          (case Byte.byteToChar b of
-            #"\n" => (content cs, false)
-          | c => read (c :: cs)));
-  in case read [] of ("", true) => NONE | (s, _) => SOME s end;
-
-fun read_chunk stream len =
+fun read_chunk channel len =
   let
     val n =
       (case Int.fromString len of
         SOME n => n
       | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
-    val chunk = Byte.bytesToString (BinIO.inputN (stream, n));
+    val chunk = System_Channel.inputN channel n;
     val m = size chunk;
   in
     if m = n then chunk
     else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
   end;
 
-fun read_command stream =
-  (case read_line stream of
+fun read_command channel =
+  (case System_Channel.input_line channel of
     NONE => raise Runtime.TERMINATE
-  | SOME line => map (read_chunk stream) (space_explode "," line));
+  | SOME line => map (read_chunk channel) (space_explode "," line));
 
 fun run_command name args =
   Runtime.debugging (command name) args
@@ -164,21 +152,21 @@
 
 in
 
-fun loop stream =
+fun loop channel =
   let val continue =
-    (case read_command stream of
+    (case read_command channel of
       [] => (Output.error_msg "Isabelle process: no input"; true)
     | name :: args => (run_command name args; true))
     handle Runtime.TERMINATE => false
       | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
-  in if continue then loop stream else () end;
+  in if continue then loop channel else () end;
 
 end;
 
 
 (* init *)
 
-fun init make_streams = ignore (Simple_Thread.fork false (fn () =>
+fun init rendezvous = ignore (Simple_Thread.fork false (fn () =>
   let
     val _ = OS.Process.sleep (seconds 0.5);  (*yield to raw ML toplevel*)
     val _ = Output.physical_stdout Symbol.STX;
@@ -194,18 +182,16 @@
         (fold (update op =)
           [Symbol.xsymbolsN, isabelle_processN, Keyword.keyword_statusN, Pretty.symbolicN]);
 
-    val (in_stream, out_stream) = make_streams ();
-    val _ = setup_channels out_stream;
+    val channel = rendezvous ();
+    val _ = setup_channels channel;
 
     val _ = Keyword.status ();
     val _ = Thy_Info.status ();
     val _ = Output.status (Markup.markup Markup.ready "process ready");
-  in loop in_stream end));
+  in loop channel end));
 
-fun rendezvous fifo1 fifo2 = (BinIO.openIn fifo1, BinIO.openOut fifo2);
-fun init_fifos fifo1 fifo2 = init (fn () => rendezvous fifo1 fifo2);
-
-fun init_socket socket_name = init (fn () => Socket_IO.open_streams socket_name);
+fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
+fun init_socket name = init (fn () => System_Channel.socket_rendezvous name);
 
 end;
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/System/system_channel.ML	Thu Sep 22 20:33:08 2011 +0200
@@ -0,0 +1,77 @@
+(*  Title:      Pure/System/system_channel.ML
+    Author:     Makarius
+
+Portable system channel for inter-process communication, based on
+named pipes or sockets.
+*)
+
+signature SYSTEM_CHANNEL =
+sig
+  type T
+  val input_line: T -> string option
+  val inputN: T -> int -> string
+  val output: T -> string -> unit
+  val flush: T -> unit
+  val fifo_rendezvous: string -> string -> T
+  val socket_rendezvous: string -> T
+end;
+
+structure System_Channel: SYSTEM_CHANNEL =
+struct
+
+datatype T = System_Channel of
+ {input_line: unit -> string option,
+  inputN: int -> string,
+  output: string -> unit,
+  flush: unit -> unit};
+
+fun input_line (System_Channel {input_line = f, ...}) = f ();
+fun inputN (System_Channel {inputN = f, ...}) n = f n;
+fun output (System_Channel {output = f, ...}) s = f s;
+fun flush (System_Channel {flush = f, ...}) = f ();
+
+
+(* named pipes *)
+
+fun fifo_rendezvous fifo1 fifo2 =
+  let
+    val in_stream = TextIO.openIn fifo1;
+    val out_stream = TextIO.openOut fifo2;
+    val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream out_stream, IO.BLOCK_BUF);
+  in
+    System_Channel
+     {input_line = fn () => TextIO.inputLine in_stream,
+      inputN = fn n => TextIO.inputN (in_stream, n),
+      output = fn s => TextIO.output (out_stream, s),
+      flush = fn () => TextIO.flushOut out_stream}
+  end;
+
+
+(* sockets *)
+
+fun read_line in_stream =
+  let
+    fun result cs = String.implode (rev (#"\n" :: cs));
+    fun read cs =
+      (case BinIO.input1 in_stream of
+        NONE => if null cs then NONE else SOME (result cs)
+      | SOME b =>
+          (case Byte.byteToChar b of
+            #"\n" => SOME (result cs)
+          | c => read (c :: cs)));
+  in read [] end;
+
+fun socket_rendezvous name =
+  let
+    val (in_stream, out_stream) = Socket_IO.open_streams name;
+    val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF);
+  in
+    System_Channel
+     {input_line = fn () => read_line in_stream,
+      inputN = fn n => Byte.bytesToString (BinIO.inputN (in_stream, n)),
+      output = fn s => BinIO.output (out_stream, Byte.stringToBytes s),
+      flush = fn () => BinIO.flushOut out_stream}
+  end;
+
+end;
+