Mercurial Hosting > luan
diff src/org/eclipse/jetty/server/nio/SelectChannelConnector.java @ 864:e21ca9878a10
simplify ThreadPool use
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 16:17:38 -0600 |
parents | 3428c60d7cfc |
children | 6b210bb66c63 |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Sun Oct 02 05:22:55 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Sun Oct 02 16:17:38 2016 -0600 @@ -66,269 +66,260 @@ */ public class SelectChannelConnector extends AbstractNIOConnector { - protected ServerSocketChannel _acceptChannel; - private int _lowResourcesConnections; - private int _lowResourcesMaxIdleTime; - private int _localPort=-1; + protected ServerSocketChannel _acceptChannel; + private int _lowResourcesConnections; + private int _lowResourcesMaxIdleTime; + private int _localPort=-1; - private final SelectorManager _manager = new ConnectorSelectorManager(); + private final SelectorManager _manager = new ConnectorSelectorManager(); - /* ------------------------------------------------------------------------------- */ - /** - * Constructor. - * - */ - public SelectChannelConnector() - { - _manager.setMaxIdleTime(getMaxIdleTime()); - addBean(_manager,true); - setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); - } - - @Override - public void setThreadPool(ThreadPool pool) - { - super.setThreadPool(pool); - // preserve start order - removeBean(_manager); - addBean(_manager,true); - } - - /* ------------------------------------------------------------ */ - @Override - public void accept(int acceptorID) throws IOException - { - ServerSocketChannel server; - synchronized(this) - { - server = _acceptChannel; - } + /* ------------------------------------------------------------------------------- */ + /** + * Constructor. + * + */ + public SelectChannelConnector() + { + _manager.setMaxIdleTime(getMaxIdleTime()); + addBean(_manager,true); + setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); + } + + /* ------------------------------------------------------------ */ + @Override + public void accept(int acceptorID) throws IOException + { + ServerSocketChannel server; + synchronized(this) + { + server = _acceptChannel; + } + + if (server!=null && server.isOpen() && _manager.isStarted()) + { + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + Socket socket = channel.socket(); + configure(socket); + _manager.register(channel); + } + } - if (server!=null && server.isOpen() && _manager.isStarted()) - { - SocketChannel channel = server.accept(); - channel.configureBlocking(false); - Socket socket = channel.socket(); - configure(socket); - _manager.register(channel); - } - } + /* ------------------------------------------------------------ */ + public void close() throws IOException + { + synchronized(this) + { + if (_acceptChannel != null) + { + removeBean(_acceptChannel); + if (_acceptChannel.isOpen()) + _acceptChannel.close(); + } + _acceptChannel = null; + _localPort=-2; + } + } - /* ------------------------------------------------------------ */ - public void close() throws IOException - { - synchronized(this) - { - if (_acceptChannel != null) - { - removeBean(_acceptChannel); - if (_acceptChannel.isOpen()) - _acceptChannel.close(); - } - _acceptChannel = null; - _localPort=-2; - } - } + /* ------------------------------------------------------------------------------- */ + @Override + public void customize(EndPoint endpoint, Request request) throws IOException + { + request.setTimeStamp(System.currentTimeMillis()); + endpoint.setMaxIdleTime(_maxIdleTime); + super.customize(endpoint, request); + } - /* ------------------------------------------------------------------------------- */ - @Override - public void customize(EndPoint endpoint, Request request) throws IOException - { - request.setTimeStamp(System.currentTimeMillis()); - endpoint.setMaxIdleTime(_maxIdleTime); - super.customize(endpoint, request); - } + /* ------------------------------------------------------------------------------- */ + @Override + public void persist(EndPoint endpoint) throws IOException + { + AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint); + aEndp.setCheckForIdle(true); + super.persist(endpoint); + } - /* ------------------------------------------------------------------------------- */ - @Override - public void persist(EndPoint endpoint) throws IOException - { - AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint); - aEndp.setCheckForIdle(true); - super.persist(endpoint); - } + /* ------------------------------------------------------------ */ + public SelectorManager getSelectorManager() + { + return _manager; + } - /* ------------------------------------------------------------ */ - public SelectorManager getSelectorManager() - { - return _manager; - } + /* ------------------------------------------------------------ */ + public synchronized Object getConnection() + { + return _acceptChannel; + } - /* ------------------------------------------------------------ */ - public synchronized Object getConnection() - { - return _acceptChannel; - } + /* ------------------------------------------------------------------------------- */ + public int getLocalPort() + { + synchronized(this) + { + return _localPort; + } + } - /* ------------------------------------------------------------------------------- */ - public int getLocalPort() - { - synchronized(this) - { - return _localPort; - } - } + /* ------------------------------------------------------------ */ + public void open() throws IOException + { + synchronized(this) + { + if (_acceptChannel == null) + { + // Create a new server socket + _acceptChannel = ServerSocketChannel.open(); + // Set to blocking mode + _acceptChannel.configureBlocking(true); - /* ------------------------------------------------------------ */ - public void open() throws IOException - { - synchronized(this) - { - if (_acceptChannel == null) - { - // Create a new server socket - _acceptChannel = ServerSocketChannel.open(); - // Set to blocking mode - _acceptChannel.configureBlocking(true); + // Bind the server socket to the local host and port + _acceptChannel.socket().setReuseAddress(getReuseAddress()); + InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); + _acceptChannel.socket().bind(addr,getAcceptQueueSize()); + + _localPort=_acceptChannel.socket().getLocalPort(); + if (_localPort<=0) + throw new IOException("Server channel not bound"); - // Bind the server socket to the local host and port - _acceptChannel.socket().setReuseAddress(getReuseAddress()); - InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); - _acceptChannel.socket().bind(addr,getAcceptQueueSize()); + addBean(_acceptChannel); + } + } + } - _localPort=_acceptChannel.socket().getLocalPort(); - if (_localPort<=0) - throw new IOException("Server channel not bound"); - - addBean(_acceptChannel); - } - } - } + /* ------------------------------------------------------------ */ + @Override + public void setMaxIdleTime(int maxIdleTime) + { + _manager.setMaxIdleTime(maxIdleTime); + super.setMaxIdleTime(maxIdleTime); + } - /* ------------------------------------------------------------ */ - @Override - public void setMaxIdleTime(int maxIdleTime) - { - _manager.setMaxIdleTime(maxIdleTime); - super.setMaxIdleTime(maxIdleTime); - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesConnections + */ + public int getLowResourcesConnections() + { + return _lowResourcesConnections; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesConnections - */ - public int getLowResourcesConnections() - { - return _lowResourcesConnections; - } + /* ------------------------------------------------------------ */ + /** + * Set the number of connections, which if exceeded places this manager in low resources state. + * This is not an exact measure as the connection count is averaged over the select sets. + * @param lowResourcesConnections the number of connections + * @see #setLowResourcesMaxIdleTime(int) + */ + public void setLowResourcesConnections(int lowResourcesConnections) + { + _lowResourcesConnections=lowResourcesConnections; + } - /* ------------------------------------------------------------ */ - /** - * Set the number of connections, which if exceeded places this manager in low resources state. - * This is not an exact measure as the connection count is averaged over the select sets. - * @param lowResourcesConnections the number of connections - * @see #setLowResourcesMaxIdleTime(int) - */ - public void setLowResourcesConnections(int lowResourcesConnections) - { - _lowResourcesConnections=lowResourcesConnections; - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesMaxIdleTime + */ + @Override + public int getLowResourcesMaxIdleTime() + { + return _lowResourcesMaxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesMaxIdleTime - */ - @Override - public int getLowResourcesMaxIdleTime() - { - return _lowResourcesMaxIdleTime; - } - - /* ------------------------------------------------------------ */ - /** - * Set the period in ms that a connection is allowed to be idle when this there are more - * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections - * in order to gracefully handle high load situations. - * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low. - * @see #setMaxIdleTime(int) - */ - @Override - public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime) - { - _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; - super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); - } + /* ------------------------------------------------------------ */ + /** + * Set the period in ms that a connection is allowed to be idle when this there are more + * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections + * in order to gracefully handle high load situations. + * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low. + * @see #setMaxIdleTime(int) + */ + @Override + public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime) + { + _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; + super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); + } - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.jetty.server.server.AbstractConnector#doStart() - */ - @Override - protected void doStart() throws Exception - { - _manager.setSelectSets(getAcceptors()); - _manager.setMaxIdleTime(getMaxIdleTime()); - _manager.setLowResourcesConnections(getLowResourcesConnections()); - _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); + /* ------------------------------------------------------------ */ + /* + * @see org.eclipse.jetty.server.server.AbstractConnector#doStart() + */ + @Override + protected void doStart() throws Exception + { + _manager.setSelectSets(getAcceptors()); + _manager.setMaxIdleTime(getMaxIdleTime()); + _manager.setLowResourcesConnections(getLowResourcesConnections()); + _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); - super.doStart(); - } + super.doStart(); + } - /* ------------------------------------------------------------ */ - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException - { - SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); - endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); - return endp; - } + /* ------------------------------------------------------------ */ + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException + { + SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); + endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); + return endp; + } - /* ------------------------------------------------------------------------------- */ - protected void endPointClosed(SelectChannelEndPoint endpoint) - { - connectionClosed(endpoint.getConnection()); - } + /* ------------------------------------------------------------------------------- */ + protected void endPointClosed(SelectChannelEndPoint endpoint) + { + connectionClosed(endpoint.getConnection()); + } - /* ------------------------------------------------------------------------------- */ - protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) - { - return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer()); - } + /* ------------------------------------------------------------------------------- */ + protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) + { + return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer()); + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private final class ConnectorSelectorManager extends SelectorManager - { - @Override - public boolean dispatch(Runnable task) - { - ThreadPool pool=getThreadPool(); - if (pool==null) - pool=getServer().getThreadPool(); - return pool.dispatch(task); - } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private final class ConnectorSelectorManager extends SelectorManager + { + @Override + public boolean dispatch(Runnable task) + { + ThreadPool pool=getThreadPool(); + if (pool==null) + pool=getServer().getThreadPool(); + return pool.dispatch(task); + } - @Override - protected void endPointClosed(final SelectChannelEndPoint endpoint) - { - SelectChannelConnector.this.endPointClosed(endpoint); - } + @Override + protected void endPointClosed(final SelectChannelEndPoint endpoint) + { + SelectChannelConnector.this.endPointClosed(endpoint); + } - @Override - protected void endPointOpened(SelectChannelEndPoint endpoint) - { - // TODO handle max connections and low resources - connectionOpened(endpoint.getConnection()); - } + @Override + protected void endPointOpened(SelectChannelEndPoint endpoint) + { + // TODO handle max connections and low resources + connectionOpened(endpoint.getConnection()); + } - @Override - protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) - { - connectionUpgraded(oldConnection,endpoint.getConnection()); - } + @Override + protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) + { + connectionUpgraded(oldConnection,endpoint.getConnection()); + } - @Override - public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) - { - return SelectChannelConnector.this.newConnection(channel,endpoint); - } + @Override + public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) + { + return SelectChannelConnector.this.newConnection(channel,endpoint); + } - @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException - { - return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); - } - } + @Override + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException + { + return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); + } + } }