Mercurial Hosting > luan
diff src/org/eclipse/jetty/server/bio/SocketConnector.java @ 865:6b210bb66c63
remove ThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 20:38:06 -0600 |
parents | 8e9db0bbf4f9 |
children |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/server/bio/SocketConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/bio/SocketConnector.java Sun Oct 02 20:38:06 2016 -0600 @@ -25,6 +25,8 @@ import java.net.SocketException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.Buffer; @@ -56,270 +58,274 @@ */ public class SocketConnector extends AbstractConnector { - private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); + private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); - protected ServerSocket _serverSocket; - protected final Set<EndPoint> _connections; - protected volatile int _localPort=-1; + protected ServerSocket _serverSocket; + protected final Set<EndPoint> _connections; + protected volatile int _localPort=-1; - /* ------------------------------------------------------------ */ - /** Constructor. - * - */ - public SocketConnector() - { - _connections=new HashSet<EndPoint>(); - } + /* ------------------------------------------------------------ */ + /** Constructor. + * + */ + public SocketConnector() + { + _connections=new HashSet<EndPoint>(); + } - /* ------------------------------------------------------------ */ - public Object getConnection() - { - return _serverSocket; - } + /* ------------------------------------------------------------ */ + public Object getConnection() + { + return _serverSocket; + } - /* ------------------------------------------------------------ */ - public void open() throws IOException - { - // Create a new server socket and set to non blocking mode - if (_serverSocket==null || _serverSocket.isClosed()) - _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); - _serverSocket.setReuseAddress(getReuseAddress()); - _localPort=_serverSocket.getLocalPort(); - if (_localPort<=0) - throw new IllegalStateException("port not allocated for "+this); + /* ------------------------------------------------------------ */ + public void open() throws IOException + { + // Create a new server socket and set to non blocking mode + if (_serverSocket==null || _serverSocket.isClosed()) + _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); + _serverSocket.setReuseAddress(getReuseAddress()); + _localPort=_serverSocket.getLocalPort(); + if (_localPort<=0) + throw new IllegalStateException("port not allocated for "+this); - } + } - /* ------------------------------------------------------------ */ - protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException - { - ServerSocket ss= host==null? - new ServerSocket(port,backlog): - new ServerSocket(port,backlog,InetAddress.getByName(host)); + /* ------------------------------------------------------------ */ + protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException + { + ServerSocket ss= host==null? + new ServerSocket(port,backlog): + new ServerSocket(port,backlog,InetAddress.getByName(host)); - return ss; - } + return ss; + } - /* ------------------------------------------------------------ */ - public void close() throws IOException - { - if (_serverSocket!=null) - _serverSocket.close(); - _serverSocket=null; - _localPort=-2; - } + /* ------------------------------------------------------------ */ + public void close() throws IOException + { + if (_serverSocket!=null) + _serverSocket.close(); + _serverSocket=null; + _localPort=-2; + } - /* ------------------------------------------------------------ */ - @Override - public void accept(int acceptorID) - throws IOException, InterruptedException - { - Socket socket = _serverSocket.accept(); - configure(socket); + /* ------------------------------------------------------------ */ + @Override + public void accept(int acceptorID) + throws IOException, InterruptedException + { + Socket socket = _serverSocket.accept(); + configure(socket); - ConnectorEndPoint connection=new ConnectorEndPoint(socket); - connection.dispatch(); - } + ConnectorEndPoint connection=new ConnectorEndPoint(socket); + connection.dispatch(); + } - /* ------------------------------------------------------------------------------- */ - /** - * Allows subclass to override Conection if required. - */ - protected Connection newConnection(EndPoint endpoint) - { - return new BlockingHttpConnection(this, endpoint, getServer()); - } + /* ------------------------------------------------------------------------------- */ + /** + * Allows subclass to override Conection if required. + */ + protected Connection newConnection(EndPoint endpoint) + { + return new BlockingHttpConnection(this, endpoint, getServer()); + } - /* ------------------------------------------------------------------------------- */ - @Override - public void customize(EndPoint endpoint, Request request) - throws IOException - { - ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; - int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; - connection.setMaxIdleTime(lrmit); + /* ------------------------------------------------------------------------------- */ + @Override + public void customize(EndPoint endpoint, Request request) + throws IOException + { + ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; + int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; + connection.setMaxIdleTime(lrmit); - super.customize(endpoint, request); - } + super.customize(endpoint, request); + } - /* ------------------------------------------------------------------------------- */ - public int getLocalPort() - { - return _localPort; - } + /* ------------------------------------------------------------------------------- */ + public int getLocalPort() + { + return _localPort; + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStart() throws Exception - { - _connections.clear(); - super.doStart(); - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStart() throws Exception + { + _connections.clear(); + super.doStart(); + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - Set<EndPoint> set = new HashSet<EndPoint>(); - synchronized(_connections) - { - set.addAll(_connections); - } - for (EndPoint endPoint : set) - { - ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; - connection.close(); - } - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStop() throws Exception + { + super.doStop(); + Set<EndPoint> set = new HashSet<EndPoint>(); + synchronized(_connections) + { + set.addAll(_connections); + } + for (EndPoint endPoint : set) + { + ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; + connection.close(); + } + } - @Override - public void dump(Appendable out, String indent) throws IOException - { - super.dump(out, indent); - Set<EndPoint> connections = new HashSet<EndPoint>(); - synchronized (_connections) - { - connections.addAll(_connections); - } - AggregateLifeCycle.dump(out, indent, connections); - } + @Override + public void dump(Appendable out, String indent) throws IOException + { + super.dump(out, indent); + Set<EndPoint> connections = new HashSet<EndPoint>(); + synchronized (_connections) + { + connections.addAll(_connections); + } + AggregateLifeCycle.dump(out, indent, connections); + } - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint - { - volatile Connection _connection; - protected final Socket _socket; + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint + { + volatile Connection _connection; + protected final Socket _socket; - public ConnectorEndPoint(Socket socket) throws IOException - { - super(socket,_maxIdleTime); - _connection = newConnection(this); - _socket=socket; - } + public ConnectorEndPoint(Socket socket) throws IOException + { + super(socket,_maxIdleTime); + _connection = newConnection(this); + _socket=socket; + } - public Connection getConnection() - { - return _connection; - } + public Connection getConnection() + { + return _connection; + } - public void setConnection(Connection connection) - { - if (_connection!=connection && _connection!=null) - connectionUpgraded(_connection,connection); - _connection=connection; - } + public void setConnection(Connection connection) + { + if (_connection!=connection && _connection!=null) + connectionUpgraded(_connection,connection); + _connection=connection; + } - public void dispatch() throws IOException - { - if (getThreadPool()==null || !getThreadPool().dispatch(this)) - { - LOG.warn("dispatch failed for {}",_connection); - close(); - } - } + public void dispatch() throws IOException + { + ThreadPoolExecutor tpe = getThreadPool(); + if( tpe != null ) { + try { + tpe.execute(this); + return; + } catch(RejectedExecutionException e) {} + } + LOG.warn("dispatch failed for {}",_connection); + close(); + } - @Override - public int fill(Buffer buffer) throws IOException - { - int l = super.fill(buffer); - if (l<0) - { - if (!isInputShutdown()) - shutdownInput(); - if (isOutputShutdown()) - close(); - } - return l; - } + @Override + public int fill(Buffer buffer) throws IOException + { + int l = super.fill(buffer); + if (l<0) + { + if (!isInputShutdown()) + shutdownInput(); + if (isOutputShutdown()) + close(); + } + return l; + } - @Override - public void close() throws IOException - { - if (_connection instanceof AbstractHttpConnection) - ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); - super.close(); - } - - public void run() - { - try - { - connectionOpened(_connection); - synchronized(_connections) - { - _connections.add(this); - } + @Override + public void close() throws IOException + { + if (_connection instanceof AbstractHttpConnection) + ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); + super.close(); + } - while (isStarted() && !isClosed()) - { - if (_connection.isIdle()) - { - if (isLowResources()) - setMaxIdleTime(getLowResourcesMaxIdleTime()); - } + public void run() + { + try + { + connectionOpened(_connection); + synchronized(_connections) + { + _connections.add(this); + } + + while (isStarted() && !isClosed()) + { + if (_connection.isIdle()) + { + if (isLowResources()) + setMaxIdleTime(getLowResourcesMaxIdleTime()); + } - _connection=_connection.handle(); - } - } - catch (EofException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (SocketException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (HttpException e) - { - LOG.debug("BAD", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch(Exception e) - { - LOG.warn("handle failed?",e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - connectionClosed(_connection); - synchronized(_connections) - { - _connections.remove(this); - } + _connection=_connection.handle(); + } + } + catch (EofException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (SocketException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (HttpException e) + { + LOG.debug("BAD", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch(Exception e) + { + LOG.warn("handle failed?",e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + connectionClosed(_connection); + synchronized(_connections) + { + _connections.remove(this); + } - // wait for client to close, but if not, close ourselves. - try - { - if (!_socket.isClosed()) - { - long timestamp=System.currentTimeMillis(); - int max_idle=getMaxIdleTime(); + // wait for client to close, but if not, close ourselves. + try + { + if (!_socket.isClosed()) + { + long timestamp=System.currentTimeMillis(); + int max_idle=getMaxIdleTime(); - _socket.setSoTimeout(getMaxIdleTime()); - int c=0; - do - { - c = _socket.getInputStream().read(); - } - while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); - if (!_socket.isClosed()) - _socket.close(); - } - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } - } + _socket.setSoTimeout(getMaxIdleTime()); + int c=0; + do + { + c = _socket.getInputStream().read(); + } + while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); + if (!_socket.isClosed()) + _socket.close(); + } + } + catch(IOException e) + { + LOG.trace("",e); + } + } + } + } }