Mercurial Hosting > luan
changeset 744:4b8695f1cfc4
add rpc IO type
| author | Franklin Schmidt <fschmidt@gmail.com> | 
|---|---|
| date | Wed, 13 Jul 2016 20:39:08 -0600 | 
| parents | 2c41f2aec92f | 
| children | 9c1f28b26395 | 
| files | core/src/luan/modules/IoLuan.java core/src/luan/modules/RpcLuan.java | 
| diffstat | 2 files changed, 114 insertions(+), 39 deletions(-) [+] | 
line wrap: on
 line diff
--- a/core/src/luan/modules/IoLuan.java Wed Jul 13 17:27:35 2016 -0600 +++ b/core/src/luan/modules/IoLuan.java Wed Jul 13 20:39:08 2016 -0600 @@ -616,7 +616,7 @@ } public static final class LuanSocket extends LuanIO { - private final Socket socket; + public final Socket socket; private LuanSocket(String host,int port) throws LuanException { try {
--- a/core/src/luan/modules/RpcLuan.java Wed Jul 13 17:27:35 2016 -0600 +++ b/core/src/luan/modules/RpcLuan.java Wed Jul 13 20:39:08 2016 -0600 @@ -4,8 +4,10 @@ import java.io.OutputStream; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.InputStreamReader; import java.io.IOException; import java.io.EOFException; +import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.Set; import java.util.IdentityHashMap; @@ -26,53 +28,47 @@ private static final int NUMBER = 3; private static final int BINARY = 4; private static final int TABLE = 5; + private static final int IO = 6; @LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args) throws LuanException, IOException { - IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); - InputStream in = new BufferedInputStream(socket.inputStream()); - OutputStream out = new BufferedOutputStream(socket.outputStream()); + IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); + Socket socket = luanSocket.socket; + InputStream in = new BufferedInputStream(socket.getInputStream()); + OutputStream out = new BufferedOutputStream(socket.getOutputStream()); + Close close = new Close(); try { writeString(out,fnName); - writeInt(out,args.length); - for( Object arg : args ) { - writeObj(out,luan,arg); - } + writeObjs(out,luan,args); out.flush(); + socket.shutdownOutput(); boolean ok = readBoolean(in); if( ok ) { - int n = readInt(in); - Object[] rtn = new Object[n]; - for( int i=0; i<n; i++ ) { - rtn[i] = readObj(in,luan); - } - return rtn; + return readObjs(in,luan,close); } else { String msg = readString(in); throw new LuanException(msg); } } finally { - out.close(); - in.close(); + if( close.b) { + socket.close(); + } } } public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns) throws IOException, LuanException { - IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); - InputStream in = new BufferedInputStream(socket.inputStream()); - OutputStream out = new BufferedOutputStream(socket.outputStream()); + IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); + Socket socket = luanSocket.socket; + InputStream in = new BufferedInputStream(socket.getInputStream()); + OutputStream out = new BufferedOutputStream(socket.getOutputStream()); try { Object[] rtn; try { String fnName = readString(in); - int nArgs = readInt(in); - Object[] args = new Object[nArgs]; - for( int i=0; i<nArgs; i++ ) { - args[i] = readObj(in,luan); - } + Object[] args = readObjs(in,luan,null); LuanFunction fn = (LuanFunction)fns.get(luan,fnName); if( fn == null ) throw new LuanException( "function not found: " + fnName ); @@ -83,17 +79,46 @@ return; } writeBoolean(out,true); - writeInt(out,rtn.length); - for( Object obj : rtn ) { - writeObj(out,luan,obj); - } + writeObjs(out,luan,rtn); } finally { - out.close(); - in.close(); + out.flush(); + socket.close(); } } - static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException { + private static void writeObjs(OutputStream out,LuanState luan,Object[] a) throws IOException, LuanException { + IoLuan.LuanIn luanIn = null; + writeInt(out,a.length); + for( Object obj : a ) { + if( obj instanceof LuanTable ) { + LuanTable tbl = (LuanTable)obj; + Object java = tbl.rawGet("java"); + if( java instanceof IoLuan.LuanIn ) { + if( luanIn != null ) + throw new LuanException("can't have multiple IO params"); + luanIn = (IoLuan.LuanIn)java; + out.write(IO); + continue; + } + } + writeObj(out,luan,obj); + } + if( luanIn != null ) { + InputStream in = luanIn.inputStream(); + Utils.copyAll(in,out); + } + } + + private static Object[] readObjs(InputStream in,LuanState luan,Close close) throws IOException, LuanException { + int n = readInt(in); + Object[] rtn = new Object[n]; + for( int i=0; i<n; i++ ) { + rtn[i] = readObj(in,luan,close); + } + return rtn; + } + + private static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException { if( obj == null ) { out.write(NIL); } @@ -124,7 +149,7 @@ throw new LuanException( "invalid type: " + obj.getClass() ); } - static Object readObj(InputStream in,LuanState luan) throws IOException, LuanException { + private static Object readObj(InputStream in,LuanState luan,Close close) throws IOException, LuanException { int type = in.read(); switch(type) { case NIL: @@ -141,22 +166,24 @@ String s = readString(in); LuanFunction fn = Luan.load("return "+s,"rpc-reader"); return fn.call(luan); + case IO: + return new LuanInputStream(in,close).table(); default: throw new LuanException( "invalid type: " + type ); } } - static Boolean readBoolean(InputStream in) throws IOException { + private static Boolean readBoolean(InputStream in) throws IOException { return Boolean.valueOf(readString(in)); } - static String readString(InputStream in) throws IOException { + private static String readString(InputStream in) throws IOException { int len = readInt(in); byte[] a = readBinary(in,len); return new String(a,StandardCharsets.UTF_8); } - static int readInt(InputStream in) throws IOException { + private static int readInt(InputStream in) throws IOException { int ch1 = in.read(); int ch2 = in.read(); int ch3 = in.read(); @@ -166,7 +193,7 @@ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); } - static byte[] readBinary(InputStream in,int size) throws IOException { + private static byte[] readBinary(InputStream in,int size) throws IOException { byte[] a = new byte[size]; int i = 0; while( i < size ) { @@ -178,17 +205,17 @@ return a; } - static void writeBoolean(OutputStream out,Boolean b) throws IOException { + private static void writeBoolean(OutputStream out,Boolean b) throws IOException { writeString(out,b.toString()); } - static void writeString(OutputStream out,String s) throws IOException { + private static void writeString(OutputStream out,String s) throws IOException { byte[] a = s.getBytes(StandardCharsets.UTF_8); writeInt(out,a.length); out.write(a); } - static void writeInt(OutputStream out,int v) throws IOException { + private static void writeInt(OutputStream out,int v) throws IOException { out.write((v >>> 24) & 0xFF); out.write((v >>> 16) & 0xFF); out.write((v >>> 8) & 0xFF); @@ -225,4 +252,52 @@ throw new LuanException( "invalid type: " + obj.getClass() ); } + + private static class Close { + boolean b = true; + } + + private static class LuanInputStream extends IoLuan.LuanIn { + private final InputStream in; + private final boolean close; + + public LuanInputStream(InputStream in,Close close) { + this.in = in; + this.close = close!=null && close.b; + if(this.close) close.b = false; + } + + private void close() throws IOException { + if(close) in.close(); + } + + @Override public InputStream inputStream() { + return in; + } + + @Override public String to_string() { + return "<input_stream>"; + } + + @Override public String to_uri_string() { + throw new UnsupportedOperationException(); + } + + @Override public String read_text() throws IOException { + String rtn = Utils.readAll(new InputStreamReader(in)); + close(); + return rtn; + } + + @Override public byte[] read_binary() throws IOException { + byte[] rtn = Utils.readAll(in); + close(); + return rtn; + } + + @Override public boolean exists() { + return true; + } + }; + }
