view core/src/luan/modules/RpcLuan.java @ 759:ae612dfc57cb 0.21

better handling of longs in rpc and json
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 19 Jul 2016 09:09:41 -0600
parents c29d11d675fd
children
line wrap: on
line source

package luan.modules;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStreamReader;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.EOFException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.IdentityHashMap;
import java.util.Collections;
import java.util.Map;
import luan.Luan;
import luan.LuanState;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanException;
import luan.LuanMethod;
import luan.modules.parsers.Json;
import luan.modules.parsers.ParseException;


/**
 * Simple one-shot RPC over a socket.
 *
 * <p>Wire format, as implemented by the helpers below:
 * <ul>
 * <li>an int is 4 bytes, big-endian;</li>
 * <li>a string is an int byte-length followed by that many UTF-8 bytes;</li>
 * <li>a boolean is the string {@code "true"} or {@code "false"};</li>
 * <li>an object is a one-byte type tag followed by a tag-specific payload;</li>
 * <li>an object list is an int count followed by that many objects, optionally
 *     followed by the raw bytes of a single IO argument.</li>
 * </ul>
 *
 * <p>A request is the function name (string) followed by the argument list.
 * A response is a boolean status followed by either the result list (success)
 * or an error message string (failure).
 */
public final class RpcLuan {
	// One-byte type tags written in front of every serialized object.
	private static final int NIL = 0;
	private static final int STRING = 1;
	private static final int BOOLEAN = 2;
	private static final int NUMBER = 3;
	private static final int BINARY = 4;
	private static final int TABLE = 5;
	private static final int IO = 6;
	private static final int LONG = 7;

