recovered buffered sockets from 11f622794ad6 -- requires Poly/ML 5.5.x;
authorwenzelm
Thu, 10 Jan 2013 12:41:53 +0100
changeset 50800 c0fb2839d1a9
parent 50799 5a2f5834ccb4
child 50801 b8ff6d1ee56c
recovered buffered sockets from 11f622794ad6 -- requires Poly/ML 5.5.x;
src/Pure/General/socket_io.ML
src/Pure/ROOT
src/Pure/ROOT.ML
src/Pure/System/system_channel.ML
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Pure/General/socket_io.ML	Thu Jan 10 12:41:53 2013 +0100
@@ -0,0 +1,87 @@
+(*  Title:      Pure/General/socket_io.ML
+    Author:     Timothy Bourke, NICTA
+    Author:     Makarius
+
+Stream IO over TCP sockets.  Following example 10.2 in "The Standard
+ML Basis Library" by Emden R. Gansner and John H. Reppy.
+
+Note: BinIO requires Poly/ML 5.5.x to work reliably.
+*)
+
+signature SOCKET_IO =
+sig
+  val make_streams: Socket.active INetSock.stream_sock -> BinIO.instream * BinIO.outstream
+  val open_streams: string -> BinIO.instream * BinIO.outstream
+end;
+
+structure Socket_IO: SOCKET_IO =
+struct
+
+fun make_streams socket =
+  let
+    val (host, port) = INetSock.fromAddr (Socket.Ctl.getSockName socket);
+    val name = NetHostDB.toString host ^ ":" ^ string_of_int port;
+
+    val rd =
+      BinPrimIO.RD {
+        name = name,
+        chunkSize = Socket.Ctl.getRCVBUF socket,
+        readVec = SOME (fn n => Socket.recvVec (socket, n)),
+        readArr = SOME (fn buffer => Socket.recvArr (socket, buffer)),
+        readVecNB = NONE,
+        readArrNB = NONE,
+        block = NONE,
+        canInput = NONE,
+        avail = fn () => NONE,
+        getPos = NONE,
+        setPos = NONE,
+        endPos = NONE,
+        verifyPos = NONE,
+        close = fn () => Socket.close socket,
+        ioDesc = NONE
+      };
+
+    val wr =
+      BinPrimIO.WR {
+        name = name,
+        chunkSize = Socket.Ctl.getSNDBUF socket,
+        writeVec = SOME (fn buffer => Socket.sendVec (socket, buffer)),
+        writeArr = SOME (fn buffer => Socket.sendArr (socket, buffer)),
+        writeVecNB = NONE,
+        writeArrNB = NONE,
+        block = NONE,
+        canOutput = NONE,
+        getPos = NONE,
+        setPos = NONE,
+        endPos = NONE,
+        verifyPos = NONE,
+        close = fn () => Socket.close socket,
+        ioDesc = NONE
+      };
+
+    val in_stream =
+      BinIO.mkInstream
+        (BinIO.StreamIO.mkInstream (rd, Word8Vector.fromList []));
+
+    val out_stream =
+      BinIO.mkOutstream
+        (BinIO.StreamIO.mkOutstream (wr, IO.BLOCK_BUF));
+
+  in (in_stream, out_stream) end;
+
+
+fun open_streams socket_name =
+  let
+    fun err () = error ("Bad socket name: " ^ quote socket_name);
+    val (host, port) =
+      (case space_explode ":" socket_name of
+        [h, p] =>
+         (case NetHostDB.getByName h of SOME host => host | NONE => err (),
+          case Int.fromString p of SOME port => port | NONE => err ())
+      | _ => err ());
+    val socket: Socket.active INetSock.stream_sock = INetSock.TCP.socket ();
+    val _ = Socket.connect (socket, INetSock.toAddr (NetHostDB.addr host, port));
+  in make_streams socket end;
+
+end;
+
--- a/src/Pure/ROOT	Wed Jan 09 22:38:21 2013 +0100
+++ b/src/Pure/ROOT	Thu Jan 10 12:41:53 2013 +0100
@@ -87,6 +87,7 @@
     "General/seq.ML"
     "General/sha1.ML"
     "General/sha1_polyml.ML"
