changeset 1118:e4710ddfd287

start luan/lib/rpc
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 06 Aug 2017 20:11:11 -0600
parents 9a1aa6fc0b4e
children 87c674f3f6b7
files src/luan/lib/rpc/FixedLengthInputStream.java src/luan/lib/rpc/Rpc.java src/luan/lib/rpc/RpcCall.java src/luan/lib/rpc/RpcClient.java src/luan/lib/rpc/RpcCon.java src/luan/lib/rpc/RpcException.java src/luan/lib/rpc/RpcResult.java src/luan/lib/rpc/RpcServer.java
diffstat 8 files changed, 368 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/FixedLengthInputStream.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,75 @@
+package luan.lib.rpc;
+
+import java.io.InputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+
+
+public class FixedLengthInputStream extends FilterInputStream {
+	private long left;
+
+	public FixedLengthInputStream(InputStream in,long len) {
+		super(in);
+		if( len < 0 )
+			throw new IllegalArgumentException("len can't be negative");
+		this.left = len;
+	}
+
+	public int read() throws IOException {
+		if( left == 0 )
+			return -1;
+		int n = in.read();
+		if( n == -1 )
+			throw new EOFException();
+		left--;
+		return n;
+	}
+
+	public int read(byte b[], int off, int len) throws IOException {
+		if( len == 0 )
+			return 0;
+		if( left == 0 )
+			return -1;
+		if( len > left )
+			len = (int)left;
+		int n = in.read(b, off, len);
+		if( n == -1 )
+			throw new EOFException();
+		left -= n;
+		return n;
+	}
+
+	public long skip(long n) throws IOException {
+		if( n > left )
+			n = left;
+		n = in.skip(n);
+		left -= n;
+		return n;
+	}
+
+	public int available() throws IOException {
+		int n = in.available();
+		if( n > left )
+			n = (int)left;
+		return n;
+	}
+
+    public void close() throws IOException {
+        while( left > 0 ) {
+			if( skip(left) == 0 )
+				throw new EOFException();
+		}
+    }
+
+	public void mark(int readlimit) {}
+
+	public void reset() throws IOException {
+		throw new IOException("not supported");
+	}
+
+	public boolean markSupported() {
+		return false;
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/Rpc.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,36 @@
+package luan.lib.rpc;
+
+import java.io.IOException;
+
+
+// static utils
+public class Rpc {
+	private Rpc() {}  // never
+
+	public static final RpcResult OK = new RpcResult();
+
+	public static final RpcCall CLOSE = new RpcCall("close");
+	public static final RpcCall PING = new RpcCall("ping");
+	public static final String ECHO = "echo";
+
+	public static final RpcException COMMAND_NOT_FOUND = new RpcException("command_not_found");
+
+	public static boolean handle(RpcServer server,RpcCall call)
+		throws IOException
+	{
+		if( CLOSE.cmd.equals(call.cmd) ) {
+			server.close();
+			return true;
+		}
+		if( PING.cmd.equals(call.cmd) ) {
+			server.write(OK);
+			return true;
+		}
+		if( ECHO.equals(call.cmd) ) {
+			server.write(new RpcResult(call.args));
+			return true;
+		}
+		return false;
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcCall.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,22 @@
+package luan.lib.rpc;
+
+import java.io.InputStream;
+
+
+public final class RpcCall {
+	public final InputStream in;
+	public final long lenIn;
+	public final String cmd;
+	public final Object[] args;
+
+	public RpcCall(String cmd,Object... args) {
+		this(null,-1L,cmd,args);
+	}
+
+	public RpcCall(InputStream in,long lenIn,String cmd,Object... args) {
+		this.in = in;
+		this.lenIn = lenIn;
+		this.cmd = cmd;
+		this.args = args;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcClient.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,40 @@
+package luan.lib.rpc;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class RpcClient extends RpcCon {
+
+	public RpcClient(Socket socket) throws IOException {
+		super(socket);
+	}
+
+	public void write(RpcCall call)
+		throws IOException
+	{
+		List list = new ArrayList();
+		list.add(call.cmd);
+		for( Object arg : call.args ) {
+			list.add(arg);
+		}
+		write(call.in,call.lenIn,list);
+	}
+
+	public RpcResult read()
+		throws IOException, RpcException
+	{
+		List list = readJson();
+		boolean ok = (Boolean)list.remove(0);
+		if( !ok ) {
+			String errorId = (String)list.remove(0);
+			Object[] args = list.toArray();
+			throw new RpcException(inBinary,lenBinary,errorId,args);
+		}
+		Object[] args = list.toArray();
+		return new RpcResult(inBinary,lenBinary,args);
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcCon.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,108 @@
+package luan.lib.rpc;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import luan.lib.parser.ParseException;
+import luan.lib.json.JsonParser;
+import luan.lib.json.JsonToString;
+
+
+public class RpcCon {
+	final Socket socket;
+	final InputStream in;
+	final OutputStream out;
+	InputStream inBinary = null;
+	long lenBinary = -1;
+
+	RpcCon(Socket socket) throws IOException {
+		this.socket = socket;
+		this.in = socket.getInputStream();
+		this.out = socket.getOutputStream();
+	}
+
+	public void close()
+		throws IOException
+	{
+		socket.close();
+	}
+
+	public boolean isClosed() {
+		return socket.isClosed();
+	}
+
+	void write(InputStream in,long lenIn,List list)
+		throws IOException
+	{
+		if( in != null )
+			list.add(0,lenIn);
+		String json = JsonToString.toString(list);
+		byte[] aJson = json.getBytes(StandardCharsets.UTF_8);
+		int len = aJson.length;
+		byte[] a = new byte[4+len];
+        a[0] = (byte)(len >>> 24);
+        a[1] = (byte)(len >>> 16);
+        a[2] = (byte)(len >>>  8);
+        a[3] = (byte)(len >>>  0);
+		System.arraycopy(aJson,0,a,4,len);
+		out.write(a);
+		if( in != null ) {
+			a = new byte[8192];
+			long total = 0;
+			int n;
+			while( (n=in.read(a)) != -1 ) {
+				out.write(a,0,n);
+				total += n;
+			}
+			if( total != lenIn )
+				throw new IOException("InputStream wrong length "+total+" when should be "+lenIn);
+		}
+	}
+
+	List readJson()
+		throws IOException
+	{
+		if( inBinary != null ) {
+			inBinary.close();
+			inBinary = null;
+			lenBinary = -1;
+		}
+		byte[] a = new byte[4];
+		readAll(a);
+		int len = 0;
+		for( byte b : a ) {
+			len <<= 8;
+			len |= b&0xFF;
+		}
+		a = new byte[len];
+		readAll(a);
+		String json = new String(a,StandardCharsets.UTF_8);
+		List list;
+		try {
+			list = (List)JsonParser.parse(json);
+		} catch(ParseException e) {
+			throw new IOException(e);
+		}
+		if( list.get(0) instanceof Long ) {
+			lenBinary = (Long)list.remove(0);
+			inBinary = new FixedLengthInputStream(in,lenBinary);
+		}
+		return list;
+	}
+
+	private void readAll(final byte[] a) throws IOException {
+		int total = 0;
+		int n;
+		while( total < a.length ){
+			n = in.read( a, total, a.length-total );
+			if( n == -1 )
+				throw new EOFException();
+			total += n;
+		}
+	}
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcException.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,21 @@
+package luan.lib.rpc;
+
+import java.io.InputStream;
+
+
+public class RpcException extends Exception {
+	public final InputStream in;
+	public final long lenIn;
+	public final Object[] values;
+
+	public RpcException(String id,Object... values) {
+		this(null,-1,id,values);
+	}
+
+	public RpcException(InputStream in,long lenIn,String id,Object... values) {
+		super(id);
+		this.in = in;
+		this.lenIn = lenIn;
+		this.values = values;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcResult.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,20 @@
+package luan.lib.rpc;
+
+import java.io.InputStream;
+
+
+public final class RpcResult {
+	public final InputStream in;
+	public final long lenIn;
+	public final Object[] returnValues;
+
+	public RpcResult(Object... returnValues) {
+		this(null,-1L,returnValues);
+	}
+
+	public RpcResult(InputStream in,long lenIn,Object... returnValues) {
+		this.in = in;
+		this.lenIn = lenIn;
+		this.returnValues = returnValues;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/lib/rpc/RpcServer.java	Sun Aug 06 20:11:11 2017 -0600
@@ -0,0 +1,46 @@
+package luan.lib.rpc;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class RpcServer extends RpcCon {
+
+	public RpcServer(Socket socket) throws IOException {
+		super(socket);
+	}
+
+	public RpcCall read()
+		throws IOException
+	{
+		List list = readJson();
+		String cmd = (String)list.remove(0);
+		Object[] args = list.toArray();
+		return new RpcCall(inBinary,lenBinary,cmd,args);
+	}
+
+	public void write(RpcResult result)
+		throws IOException
+	{
+		List list = new ArrayList();
+		list.add(true);
+		for( Object val : result.returnValues ) {
+			list.add(val);
+		}
+		write(result.in,result.lenIn,list);
+	}
+
+	public void write(RpcException ex)
+		throws IOException
+	{
+		List list = new ArrayList();
+		list.add(false);
+		for( Object val : ex.values ) {
+			list.add(val);
+		}
+		write(ex.in,ex.lenIn,list);
+	}
+
+}