	/**
	 * Client side: sends a call request over the socket and returns the results.
	 *
	 * <p>The socket is closed before returning unless one of the results is an IO
	 * stream that reads from the socket; in that case closing the returned stream
	 * closes the underlying socket stream.  The socket is always closed when the
	 * call fails.
	 *
	 * @param luan the Luan state
	 * @param socketTbl table whose raw "java" entry is the {@code IoLuan.LuanSocket} to use
	 * @param fnName name of the remote function
	 * @param args arguments to send; at most one may be an IO table
	 * @return the values returned by the remote function
	 * @throws LuanException if the remote side reports an error, or on invalid data
	 * @throws IOException on communication failure
	 */
	@LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args)
		throws LuanException, IOException
	{
		IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
		Socket socket = luanSocket.socket;
		Close close = new Close();
		boolean completed = false;
		try {
			// Streams are obtained inside the try so that a failure here still closes the socket.
			InputStream in = new BufferedInputStream(socket.getInputStream());
			OutputStream out = new BufferedOutputStream(socket.getOutputStream());
			writeString(out,fnName);
			writeObjs(out,luan,args);
			out.flush();
			// Half-close: signals end of request (and end of any streamed IO argument).
			socket.shutdownOutput();
			boolean ok = readBoolean(in);
			if( !ok ) {
				String msg = readString(in);
				throw new LuanException(msg);
			}
			Object[] rtn = readObjs(in,luan,close);
			completed = true;
			return rtn;
		} finally {
			// close.b is cleared when ownership of the socket stream was handed to a
			// returned IO object.  If the call did not complete, nobody will ever see
			// that object, so the socket must be closed here regardless.
			if( close.b || !completed ) {
				socket.close();
			}
		}
	}

	/**
	 * Server side: reads one call request from the socket, invokes the named
	 * function from {@code fns} and writes the response.  The socket is always
	 * closed before returning.
	 *
	 * <p>A {@link LuanException} raised while reading the request or running the
	 * function is reported to the peer as an error response rather than thrown.
	 *
	 * @param luan the Luan state
	 * @param socketTbl table whose raw "java" entry is the {@code IoLuan.LuanSocket} to use
	 * @param fns table mapping function names to callable functions
	 * @throws IOException on communication failure
	 * @throws LuanException if writing the results fails (e.g. unsupported result type)
	 */
	public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns)
		throws IOException, LuanException
	{
		IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
		Socket socket = luanSocket.socket;
		try {
			InputStream in = new BufferedInputStream(socket.getInputStream());
			OutputStream out = new BufferedOutputStream(socket.getOutputStream());
			Object[] rtn;
			try {
				String fnName = readString(in);
				// close==null: IO arguments must not take ownership, this method closes the socket itself.
				Object[] args = readObjs(in,luan,null);
				Object fnObj = fns.get(luan,fnName);
				if( fnObj == null )
					throw new LuanException( "function not found: " + fnName );
				// Report a non-function entry to the peer instead of failing with ClassCastException.
				if( !(fnObj instanceof LuanFunction) )
					throw new LuanException( "not a function: " + fnName );
				LuanFunction fn = (LuanFunction)fnObj;
				rtn = Luan.array(fn.call(luan,args));
			} catch(LuanException e) {
				writeBoolean(out,false);
				writeString(out,e.getFullMessage());
				out.flush();
				return;
			}
			writeBoolean(out,true);
			writeObjs(out,luan,rtn);
			out.flush();
		} finally {
			socket.close();
		}
	}

	/**
	 * Writes an object list: the count, then each object.  A table whose raw
	 * "java" entry is an {@code IoLuan.LuanIn} is written as a bare IO tag, and
	 * its content is copied to the output after all other objects.
	 *
	 * @throws LuanException if more than one IO object is present or an object
	 *         has an unsupported type
	 */
	private static void writeObjs(OutputStream out,LuanState luan,Object[] a) throws IOException, LuanException {
		IoLuan.LuanIn luanIn = null;
		writeInt(out,a.length);
		for( Object obj : a ) {
			if( obj instanceof LuanTable ) {
				LuanTable tbl = (LuanTable)obj;
				Object java = tbl.rawGet("java");
				if( java instanceof IoLuan.LuanIn ) {
					// The stream content has no length prefix, so only one can be sent.
					if( luanIn != null )
						throw new LuanException("can't have multiple IO params");
					luanIn = (IoLuan.LuanIn)java;
					out.write(IO);
					continue;
				}
			}
			writeObj(out,luan,obj);
		}
		if( luanIn != null ) {
			// NOTE(review): the source stream is not closed here; confirm whether
			// Utils.copyAll or the caller is expected to close it.
			InputStream in = luanIn.inputStream();
			Utils.copyAll(in,out);
		}
	}

	/**
	 * Reads an object list written by {@link #writeObjs}.
	 *
	 * @param close ownership flag passed on to any IO object read; may be null
	 * @throws IOException if the count is negative or the stream ends early
	 */
	private static Object[] readObjs(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
		int n = readInt(in);
		if( n < 0 )
			throw new IOException( "invalid object count: " + n );
		Object[] rtn = new Object[n];
		for( int i=0; i<n; i++ ) {
			rtn[i] = readObj(in,luan,close);
		}
		return rtn;
	}

	/**
	 * Writes one object as a type tag followed by its payload.  Numbers are
	 * sent in their {@code toString()} form; {@code Long} has its own tag so it
	 * can be restored as a {@code Long}.  Tables are sent as JSON.
	 *
	 * @throws LuanException if the object's type is not supported
	 */
	private static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException {
		if( obj == null ) {
			out.write(NIL);
		}
		else if( obj instanceof String ) {
			out.write(STRING);
			writeString(out,(String)obj);
		}
		else if( obj instanceof Boolean ) {
			out.write(BOOLEAN);
			writeBoolean(out,(Boolean)obj);
		}
		else if( obj instanceof Long ) {
			// Must be tested before Number, since Long is a Number.
			out.write(LONG);
			writeString(out,obj.toString());
		}
		else if( obj instanceof Number ) {
			out.write(NUMBER);
			writeString(out,obj.toString());
		}
		else if( obj instanceof byte[] ) {
			byte[] a = (byte[])obj;
			out.write(BINARY);
			writeInt(out,a.length);
			out.write(a);
		}
		else if( obj instanceof LuanTable ) {
			out.write(TABLE);
			// Former pickle-based encoding, replaced by JSON:
//			String s = pickle( luan, obj, Collections.newSetFromMap(new IdentityHashMap<LuanTable,Boolean>()) );
			String s = Json.toString(obj);
			writeString(out,s);
		}
		else
			throw new LuanException( "invalid type: " + obj.getClass() );
	}

	/**
	 * Reads one object written by {@link #writeObj}, or an IO tag.  Values
	 * tagged NUMBER are returned as {@code Double}, LONG as {@code Long}.
	 *
	 * @param close ownership flag handed to the IO object, if one is read; may be null
	 * @throws EOFException if the stream ends before the type tag
	 * @throws LuanException on an unknown type tag or unparsable table JSON
	 */
	private static Object readObj(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
		int type = in.read();
		// End of stream is a truncated message, not an unknown tag.
		if( type == -1 )
			throw new EOFException();
		switch(type) {
		case NIL:
			return null;
		case STRING:
			return readString(in);
		case BOOLEAN:
			return readBoolean(in);
		case LONG:
			return Long.valueOf(readString(in));
		case NUMBER:
			return Double.valueOf(readString(in));
		case BINARY:
			return readBinary(in,readInt(in));
		case TABLE:
			String s = readString(in);
/*
			LuanFunction fn = Luan.load("return "+s,"rpc-reader");
			return fn.call(luan);
*/
			try {
				return Json.parse(s);
			} catch(ParseException e) {
				throw new LuanException(e);
			}
		case IO:
			// The stream content is whatever remains on the input after the object list.
			return new LuanInputStream(in,close).table();
		default:
			throw new LuanException( "invalid type: " + type );
		}
	}

	/** Reads a boolean encoded as a string; anything other than "true" (ignoring case) is false. */
	private static Boolean readBoolean(InputStream in) throws IOException {
		return Boolean.valueOf(readString(in));
	}

	/** Reads a length-prefixed UTF-8 string. */
	private static String readString(InputStream in) throws IOException {
		int len = readInt(in);
		byte[] a = readBinary(in,len);
		return new String(a,StandardCharsets.UTF_8);
	}

	/**
	 * Reads a 4-byte big-endian int.
	 *
	 * @throws EOFException if the stream ends before 4 bytes are read
	 */
	private static int readInt(InputStream in) throws IOException {
		int ch1 = in.read();
		int ch2 = in.read();
		int ch3 = in.read();
		int ch4 = in.read();
		// read() returns -1 at end of stream; OR-ing makes the result negative if any read failed.
		if ((ch1 | ch2 | ch3 | ch4) < 0)
			throw new EOFException();
		return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
	}

	/**
	 * Reads exactly {@code size} bytes.
	 *
	 * @throws IOException if {@code size} is negative (corrupt length prefix)
	 * @throws EOFException if the stream ends before {@code size} bytes are read
	 */
	private static byte[] readBinary(InputStream in,int size) throws IOException {
		if( size < 0 )
			throw new IOException( "invalid length: " + size );
		byte[] a = new byte[size];
		int i = 0;
		// A single read() may return fewer bytes than requested, so loop until full.
		while( i < size ) {
			int n = in.read(a,i,size-i);
			if( n == -1 )
				throw new EOFException();
			i += n;
		}
		return a;
	}

	/** Writes a boolean as the string "true" or "false". */
	private static void writeBoolean(OutputStream out,Boolean b) throws IOException {
		writeString(out,b.toString());
	}

	/** Writes a string as its UTF-8 byte length followed by the UTF-8 bytes. */
	private static void writeString(OutputStream out,String s) throws IOException {
		byte[] a = s.getBytes(StandardCharsets.UTF_8);
		writeInt(out,a.length);
		out.write(a);
	}

	/** Writes a 4-byte big-endian int. */
	private static void writeInt(OutputStream out,int v) throws IOException {
		out.write((v >>> 24) & 0xFF);
		out.write((v >>> 16) & 0xFF);
		out.write((v >>>  8) & 0xFF);
		out.write((v >>>  0) & 0xFF);
	}

	// Former table serializer, kept for reference; tables are now sent as JSON (see writeObj).