+    "General/socket_io.ML"
     "General/source.ML"
     "General/stack.ML"
     "General/symbol.ML"
--- a/src/Pure/ROOT.ML	Wed Jan 09 22:38:21 2013 +0100
+++ b/src/Pure/ROOT.ML	Thu Jan 10 12:41:53 2013 +0100
@@ -57,6 +57,7 @@
 use "General/file.ML";
 use "General/long_name.ML";
 use "General/binding.ML";
+use "General/socket_io.ML";
 
 use "General/sha1.ML";
 if ML_System.is_polyml then use "General/sha1_polyml.ML" else ();
--- a/src/Pure/System/system_channel.ML	Wed Jan 09 22:38:21 2013 +0100
+++ b/src/Pure/System/system_channel.ML	Thu Jan 10 12:41:53 2013 +0100
@@ -47,65 +47,31 @@
   end;
 
 
-(* sockets *)  (* FIXME proper buffering via BinIO (after Poly/ML 5.4.1) *)
+(* sockets *)
 
-local
-
-fun readN socket n =
+fun read_line in_stream =
   let
-    fun read i buf =
-      let
-        val s = Byte.bytesToString (Socket.recvVec (socket, n - i));
-        val m = size s;
-        val i' = i + m;
-        val buf' = Buffer.add s buf;
-      in if m > 0 andalso n > i' then read i' buf' else Buffer.content buf' end;
-  in read 0 Buffer.empty end;
-
-fun read_line socket =
-  let
-    fun result cs = implode (rev ("\n" :: cs));
+    fun result cs = String.implode (rev (#"\n" :: cs));
     fun read cs =
-      (case readN socket 1 of
-        "" => if null cs then NONE else SOME (result cs)
-      | "\n" => SOME (result cs)
-      | c => read (c :: cs));
+      (case BinIO.input1 in_stream of
+        NONE => if null cs then NONE else SOME (result cs)
+      | SOME b =>
+          (case Byte.byteToChar b of
+            #"\n" => SOME (result cs)
+          | c => read (c :: cs)));
   in read [] end;
 
-fun write socket =
-  let
-    fun send buf =
-      if Word8VectorSlice.isEmpty buf then ()
-      else
-        let
-          val n = Int.min (Word8VectorSlice.length buf, 4096);
-          val m = Socket.sendVec (socket, Word8VectorSlice.subslice (buf, 0, SOME n));
-          val buf' = Word8VectorSlice.subslice (buf, m, NONE);
-        in send buf' end;
-  in fn s => send (Word8VectorSlice.full (Byte.stringToBytes s)) end;
-
-in
-
 fun socket_rendezvous name =
   let
-    fun err () = error ("Bad socket name: " ^ quote name);
-    val (host, port) =
-      (case space_explode ":" name of
-        [h, p] =>
-         (case NetHostDB.getByName h of SOME host => host | NONE => err (),
-          case Int.fromString p of SOME port => port | NONE => err ())
-      | _ => err ());
-    val socket: Socket.active INetSock.stream_sock = INetSock.TCP.socket ();
-    val _ = Socket.connect (socket, INetSock.toAddr (NetHostDB.addr host, port));
+    val (in_stream, out_stream) = Socket_IO.open_streams name;
+    val _ = BinIO.StreamIO.setBufferMode (BinIO.getOutstream out_stream, IO.BLOCK_BUF);
   in
     System_Channel
-     {input_line = fn () => read_line socket,
-      inputN = fn n => readN socket n,
-      output = fn s => write socket s,
-      flush = fn () => ()}
+     {input_line = fn () => read_line in_stream,
+      inputN = fn n => Byte.bytesToString (BinIO.inputN (in_stream, n)),
+      output = fn s => BinIO.output (out_stream, Byte.stringToBytes s),
+      flush = fn () => BinIO.flushOut out_stream}
   end;
 
 end;
 
-end;
-