Mercurial Hosting > luan
changeset 1120:e8fc6712b468
luan Rpc uses luan.lib.rpc
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 07 Aug 2017 23:50:52 -0600 |
parents | 87c674f3f6b7 |
children | 4cf541886663 |
files | src/luan/LuanJavaFunction.java src/luan/lib/rpc/RpcCon.java src/luan/lib/rpc/RpcServer.java src/luan/modules/IoLuan.java src/luan/modules/JavaLuan.java src/luan/modules/Rpc.luan src/luan/modules/RpcLuan.java src/luan/modules/host/Hosting.luan |
diffstat | 8 files changed, 244 insertions(+), 342 deletions(-) [+] |
line wrap: on
line diff
--- a/src/luan/LuanJavaFunction.java Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/LuanJavaFunction.java Mon Aug 07 23:50:52 2017 -0600 @@ -51,6 +51,10 @@ return argConverters.length; } + public boolean isVarArgs() { + return method.isVarArgs(); + } + @Override public Object call(LuanState luan,Object[] args) throws LuanException { try { args = fixArgs(luan,args);
--- a/src/luan/lib/rpc/RpcCon.java Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/lib/rpc/RpcCon.java Mon Aug 07 23:50:52 2017 -0600 @@ -18,6 +18,7 @@ final OutputStream out; InputStream inBinary = null; long lenBinary = -1; + boolean readSome = false; RpcCon(Socket socket) throws RpcError @@ -90,6 +91,7 @@ inBinary = null; lenBinary = -1; } + readSome = false; byte[] a = new byte[4]; readAll(a); int len = 0; @@ -122,6 +124,7 @@ n = in.read( a, total, a.length-total ); if( n == -1 ) throw new EOFException(); + readSome = true; total += n; } }
--- a/src/luan/lib/rpc/RpcServer.java Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/lib/rpc/RpcServer.java Mon Aug 07 23:50:52 2017 -0600 @@ -1,5 +1,6 @@ package luan.lib.rpc; +import java.io.EOFException; import java.net.Socket; import java.util.List; import java.util.ArrayList; @@ -16,10 +17,16 @@ public RpcCall read() throws RpcError { - List list = readJson(); - String cmd = (String)list.remove(0); - Object[] args = list.toArray(); - return new RpcCall(inBinary,lenBinary,cmd,args); + try { + List list = readJson(); + String cmd = (String)list.remove(0); + Object[] args = list.toArray(); + return new RpcCall(inBinary,lenBinary,cmd,args); + } catch(RpcError e) { + if( !readSome && e.getCause() instanceof EOFException ) + return null; + throw e; + } } public void write(RpcResult result) @@ -38,6 +45,7 @@ { List list = new ArrayList(); list.add(false); + list.add(ex.getMessage()); for( Object val : ex.values ) { list.add(val); }
--- a/src/luan/modules/IoLuan.java Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/modules/IoLuan.java Mon Aug 07 23:50:52 2017 -0600 @@ -864,6 +864,32 @@ } + + public static class LuanInput extends LuanIn { + private final InputStream in; + + public LuanInput(InputStream in) { + this.in = in; + } + + @Override public InputStream inputStream() { + return in; + } + + @Override public String to_string() { + return "<input_stream>"; + } + + @Override public String to_uri_string() { + throw new UnsupportedOperationException(); + } + + @Override public boolean exists() { + return true; + } + }; + + public static String ip(String domain) { try { return InetAddress.getByName(domain).getHostAddress();
--- a/src/luan/modules/JavaLuan.java Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/modules/JavaLuan.java Mon Aug 07 23:50:52 2017 -0600 @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.Collections; import java.util.Arrays; +import java.util.Comparator; import luan.Luan; import luan.LuanState; import luan.LuanTable; @@ -355,23 +356,43 @@ return new Static(cls); } + private static final Comparator<LuanJavaFunction> varArgsSorter = new Comparator<LuanJavaFunction>() { + public int compare(LuanJavaFunction fn1,LuanJavaFunction fn2) { + return fn2.getParameterCount() - fn1.getParameterCount(); + } + }; + private static class AmbiguousJavaFunction extends LuanFunction { private final Map<Integer,List<LuanJavaFunction>> fnMap = new HashMap<Integer,List<LuanJavaFunction>>(); + private List<LuanJavaFunction> varArgs = new ArrayList<LuanJavaFunction>(); AmbiguousJavaFunction(List<LuanJavaFunction> fns) { for( LuanJavaFunction fn : fns ) { - Integer n = fn.getParameterCount(); - List<LuanJavaFunction> list = fnMap.get(n); - if( list==null ) { - list = new ArrayList<LuanJavaFunction>(); - fnMap.put(n,list); + if( fn.isVarArgs() ) { + varArgs.add(fn); + } else { + Integer n = fn.getParameterCount(); + List<LuanJavaFunction> list = fnMap.get(n); + if( list==null ) { + list = new ArrayList<LuanJavaFunction>(); + fnMap.put(n,list); + } + list.add(fn); } - list.add(fn); } + Collections.sort(varArgs,varArgsSorter); } @Override public Object call(LuanState luan,Object[] args) throws LuanException { - for( LuanJavaFunction fn : fnMap.get(args.length) ) { + List<LuanJavaFunction> list = fnMap.get(args.length); + if( list != null ) { + for( LuanJavaFunction fn : list ) { + try { + return fn.rawCall(luan,args); + } catch(IllegalArgumentException e) {} + } + } + for( LuanJavaFunction fn : varArgs ) { try { return fn.rawCall(luan,args); } catch(IllegalArgumentException e) {}
--- a/src/luan/modules/Rpc.luan Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/modules/Rpc.luan Mon Aug 07 23:50:52 2017 -0600 @@ -1,12 +1,27 @@ java() -local RpcLuan = require "java:luan.modules.RpcLuan" +local RpcClient = require "java:luan.lib.rpc.RpcClient" +local RpcServer = require "java:luan.lib.rpc.RpcServer" +local RpcCall = require "java:luan.lib.rpc.RpcCall" +local RpcResult = require "java:luan.lib.rpc.RpcResult" +local RpcException = require "java:luan.lib.rpc.RpcException" +local JavaRpc = require "java:luan.lib.rpc.Rpc" +local JavaLuan = require "java:luan.Luan" +local JavaUtils = require "java:luan.modules.Utils" +local IoLuan = require "java:luan.modules.IoLuan" +local ByteArrayInputStream = require "java:java.io.ByteArrayInputStream" local Luan = require "luan:Luan.luan" local error = Luan.error local set_metatable = Luan.set_metatable or error() local try = Luan.try or error() +local ipairs = Luan.ipairs or error() +local assert_table = Luan.assert_table or error() +local type = Luan.type or error() local Io = require "luan:Io.luan" local Thread = require "luan:Thread.luan" -local Logging = require "luan:logging/Logging.luan" -- external dependency +local Table = require "luan:Table.luan" +local unpack = Table.unpack or error() +require "luan:logging/init.luan" -- initialize logging +local Logging = require "luan:logging/Logging.luan" local logger = Logging.logger "Rpc" @@ -14,55 +29,180 @@ Rpc.port = 9101 -Rpc.call = RpcLuan.call -- Rpc.call(socket,fn_name,...) +local function java_args(list) + for i,v in ipairs(list) do + list[i] = JavaLuan.toJava(v) + end + return unpack(list) +end + +local function luan_args(list,binary_in) + list = assert_table(list) + for i,v in ipairs(list) do + list[i] = JavaLuan.toLuan(v) + end + if binary_in ~= nil then + local i_in = list[#list] + list[#list] = nil + local type = list[i_in] + if type == "binary" then + list[i_in] = JavaUtils.readAll(binary_in) + elseif type == "input" then + list[i_in] = IoLuan.LuanInput.new(binary_in).table() + else + error(type) + end + end + return unpack(list) +end + +local function encode_binary(args) + local binary_in, len_in, i_in + for i,v in ipairs(args) do + if type(v) == "binary" then + binary_in==nil or error "can't have multiple binary args" + i_in = i + binary_in = ByteArrayInputStream.new(v) + len_in = #v + args[i] = "binary" + elseif type(v) == "table" and v.java ~= nil and v.java.instanceof(IoLuan.LuanFile) then + binary_in==nil or error "can't have multiple binary args" + i_in = i + binary_in = v.java.inputStream() + len_in = v.length() + args[i] = "input" + end + end + args[#args+1] = i_in + return binary_in, len_in +end + +function Rpc.caller(socket) + local java_socket = socket.java.socket + local client = RpcClient.new(java_socket) + return function(fn_name,...) + local args = {...} + local binary_in, len_in = encode_binary(args) + local call + if binary_in == nil then + call = RpcCall.new(fn_name,java_args(args)) + else + call = RpcCall.new(binary_in,len_in,fn_name,java_args(args)) + end + client.write(call) + if fn_name == "close" then + client.close() + return + end + return try { + function() + local result = client.read() + return luan_args(result.returnValues,result["in"]) + end + catch = function(e) + local cause = e.java.getCause() + if cause ~= nil and cause.instanceof(RpcException) and cause.getMessage() == "luan" then + error(cause.values.get(0)) + else + e.throw() + end + end + } + end_function +end_function Rpc.functions = {} -function Rpc.respond(socket,fns) - RpcLuan.respond( socket, fns or Rpc.functions ) -end +function Rpc.responder(socket,fns) + fns = fns or Rpc.functions + local java_socket = socket.java.socket + local server = RpcServer.new(java_socket) + local responder = {} + function responder.is_closed() + return server.isClosed() + end_function + function responder.respond() + local call = server.read() + if call==nil then return end + local cmd = call.cmd + if cmd == "close" then + server.close() + return + end_if + local fn = fns[cmd] + if fn == nil then + server.write(JavaRpc.COMMAND_NOT_FOUND) + return + end_if + local rtn = try { + function() + return {fn(luan_args(call.args,call["in"]))} + end + catch = function(e) + logger.warn(e) + local ex = RpcException.new("luan",e.get_message()) + server.write(ex) + return nil + end + } + if rtn==nil then return end + local binary_in, len_in = encode_binary(rtn) + local result + if binary_in == nil then + result = RpcResult.new(java_args(rtn)) + else + result = RpcResult.new(binary_in,len_in,java_args(rtn)) + end + server.write(result) + end + return responder +end_function function Rpc.remote_socket(socket_uri) + local socket = Io.uri(socket_uri) + local call = Rpc.caller(socket) local mt = {} function mt.__index(_,key) return function(...) - local socket = Io.uri(socket_uri) - return Rpc.call(socket,key,...) + return call(key,...) end end local t = {} set_metatable(t,mt) return t -end +end_function function Rpc.remote(domain) local socket = "socket:" .. domain .. ":" .. Rpc.port return Rpc.remote_socket(socket) -end +end_function function Rpc.serve(port,fns) - local server = Io.socket_server(port or Rpc.port) + local socket_server = Io.socket_server(port or Rpc.port) while true do try { function() - local socket = server() - local function respond() + local socket = socket_server() + local function server() try { function() - Rpc.respond(socket,fns) + local responder = Rpc.responder(socket) + while not responder.is_closed() do + responder.respond() + end end catch = function(e) logger.error(e) end } end - Thread.fork(respond) + Thread.fork(server) end catch = function(e) logger.error(e) end } - end -end + end_while +end_function return Rpc
--- a/src/luan/modules/RpcLuan.java Mon Aug 07 12:35:45 2017 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,311 +0,0 @@ -package luan.modules; - -import java.io.InputStream; -import java.io.OutputStream; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.InputStreamReader; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.EOFException; -import java.net.Socket; -import java.nio.charset.StandardCharsets; -import java.util.Set; -import java.util.IdentityHashMap; -import java.util.Collections; -import java.util.Map; -import luan.Luan; -import luan.LuanState; -import luan.LuanTable; -import luan.LuanFunction; -import luan.LuanException; -import luan.LuanMethod; -import luan.lib.json.JsonToString; -import luan.lib.json.JsonParser; -import luan.lib.parser.ParseException; - - -public final class RpcLuan { - private static final int NIL = 0; - private static final int STRING = 1; - private static final int BOOLEAN = 2; - private static final int NUMBER = 3; - private static final int BINARY = 4; - private static final int TABLE = 5; - private static final int IO = 6; - private static final int LONG = 7; - - @LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args) - throws LuanException, IOException - { - IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); - Socket socket = luanSocket.socket; - InputStream in = new BufferedInputStream(socket.getInputStream()); - OutputStream out = new BufferedOutputStream(socket.getOutputStream()); - Close close = new Close(); - try { - writeString(out,fnName); - writeObjs(out,luan,args); - out.flush(); - socket.shutdownOutput(); - boolean ok = readBoolean(in); - if( ok ) { - return readObjs(in,luan,close); - } else { - String msg = readString(in); - throw new LuanException(msg); - } - } finally { - if( close.b) { - socket.close(); - } - } - } - - public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns) - throws IOException, LuanException - { - IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java"); - Socket socket = luanSocket.socket; - InputStream in = new BufferedInputStream(socket.getInputStream()); - OutputStream out = new BufferedOutputStream(socket.getOutputStream()); - try { - Object[] rtn; - try { - String fnName = readString(in); - Object[] args = readObjs(in,luan,null); - LuanFunction fn = (LuanFunction)fns.get(luan,fnName); - if( fn == null ) - throw new LuanException( "function not found: " + fnName ); - rtn = Luan.array(fn.call(luan,args)); - } catch(LuanException e) { - writeBoolean(out,false); - writeString(out,e.getFullMessage()); - out.flush(); - return; - } - writeBoolean(out,true); - writeObjs(out,luan,rtn); - out.flush(); - } finally { - socket.close(); - } - } - - private static void writeObjs(OutputStream out,LuanState luan,Object[] a) throws IOException, LuanException { - IoLuan.LuanIn luanIn = null; - writeInt(out,a.length); - for( Object obj : a ) { - if( obj instanceof LuanTable ) { - LuanTable tbl = (LuanTable)obj; - Object java = tbl.rawGet("java"); - if( java instanceof IoLuan.LuanIn ) { - if( luanIn != null ) - throw new LuanException("can't have multiple IO params"); - luanIn = (IoLuan.LuanIn)java; - out.write(IO); - continue; - } - } - writeObj(out,luan,obj); - } - if( luanIn != null ) { - InputStream in = luanIn.inputStream(); - Utils.copyAll(in,out); - } - } - - private static Object[] readObjs(InputStream in,LuanState luan,Close close) throws IOException, LuanException { - int n = readInt(in); - Object[] rtn = new Object[n]; - for( int i=0; i<n; i++ ) { - rtn[i] = readObj(in,luan,close); - } - return rtn; - } - - private static void writeObj(OutputStream out,LuanState luan,Object obj) throws IOException, LuanException { - if( obj == null ) { - out.write(NIL); - } - else if( obj instanceof String ) { - out.write(STRING); - writeString(out,(String)obj); - } - else if( obj instanceof Boolean ) { - out.write(BOOLEAN); - writeBoolean(out,(Boolean)obj); - } - else if( obj instanceof Long ) { - out.write(LONG); - writeString(out,obj.toString()); - } - else if( obj instanceof Number ) { - out.write(NUMBER); - writeString(out,obj.toString()); - } - else if( obj instanceof byte[] ) { - byte[] a = (byte[])obj; - out.write(BINARY); - writeInt(out,a.length); - out.write(a); - } - else if( obj instanceof LuanTable ) { - out.write(TABLE); -// String s = pickle( luan, obj, Collections.newSetFromMap(new IdentityHashMap<LuanTable,Boolean>()) ); - String s = JsonToString.toString(Luan.toJava(obj)); - writeString(out,s); - } - else - throw new LuanException( "invalid type: " + obj.getClass() ); - } - - private static Object readObj(InputStream in,LuanState luan,Close close) throws IOException, LuanException { - int type = in.read(); - switch(type) { - case NIL: - return null; - case STRING: - return readString(in); - case BOOLEAN: - return readBoolean(in); - case LONG: - return Long.valueOf(readString(in)); - case NUMBER: - return Double.valueOf(readString(in)); - case BINARY: - return readBinary(in,readInt(in)); - case TABLE: - String s = readString(in); -/* - LuanFunction fn = Luan.load("return "+s,"rpc-reader"); - return fn.call(luan); -*/ - try { - return Luan.toLuan(JsonParser.parse(s)); - } catch(ParseException e) { - throw new LuanException(e); - } - case IO: - return new LuanInputStream(in,close).table(); - default: - throw new LuanException( "invalid type: " + type ); - } - } - - private static Boolean readBoolean(InputStream in) throws IOException { - return Boolean.valueOf(readString(in)); - } - - private static String readString(InputStream in) throws IOException { - int len = readInt(in); - byte[] a = readBinary(in,len); - return new String(a,StandardCharsets.UTF_8); - } - - private static int readInt(InputStream in) throws IOException { - int ch1 = in.read(); - int ch2 = in.read(); - int ch3 = in.read(); - int ch4 = in.read(); - if ((ch1 | ch2 | ch3 | ch4) < 0) - throw new EOFException(); - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); - } - - private static byte[] readBinary(InputStream in,int size) throws IOException { - byte[] a = new byte[size]; - int i = 0; - while( i < size ) { - int n = in.read(a,i,size-i); - if( n == -1 ) - throw new EOFException(); - i += n; - } - return a; - } - - private static void writeBoolean(OutputStream out,Boolean b) throws IOException { - writeString(out,b.toString()); - } - - private static void writeString(OutputStream out,String s) throws IOException { - byte[] a = s.getBytes(StandardCharsets.UTF_8); - writeInt(out,a.length); - out.write(a); - } - - private static void writeInt(OutputStream out,int v) throws IOException { - out.write((v >>> 24) & 0xFF); - out.write((v >>> 16) & 0xFF); - out.write((v >>> 8) & 0xFF); - out.write((v >>> 0) & 0xFF); - } - -/* - private static String pickle(LuanState luan,Object obj,Set<LuanTable> set) throws LuanException { - if( obj == null ) - return "nil"; - if( obj instanceof Boolean ) - return obj.toString(); - if( obj instanceof Number ) - return Luan.toString((Number)obj); - if( obj instanceof String ) - return "\"" + Luan.stringEncode((String)obj) + "\""; - if( obj instanceof LuanTable ) { - LuanTable tbl = (LuanTable)obj; - if( !set.add(tbl) ) { - throw new LuanException( "circular reference in table" ); - } - StringBuilder sb = new StringBuilder(); - sb.append( "{" ); - for( Map.Entry<Object,Object> entry : tbl.iterable(luan) ) { - sb.append( "[" ); - sb.append( pickle(luan,entry.getKey(),set) ); - sb.append( "]=" ); - sb.append( pickle(luan,entry.getValue(),set) ); - sb.append( ", " ); - } - sb.append( "}" ); - return sb.toString(); - } - throw new LuanException( "invalid type: " + obj.getClass() ); - } -*/ - - private static class Close { - boolean b = true; - } - - private static class LuanInputStream extends IoLuan.LuanIn { - private final InputStream in; - private final boolean close; - - public LuanInputStream(InputStream in,Close close) { - this.in = in; - this.close = close!=null && close.b; - if(this.close) close.b = false; - } - - @Override public InputStream inputStream() { - return new FilterInputStream(in) { - @Override public void close() throws IOException { - if(close) super.close(); - } - }; - } - - @Override public String to_string() { - return "<input_stream>"; - } - - @Override public String to_uri_string() { - throw new UnsupportedOperationException(); - } - - @Override public boolean exists() { - return true; - } - }; - -}
--- a/src/luan/modules/host/Hosting.luan Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/modules/host/Hosting.luan Mon Aug 07 23:50:52 2017 -0600 @@ -59,26 +59,34 @@ process( nil, tree, my_dir ) host.update_handler(domain,password) + host.close() end function Hosting.delete(domain,password) local host = Rpc.remote(domain) host.delete(domain,password) + host.close() end function Hosting.exists(domain) local host = Rpc.remote(domain) - return host.exists(domain) + local rtn = host.exists(domain) + host.close() + return rtn end function Hosting.change_domain(old_domain,new_domain,password) local host = Rpc.remote(new_domain) - return host.change_domain(old_domain,new_domain,password) + local rtn = host.change_domain(old_domain,new_domain,password) + host.close() + return rtn end function Hosting.change_password(domain,old_password,new_password) local host = Rpc.remote(domain) - return host.change_password(domain,old_password,new_password) + local rtn = host.change_password(domain,old_password,new_password) + host.close() + return rtn end function Hosting.caller(domain) @@ -86,6 +94,9 @@ local mt = {} function mt.__index(_,key) return function(...) + if key == "close" then + return host.close() + end return host.call(domain,key,...) end end