/*
	private static String pickle(LuanState luan,Object obj,Set<LuanTable> set) throws LuanException {
		if( obj == null )
			return "nil";
		if( obj instanceof Boolean )
			return obj.toString();
		if( obj instanceof Number )
			return Luan.toString((Number)obj);
		if( obj instanceof String )
			return "\"" + Luan.stringEncode((String)obj) + "\"";
		if( obj instanceof LuanTable ) {
			LuanTable tbl = (LuanTable)obj;
			if( !set.add(tbl) ) {
				throw new LuanException( "circular reference in table" );
			}
			StringBuilder sb = new StringBuilder();
			sb.append( "{" );
			for( Map.Entry<Object,Object> entry : tbl.iterable(luan) ) {
				sb.append( "[" );
				sb.append( pickle(luan,entry.getKey(),set) );
				sb.append( "]=" );
				sb.append( pickle(luan,entry.getValue(),set) );
				sb.append( ", " );
			}
			sb.append( "}" );
			return sb.toString();
		}
		throw new LuanException( "invalid type: " + obj.getClass() );
	}
*/

	/**
	 * Mutable flag shared between {@link #call} and the objects it reads.
	 * While {@code b} is true, {@code call} is responsible for closing the
	 * socket; an IO result clears it to take over that responsibility.
	 */
	private static class Close {
		boolean b = true;
	}

	/**
	 * IO object backed by the remainder of the RPC input stream.
	 */
	private static class LuanInputStream extends IoLuan.LuanIn {
		private final InputStream in;
		// True when closing the stream handed out by inputStream() must close the underlying stream.
		private final boolean closeUnderlying;

		/**
		 * @param in the RPC input stream, positioned after the object list
		 * @param close ownership flag; if non-null and still set, this object
		 *        takes ownership and clears it.  May be null (never closes).
		 */
		public LuanInputStream(InputStream in,Close close) {
			this.in = in;
			this.closeUnderlying = close!=null && close.b;
			if(this.closeUnderlying)  close.b = false;
		}

		@Override public InputStream inputStream() {
			return new FilterInputStream(in) {
				@Override public void close() throws IOException {
					// Only the owner may close the shared underlying stream.
					if(closeUnderlying)  super.close();
				}
			};
		}

		@Override public String to_string() {
			return "<input_stream>";
		}

		@Override public String to_uri_string() {
			throw new UnsupportedOperationException();
		}

		@Override public boolean exists() {
			return true;
		}
	}

}