comparison core/src/luan/modules/RpcLuan.java @ 744:4b8695f1cfc4

add rpc IO type
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 13 Jul 2016 20:39:08 -0600
parents 5578541125ea
children 1a101ac9ea46
comparison
equal deleted inserted replaced
743:2c41f2aec92f 744:4b8695f1cfc4
2 2
3 import java.io.InputStream; 3 import java.io.InputStream;
4 import java.io.OutputStream; 4 import java.io.OutputStream;
5 import java.io.BufferedInputStream; 5 import java.io.BufferedInputStream;
6 import java.io.BufferedOutputStream; 6 import java.io.BufferedOutputStream;
7 import java.io.InputStreamReader;
7 import java.io.IOException; 8 import java.io.IOException;
8 import java.io.EOFException; 9 import java.io.EOFException;
10 import java.net.Socket;
9 import java.nio.charset.StandardCharsets; 11 import java.nio.charset.StandardCharsets;
10 import java.util.Set; 12 import java.util.Set;
11 import java.util.IdentityHashMap; 13 import java.util.IdentityHashMap;
12 import java.util.Collections; 14 import java.util.Collections;
13 import java.util.Map; 15 import java.util.Map;
24 private static final int STRING = 1; 26 private static final int STRING = 1;
25 private static final int BOOLEAN = 2; 27 private static final int BOOLEAN = 2;
26 private static final int NUMBER = 3; 28 private static final int NUMBER = 3;
27 private static final int BINARY = 4; 29 private static final int BINARY = 4;
28 private static final int TABLE = 5; 30 private static final int TABLE = 5;
31 private static final int IO = 6;
29 32
30 @LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args) 33 @LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args)
31 throws LuanException, IOException 34 throws LuanException, IOException
32 { 35 {
33 IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); 36 IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
34 InputStream in = new BufferedInputStream(socket.inputStream()); 37 Socket socket = luanSocket.socket;
35 OutputStream out = new BufferedOutputStream(socket.outputStream()); 38 InputStream in = new BufferedInputStream(socket.getInputStream());
39 OutputStream out = new BufferedOutputStream(socket.getOutputStream());
40 Close close = new Close();
36 try { 41 try {
37 writeString(out,fnName); 42 writeString(out,fnName);
38 writeInt(out,args.length); 43 writeObjs(out,luan,args);
39 for( Object arg : args ) {
40 writeObj(out,luan,arg);
41 }
42 out.flush(); 44 out.flush();
45 socket.shutdownOutput();
43 boolean ok = readBoolean(in); 46 boolean ok = readBoolean(in);
44 if( ok ) { 47 if( ok ) {
45 int n = readInt(in); 48 return readObjs(in,luan,close);
46 Object[] rtn = new Object[n];
47 for( int i=0; i<n; i++ ) {
48 rtn[i] = readObj(in,luan);
49 }
50 return rtn;
51 } else { 49 } else {
52 String msg = readString(in); 50 String msg = readString(in);
53 throw new LuanException(msg); 51 throw new LuanException(msg);
54 } 52 }
55 } finally { 53 } finally {
56 out.close(); 54 if( close.b) {
57 in.close(); 55 socket.close();
56 }
58 } 57 }
59 } 58 }
60 59
61 public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns) 60 public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns)
62 throws IOException, LuanException 61 throws IOException, LuanException
63 { 62 {
64 IoLuan.LuanSocket socket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); 63 IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
65 InputStream in = new BufferedInputStream(socket.inputStream()); 64 Socket socket = luanSocket.socket;
66 OutputStream out = new BufferedOutputStream(socket.outputStream()); 65 InputStream in = new BufferedInputStream(socket.getInputStream());
66 OutputStream out = new BufferedOutputStream(socket.getOutputStream());
67 try { 67 try {
68 Object[] rtn; 68 Object[] rtn;
69 try { 69 try {
70 String fnName = readString(in); 70 String fnName = readString(in);
71 int nArgs = readInt(in); 71 Object[] args = readObjs(in,luan,null);
72 Object[] args = new Object[nArgs];
73 for( int i=0; i<nArgs; i++ ) {
74 args[i] = readObj(in,luan);
75 }
76 LuanFunction fn = (LuanFunction)fns.get(luan,fnName); 72 LuanFunction fn = (LuanFunction)fns.get(luan,fnName);
77 if( fn == null ) 73 if( fn == null )
78 throw new LuanException( "function not found: " + fnName ); 74 throw new LuanException( "function not found: " + fnName );
79 rtn = Luan.array(fn.call(luan,args)); 75 rtn = Luan.array(fn.call(luan,args));
80 } catch(LuanException e) { 76 } catch(LuanException e) {
81 writeBoolean(out,false); 77 writeBoolean(out,false);
82 writeString(out,e.getFullMessage()); 78 writeString(out,e.getFullMessage());
83 return; 79 return;
84 } 80 }
85 writeBoolean(out,true); 81 writeBoolean(out,true);
86 writeInt(out,rtn.length); 82 writeObjs(out,luan,rtn);
87 for( Object obj : rtn ) {
88 writeObj(out,luan,obj);
89 }
90 } finally { 83 } finally {
91 out.close(); 84 out.flush();
92 in.close(); 85 socket.close();
93 } 86 }
94 } 87 }
95 88
96 static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException { 89 private static void writeObjs(OutputStream out,LuanState luan,Object[] a) throws IOException, LuanException {
90 IoLuan.LuanIn luanIn = null;
91 writeInt(out,a.length);
92 for( Object obj : a ) {
93 if( obj instanceof LuanTable ) {
94 LuanTable tbl = (LuanTable)obj;
95 Object java = tbl.rawGet("java");
96 if( java instanceof IoLuan.LuanIn ) {
97 if( luanIn != null )
98 throw new LuanException("can't have multiple IO params");
99 luanIn = (IoLuan.LuanIn)java;
100 out.write(IO);
101 continue;
102 }
103 }
104 writeObj(out,luan,obj);
105 }
106 if( luanIn != null ) {
107 InputStream in = luanIn.inputStream();
108 Utils.copyAll(in,out);
109 }
110 }
111
112 private static Object[] readObjs(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
113 int n = readInt(in);
114 Object[] rtn = new Object[n];
115 for( int i=0; i<n; i++ ) {
116 rtn[i] = readObj(in,luan,close);
117 }
118 return rtn;
119 }
120
121 private static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException {
97 if( obj == null ) { 122 if( obj == null ) {
98 out.write(NIL); 123 out.write(NIL);
99 } 124 }
100 else if( obj instanceof String ) { 125 else if( obj instanceof String ) {
101 out.write(STRING); 126 out.write(STRING);
122 } 147 }
123 else 148 else
124 throw new LuanException( "invalid type: " + obj.getClass() ); 149 throw new LuanException( "invalid type: " + obj.getClass() );
125 } 150 }
126 151
127 static Object readObj(InputStream in,LuanState luan) throws IOException, LuanException { 152 private static Object readObj(InputStream in,LuanState luan,Close close) throws IOException, LuanException {
128 int type = in.read(); 153 int type = in.read();
129 switch(type) { 154 switch(type) {
130 case NIL: 155 case NIL:
131 return null; 156 return null;
132 case STRING: 157 case STRING:
139 return readBinary(in,readInt(in)); 164 return readBinary(in,readInt(in));
140 case TABLE: 165 case TABLE:
141 String s = readString(in); 166 String s = readString(in);
142 LuanFunction fn = Luan.load("return "+s,"rpc-reader"); 167 LuanFunction fn = Luan.load("return "+s,"rpc-reader");
143 return fn.call(luan); 168 return fn.call(luan);
169 case IO:
170 return new LuanInputStream(in,close).table();
144 default: 171 default:
145 throw new LuanException( "invalid type: " + type ); 172 throw new LuanException( "invalid type: " + type );
146 } 173 }
147 } 174 }
148 175
149 static Boolean readBoolean(InputStream in) throws IOException { 176 private static Boolean readBoolean(InputStream in) throws IOException {
150 return Boolean.valueOf(readString(in)); 177 return Boolean.valueOf(readString(in));
151 } 178 }
152 179
153 static String readString(InputStream in) throws IOException { 180 private static String readString(InputStream in) throws IOException {
154 int len = readInt(in); 181 int len = readInt(in);
155 byte[] a = readBinary(in,len); 182 byte[] a = readBinary(in,len);
156 return new String(a,StandardCharsets.UTF_8); 183 return new String(a,StandardCharsets.UTF_8);
157 } 184 }
158 185
159 static int readInt(InputStream in) throws IOException { 186 private static int readInt(InputStream in) throws IOException {
160 int ch1 = in.read(); 187 int ch1 = in.read();
161 int ch2 = in.read(); 188 int ch2 = in.read();
162 int ch3 = in.read(); 189 int ch3 = in.read();
163 int ch4 = in.read(); 190 int ch4 = in.read();
164 if ((ch1 | ch2 | ch3 | ch4) < 0) 191 if ((ch1 | ch2 | ch3 | ch4) < 0)
165 throw new EOFException(); 192 throw new EOFException();
166 return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); 193 return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
167 } 194 }
168 195
169 static byte[] readBinary(InputStream in,int size) throws IOException { 196 private static byte[] readBinary(InputStream in,int size) throws IOException {
170 byte[] a = new byte[size]; 197 byte[] a = new byte[size];
171 int i = 0; 198 int i = 0;
172 while( i < size ) { 199 while( i < size ) {
173 int n = in.read(a,i,size-i); 200 int n = in.read(a,i,size-i);
174 if( n == -1 ) 201 if( n == -1 )
176 i += n; 203 i += n;
177 } 204 }
178 return a; 205 return a;
179 } 206 }
180 207
181 static void writeBoolean(OutputStream out,Boolean b) throws IOException { 208 private static void writeBoolean(OutputStream out,Boolean b) throws IOException {
182 writeString(out,b.toString()); 209 writeString(out,b.toString());
183 } 210 }
184 211
185 static void writeString(OutputStream out,String s) throws IOException { 212 private static void writeString(OutputStream out,String s) throws IOException {
186 byte[] a = s.getBytes(StandardCharsets.UTF_8); 213 byte[] a = s.getBytes(StandardCharsets.UTF_8);
187 writeInt(out,a.length); 214 writeInt(out,a.length);
188 out.write(a); 215 out.write(a);
189 } 216 }
190 217
191 static void writeInt(OutputStream out,int v) throws IOException { 218 private static void writeInt(OutputStream out,int v) throws IOException {
192 out.write((v >>> 24) & 0xFF); 219 out.write((v >>> 24) & 0xFF);
193 out.write((v >>> 16) & 0xFF); 220 out.write((v >>> 16) & 0xFF);
194 out.write((v >>> 8) & 0xFF); 221 out.write((v >>> 8) & 0xFF);
195 out.write((v >>> 0) & 0xFF); 222 out.write((v >>> 0) & 0xFF);
196 } 223 }
223 return sb.toString(); 250 return sb.toString();
224 } 251 }
225 throw new LuanException( "invalid type: " + obj.getClass() ); 252 throw new LuanException( "invalid type: " + obj.getClass() );
226 } 253 }
227 254
255
256 private static class Close {
257 boolean b = true;
258 }
259
260 private static class LuanInputStream extends IoLuan.LuanIn {
261 private final InputStream in;
262 private final boolean close;
263
264 public LuanInputStream(InputStream in,Close close) {
265 this.in = in;
266 this.close = close!=null && close.b;
267 if(this.close) close.b = false;
268 }
269
270 private void close() throws IOException {
271 if(close) in.close();
272 }
273
274 @Override public InputStream inputStream() {
275 return in;
276 }
277
278 @Override public String to_string() {
279 return "<input_stream>";
280 }
281
282 @Override public String to_uri_string() {
283 throw new UnsupportedOperationException();
284 }
285
286 @Override public String read_text() throws IOException {
287 String rtn = Utils.readAll(new InputStreamReader(in));
288 close();
289 return rtn;
290 }
291
292 @Override public byte[] read_binary() throws IOException {
293 byte[] rtn = Utils.readAll(in);
294 close();
295 return rtn;
296 }
297
298 @Override public boolean exists() {
299 return true;
300 }
301 };
302
228 } 303 }