src/Pure/System/system_channel.ML
author wenzelm
Wed, 08 Aug 2012 12:33:40 +0200
changeset 48731 a45ba78abcc1
parent 45158 db4bf4fb5492
child 50800 c0fb2839d1a9
permissions -rw-r--r--
more casual exit back to ML toplevel, to accomodate commit in SML/NJ which continues at the saved point;

(*  Title:      Pure/System/system_channel.ML
    Author:     Makarius

Portable system channel for inter-process communication, based on
named pipes or sockets.
*)

signature SYSTEM_CHANNEL =
sig
  type T
  val input_line: T -> string option
  val inputN: T -> int -> string
  val output: T -> string -> unit
  val flush: T -> unit
  val fifo_rendezvous: string -> string -> T
  val socket_rendezvous: string -> T
end;

structure System_Channel: SYSTEM_CHANNEL =
struct

datatype T = System_Channel of
 {input_line: unit -> string option,
  inputN: int -> string,
  output: string -> unit,
  flush: unit -> unit};

fun input_line (System_Channel {input_line = f, ...}) = f ();
fun inputN (System_Channel {inputN = f, ...}) n = f n;
fun output (System_Channel {output = f, ...}) s = f s;
fun flush (System_Channel {flush = f, ...}) = f ();


(* named pipes *)

fun fifo_rendezvous fifo1 fifo2 =
  let
    val in_stream = TextIO.openIn fifo1;
    val out_stream = TextIO.openOut fifo2;
    val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream out_stream, IO.BLOCK_BUF);
  in
    System_Channel
     {input_line = fn () => TextIO.inputLine in_stream,
      inputN = fn n => TextIO.inputN (in_stream, n),
      output = fn s => TextIO.output (out_stream, s),
      flush = fn () => TextIO.flushOut out_stream}
  end;


(* sockets *)  (* FIXME proper buffering via BinIO (after Poly/ML 5.4.1) *)

local

fun readN socket n =
  let
    fun read i buf =
      let
        val s = Byte.bytesToString (Socket.recvVec (socket, n - i));
        val m = size s;
        val i' = i + m;
        val buf' = Buffer.add s buf;
      in if m > 0 andalso n > i' then read i' buf' else Buffer.content buf' end;
  in read 0 Buffer.empty end;

fun read_line socket =
  let
    fun result cs = implode (rev ("\n" :: cs));
    fun read cs =
      (case readN socket 1 of
        "" => if null cs then NONE else SOME (result cs)
      | "\n" => SOME (result cs)
      | c => read (c :: cs));
  in read [] end;

fun write socket =
  let
    fun send buf =
      if Word8VectorSlice.isEmpty buf then ()
      else
        let
          val n = Int.min (Word8VectorSlice.length buf, 4096);
          val m = Socket.sendVec (socket, Word8VectorSlice.subslice (buf, 0, SOME n));
          val buf' = Word8VectorSlice.subslice (buf, m, NONE);
        in send buf' end;
  in fn s => send (Word8VectorSlice.full (Byte.stringToBytes s)) end;

in

fun socket_rendezvous name =
  let
    fun err () = error ("Bad socket name: " ^ quote name);
    val (host, port) =
      (case space_explode ":" name of
        [h, p] =>
         (case NetHostDB.getByName h of SOME host => host | NONE => err (),
          case Int.fromString p of SOME port => port | NONE => err ())
      | _ => err ());
    val socket: Socket.active INetSock.stream_sock = INetSock.TCP.socket ();
    val _ = Socket.connect (socket, INetSock.toAddr (NetHostDB.addr host, port));
  in
    System_Channel
     {input_line = fn () => read_line socket,
      inputN = fn n => readN socket n,
      output = fn s => write socket s,
      flush = fn () => ()}
  end;

end;

end;