Mercurial Hosting > luan
changeset 954:a021c4c9c244
use just one SelectSet per SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 13 Oct 2016 00:54:10 -0600 |
parents | 7db4a488fc82 |
children | 6f49b8dfffe6 |
files | src/org/eclipse/jetty/io/nio/SelectorManager.java src/org/eclipse/jetty/server/Connector.java src/org/eclipse/jetty/server/nio/SelectChannelConnector.java |
diffstat | 3 files changed, 85 insertions(+), 174 deletions(-) [+] |
line wrap: on
line diff
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/io/nio/SelectorManager.java --- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Wed Oct 12 22:16:36 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Thu Oct 13 00:54:10 2016 -0600 @@ -60,9 +60,7 @@ private int _maxIdleTime; private long _lowResourcesConnections; - private SelectSet[] _selectSet; - private int _selectSets = 1; - private volatile int _set=0; + private SelectSet _selectSet; /* ------------------------------------------------------------ */ /** @@ -75,34 +73,14 @@ } /* ------------------------------------------------------------ */ - /** - * @param selectSets number of select sets to create - */ - public void setSelectSets(int selectSets) - { - long lrc = _lowResourcesConnections * _selectSets; - _selectSets=selectSets; - _lowResourcesConnections=lrc/_selectSets; - } - - /* ------------------------------------------------------------ */ /** Register a channel * @param channel */ public void register(SocketChannel channel) { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. - - int s = _set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets = _selectSet; - if (sets!=null) + SelectSet set = _selectSet; + if (set!=null) { - SelectSet set=sets[s]; set.addChange(channel); } } @@ -113,7 +91,7 @@ */ public long getLowResourcesConnections() { - return _lowResourcesConnections*_selectSets; + return _lowResourcesConnections; } /* ------------------------------------------------------------ */ @@ -125,7 +103,7 @@ */ public void setLowResourcesConnections(long lowResourcesConnections) { - _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; + _lowResourcesConnections = lowResourcesConnections; } @@ -138,55 +116,48 @@ @Override protected void doStart() throws Exception { - _selectSet = new SelectSet[_selectSets]; - for (int i=0;i<_selectSet.length;i++) - _selectSet[i]= new SelectSet(i); + _selectSet = new SelectSet(); super.doStart(); // start a thread to Select - for (int i=0;i<_selectSets;i++) + execute(new Runnable() { - final int id=i; - execute(new Runnable() + public void run() { - public void run() + String name=Thread.currentThread().getName(); + try { - String name=Thread.currentThread().getName(); - try - { - SelectSet[] sets=_selectSet; - if (sets==null) - return; - SelectSet set=sets[id]; + SelectSet set = _selectSet; + if (set==null) + return; - Thread.currentThread().setName(name+" Selector"+id); - LOG.debug("Starting {} on {}",Thread.currentThread(),this); - while (isRunning()) + Thread.currentThread().setName(name+" Selector"); + LOG.debug("Starting {} on {}",Thread.currentThread(),this); + while (isRunning()) + { + try { - try - { - set.doSelect(); - } - catch(IOException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } + set.doSelect(); + } + catch(IOException e) + { + LOG.trace("",e); + } + catch(Exception e) + { + LOG.warn("",e); } } - finally - { - LOG.debug("Stopped {} on {}",Thread.currentThread(),this); - Thread.currentThread().setName(name); - } } + finally + { + LOG.debug("Stopped {} on {}",Thread.currentThread(),this); + Thread.currentThread().setName(name); + } + } - }); - } + }); } @@ -194,15 +165,11 @@ @Override protected void doStop() throws Exception { - SelectSet[] sets= _selectSet; - _selectSet=null; - if (sets!=null) + SelectSet set = _selectSet; + _selectSet = null; + if (set!=null) { - for (SelectSet set : sets) - { - if (set!=null) - set.stop(); - } + set.stop(); } super.doStop(); } @@ -217,25 +184,20 @@ public void dump(Appendable out, String indent) throws IOException { AggregateLifeCycle.dumpObject(out,this); - AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); + AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet)); } - public class SelectSet implements Dumpable + public final class SelectSet implements Dumpable { - private final int _setID; private volatile long _now = System.currentTimeMillis(); - private volatile SaneSelector _selector; + private final SaneSelector _selector; - private volatile Thread _selecting; private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); - SelectSet(int acceptorID) throws Exception + SelectSet() throws IOException { - _setID=acceptorID; - - // create a selector; _selector = new SaneSelector(); } @@ -260,26 +222,15 @@ } } } - /* ------------------------------------------------------------ */ - /** - * Select and dispatch tasks found from changes and the selector. - * - * @throws IOException - */ + private void doSelect() throws IOException { try { - _selecting=Thread.currentThread(); - final SaneSelector selector = _selector; - // Stopped concurrently ? - if (selector == null) - return; - - selector.select(); + _selector.select(); // Look for things to do - for (SelectionKey key: selector.selectedKeys()) + for (SelectionKey key: _selector.selectedKeys()) { try { @@ -296,7 +247,6 @@ SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); endpoint.schedule(); } - key = null; } catch (CancelledKeyException e) { @@ -312,7 +262,7 @@ } // Everything always handled - selector.selectedKeys().clear(); + _selector.selectedKeys().clear(); _now = System.currentTimeMillis(); } @@ -327,10 +277,6 @@ { LOG.trace("",e); } - finally - { - _selecting=null; - } } public SelectorManager getManager() @@ -369,18 +315,14 @@ // close endpoints and selector for (SelectionKey key : _selector.keys()) { - Object att=key.attachment(); - if (att instanceof EndPoint) + EndPoint endpoint = (EndPoint)key.attachment(); + try { - EndPoint endpoint = (EndPoint)att; - try - { - endpoint.close(); - } - catch(IOException e) - { - LOG.trace("",e); - } + endpoint.close(); + } + catch(IOException e) + { + LOG.trace("",e); } } @@ -392,7 +334,6 @@ { LOG.trace("",e); } - _selector = null; } public String dump() @@ -402,7 +343,7 @@ public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); + out.append(String.valueOf(this)).append("\n"); AggregateLifeCycle.dump(out,indent,Collections.emptyList()); }
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/server/Connector.java --- a/src/org/eclipse/jetty/server/Connector.java Wed Oct 12 22:16:36 2016 -0600 +++ b/src/org/eclipse/jetty/server/Connector.java Thu Oct 13 00:54:10 2016 -0600 @@ -54,7 +54,7 @@ * <li>Optional reverse proxy headers checking</li> * </ul> */ -public abstract class Connector extends AggregateLifeCycle implements HttpBuffers, Dumpable +public abstract class Connector extends AggregateLifeCycle implements HttpBuffers, Dumpable, Runnable { private static final Logger LOG = LoggerFactory.getLogger(Connector.class); @@ -63,7 +63,6 @@ public final Server server; private String _host; public final int port; - private int _acceptors = 1; protected final int _maxIdleTime = 200000; protected int _soLingerTime = -1; @@ -137,27 +136,6 @@ /* ------------------------------------------------------------ */ /** - * @return Returns the number of acceptor threads. - */ - public int getAcceptors() - { - return _acceptors; - } - - /* ------------------------------------------------------------ */ - /** - * @param acceptors - * The number of acceptor threads to set. - */ - public void setAcceptors(int acceptors) - { - if (acceptors > 2 * Runtime.getRuntime().availableProcessors()) - LOG.warn("Acceptors should be <=2*availableProcessors: " + this); - _acceptors = acceptors; - } - - /* ------------------------------------------------------------ */ - /** * @param soLingerTime * The soLingerTime to set or -1 to disable. */ @@ -173,8 +151,7 @@ // Start selector thread ThreadPoolExecutor _threadPool = server.threadPool; - for (int i = 0; i < _acceptors; i++) - _threadPool.execute(new Acceptor()); + _threadPool.execute(this); if (server.isLowOnThreads()) LOG.warn("insufficient threads configured for {}",this); @@ -322,46 +299,42 @@ } - private class Acceptor implements Runnable + public void run() { + Thread current = Thread.currentThread(); + String name = current.getName(); + current.setName(name + " Acceptor" + " " + Connector.this); - public void run() + try { - Thread current = Thread.currentThread(); - String name = current.getName(); - current.setName(name + " Acceptor" + " " + Connector.this); - - try + while (isRunning() && _acceptChannel != null) { - while (isRunning() && _acceptChannel != null) + try { - try - { - accept(); - } - catch (EofException e) - { - LOG.trace("",e); - } - catch (IOException e) - { - LOG.trace("",e); - } - catch (InterruptedException x) - { - // Connector has been stopped - LOG.trace("",x); - } - catch (Throwable e) - { - LOG.warn("",e); - } + accept(); + } + catch (EofException e) + { + LOG.trace("",e); + } + catch (IOException e) + { + LOG.trace("",e); + } + catch (InterruptedException x) + { + // Connector has been stopped + LOG.trace("",x); + } + catch (Throwable e) + { + LOG.warn("",e); } } - finally - { - current.setName(name); - } + } + finally + { + current.setName(name); } }
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/server/nio/SelectChannelConnector.java --- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Wed Oct 12 22:16:36 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Thu Oct 13 00:54:10 2016 -0600 @@ -32,7 +32,6 @@ import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectorManager; -import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.server.AsyncHttpConnection; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Request; @@ -62,7 +61,6 @@ super(server,port); _manager.setMaxIdleTime(getMaxIdleTime()); addBean(_manager,true); - setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); } @Override @@ -101,7 +99,6 @@ @Override protected synchronized void doStart() throws Exception { - _manager.setSelectSets(getAcceptors()); // _manager.setMaxIdleTime(getMaxIdleTime()); _manager.setLowResourcesConnections(0);