changeset 744:4b8695f1cfc4

add rpc IO type
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 13 Jul 2016 20:39:08 -0600 (2016-07-14)
parents 2c41f2aec92f
children 9c1f28b26395
files core/src/luan/modules/IoLuan.java core/src/luan/modules/RpcLuan.java
diffstat 2 files changed, 114 insertions(+), 39 deletions(-) [+]
line wrap: on
line diff
--- a/core/src/luan/modules/IoLuan.java	Wed Jul 13 17:27:35 2016 -0600
+++ b/core/src/luan/modules/IoLuan.java	Wed Jul 13 20:39:08 2016 -0600
@@ -616,7 +616,7 @@
 	}
 
 	public static final class LuanSocket extends LuanIO {
-		private final Socket socket;
+		public final Socket socket;
 
 		private LuanSocket(String host,int port) throws LuanException {
 			try {
--- a/core/src/luan/modules/RpcLuan.java	Wed Jul 13 17:27:35 2016 -0600
+++ b/core/src/luan/modules/RpcLuan.java	Wed Jul 13 20:39:08 2016 -0600
@@ -4,8 +4,10 @@
 import java.io.OutputStream;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.InputStreamReader;
 import java.io.IOException;
 import java.io.EOFException;
+import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
 import java.util.IdentityHashMap;
@@ -26,53 +28,47 @@
 	private static final int NUMBER = 3;
 	private static final int BINARY = 4;
 	private static final int TABLE = 5;
+	private static final int IO = 6;
 
 	@LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args)
 		throws LuanException, IOException
 	{
-		IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
-		InputStream in = new BufferedInputStream(socket.inputStream());
-		OutputStream out = new BufferedOutputStream(socket.outputStream());
+		IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
+		Socket socket = luanSocket.socket;
+		InputStream in = new BufferedInputStream(socket.getInputStream());
+		OutputStream out = new BufferedOutputStream(socket.getOutputStream());
+		Close close = new Close();
 		try {
 			writeString(out,fnName);
-			writeInt(out,args.length);
-			for( Object arg : args ) {
-				writeObj(out,luan,arg);
-			}
+			writeObjs(out,luan,args);
 			out.flush();
+			socket.shutdownOutput();
 			boolean ok = readBoolean(in);
 			if( ok ) {
-				int n = readInt(in);
-				Object[] rtn = new Object[n];
-				for( int i=0; i<n; i++ ) {
-					rtn[i] = readObj(in,luan);
-				}
-				return rtn;
+				return readObjs(in,luan,close);
 			} else {
 				String msg = readString(in);
 				throw new LuanException(msg);
 			}
 		} finally {
-			out.close();
-			in.close();
+			if( close.b) {
+				socket.close();
+			}
 		}
 	}
 
 	public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns)
 		throws IOException, LuanException
 	{
-		IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
-		InputStream in = new BufferedInputStream(socket.inputStream());
-		OutputStream out = new BufferedOutputStream(socket.outputStream());
+		IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
+		Socket socket = luanSocket.socket;
+		InputStream in = new BufferedInputStream(socket.getInputStream());
+		OutputStream out = new BufferedOutputStream(socket.getOutputStream());
 		try {
 			Object[] rtn;
 			try {
 				String fnName = readString(in);
-				int nArgs = readInt(in);
-				Object[] args = new Object[nArgs];
-				for( int i=0; i<nArgs; i++ ) {
-					args[i] = readObj(in,luan);
-				}
+				Object[] args = readObjs(in,luan,null);
 				LuanFunction fn = (LuanFunction)fns.get(luan,fnName);
 				if( fn == null )
 					throw new LuanException( "function not found: " + fnName );
@@ -83,17 +79,46 @@
 				return;
 			}
 			writeBoolean(out,true);
-			writeInt(out,rtn.length);
-			for( Object obj : rtn ) {
-				writeObj(out,luan,obj);
-			}
+			writeObjs(out,luan,rtn);
 		} finally {
-			out.close();
-			in.close();
+			out.flush();
+			socket.close();
 		}
 	}
 
-	static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException {
+	private static void writeObjs(OutputStream out,LuanState luan,Object[] a) throws IOException, LuanException {
+		IoLuan.LuanIn luanIn = null;
+		writeInt(out,a.length);
+		for( Object obj : a ) {
+			if( obj instanceof LuanTable ) {
+				LuanTable tbl = (LuanTable)obj;
+				Object java = tbl.rawGet("java");
+				if( java instanceof IoLuan.LuanIn ) {
+					if( luanIn != null )
+						throw new LuanException("can't have multiple IO params");
+					luanIn = (IoLuan.LuanIn)java;
+					out.write(IO);
+					continue;
+				}
+			}
+			writeObj(out,luan,obj);
+		}
+		if( luanIn != null ) {
+			InputStream in = luanIn.inputStream();
+			Utils.copyAll(in,out);
+		}
+	}
+
+	private static Object[] readObjs(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
+		int n = readInt(in);
+		Object[] rtn = new Object[n];
+		for( int i=0; i<n; i++ ) {
+			rtn[i] = readObj(in,luan,close);
+		}
+		return rtn;
+	}
+
+	private static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException {
 		if( obj == null ) {
 			out.write(NIL);
 		}
@@ -124,7 +149,7 @@
 			throw new LuanException( "invalid type: " + obj.getClass() );
 	}
 
-	static Object readObj(InputStream in,LuanState luan) throws IOException, LuanException {
+	private static Object readObj(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
 		int type = in.read();
 		switch(type) {
 		case NIL:
@@ -141,22 +166,24 @@
 			String s = readString(in);
 			LuanFunction fn = Luan.load("return "+s,"rpc-reader");
 			return fn.call(luan);
+		case IO:
+			return new LuanInputStream(in,close).table();
 		default:
 			throw new LuanException( "invalid type: " + type );
 		}
 	}
 
-	static Boolean readBoolean(InputStream in) throws IOException {
+	private static Boolean readBoolean(InputStream in) throws IOException {
 		return Boolean.valueOf(readString(in));
 	}
 
-	static String readString(InputStream in) throws IOException {
+	private static String readString(InputStream in) throws IOException {
 		int len = readInt(in);
 		byte[] a = readBinary(in,len);
 		return new String(a,StandardCharsets.UTF_8);
 	}
 
-	static int readInt(InputStream in) throws IOException {
+	private static int readInt(InputStream in) throws IOException {
 		int ch1 = in.read();
 		int ch2 = in.read();
 		int ch3 = in.read();
@@ -166,7 +193,7 @@
 		return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
 	}
 
-	static byte[] readBinary(InputStream in,int size) throws IOException {
+	private static byte[] readBinary(InputStream in,int size) throws IOException {
 		byte[] a = new byte[size];
 		int i = 0;
 		while( i < size ) {
@@ -178,17 +205,17 @@
 		return a;
 	}
 
-	static void writeBoolean(OutputStream out,Boolean b) throws IOException {
+	private static void writeBoolean(OutputStream out,Boolean b) throws IOException {
 		writeString(out,b.toString());
 	}
 
-	static void writeString(OutputStream out,String s) throws IOException {
+	private static void writeString(OutputStream out,String s) throws IOException {
 		byte[] a = s.getBytes(StandardCharsets.UTF_8);
 		writeInt(out,a.length);
 		out.write(a);
 	}
 
-	static void writeInt(OutputStream out,int v) throws IOException {
+	private static void writeInt(OutputStream out,int v) throws IOException {
         out.write((v >>> 24) & 0xFF);
         out.write((v >>> 16) & 0xFF);
         out.write((v >>>  8) & 0xFF);
@@ -225,4 +252,52 @@
 		throw new LuanException( "invalid type: " + obj.getClass() );
 	}
 
+
+	private static class Close {
+		boolean b = true;
+	}
+
+	private static class LuanInputStream extends IoLuan.LuanIn {
+		private final InputStream in;
+		private final boolean close;
+
+		public LuanInputStream(InputStream in,Close close) {
+			this.in = in;
+			this.close = close!=null && close.b;
+			if(this.close)  close.b = false;
+		}
+
+		private void close() throws IOException {
+			if(close)  in.close();
+		}
+
+		@Override public InputStream inputStream() {
+			return in;
+		}
+
+		@Override public String to_string() {
+			return "<input_stream>";
+		}
+
+		@Override public String to_uri_string() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override public String read_text() throws IOException {
+			String rtn = Utils.readAll(new InputStreamReader(in));
+			close();
+			return rtn;
+		}
+
+		@Override public byte[] read_binary() throws IOException {
+			byte[] rtn = Utils.readAll(in);
+			close();
+			return rtn;
+		}
+
+		@Override public boolean exists() {
+			return true;
+		}
+	};
+
 }