Mercurial Hosting > luan
changeset 953:7db4a488fc82
simplify SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 12 Oct 2016 22:16:36 -0600 |
parents | 669769bcdf5c |
children | a021c4c9c244 |
files | src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/io/nio/SelectorManager.java |
diffstat | 2 files changed, 77 insertions(+), 154 deletions(-) [+] |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Wed Oct 12 19:47:45 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Wed Oct 12 22:16:36 2016 -0600 @@ -64,9 +64,9 @@ */ private volatile AsyncConnection _connection; - private static final int STATE_NEEDS_DISPATCH=-1; - private static final int STATE_UNDISPATCHED=0; - private static final int STATE_DISPATCHED=1; + private static final int STATE_NEEDS_DISPATCH = -1; + private static final int STATE_UNDISPATCHED = 0; + private static final int STATE_DISPATCHED = 1; private int _state; private boolean _onIdle; @@ -126,62 +126,59 @@ public void setConnection(Connection connection) { - _connection=(AsyncConnection)connection; + _connection = (AsyncConnection)connection; } /* ------------------------------------------------------------ */ /** Called by selectSet to schedule handling * */ - public void schedule() + public synchronized void schedule() { - synchronized (this) + // If there is no key, then do nothing + if (_key == null || !_key.isValid()) { - // If there is no key, then do nothing - if (_key == null || !_key.isValid()) - { + _readBlocked=false; + _writeBlocked=false; + this.notifyAll(); + return; + } + + // If there are threads dispatched reading and writing + if (_readBlocked || _writeBlocked) + { + // assert _dispatched; + if (_readBlocked && _key.isReadable()) _readBlocked=false; + if (_writeBlocked && _key.isWritable()) _writeBlocked=false; - this.notifyAll(); - return; - } - // If there are threads dispatched reading and writing - if (_readBlocked || _writeBlocked) - { - // assert _dispatched; - if (_readBlocked && _key.isReadable()) - _readBlocked=false; - if (_writeBlocked && _key.isWritable()) - _writeBlocked=false; - - // wake them up is as good as a dispatched. - this.notifyAll(); + // wake them up is as good as a dispatched. + this.notifyAll(); - // we are not interested in further selecting - _key.interestOps(0); - if (_state<STATE_DISPATCHED) - updateKey(); - return; - } + // we are not interested in further selecting + _key.interestOps(0); + if (_state<STATE_DISPATCHED) + updateKey(); + return; + } + // Remove writeable op + if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) + { // Remove writeable op - if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) - { - // Remove writeable op - _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; - _key.interestOps(_interestOps); - _writable = true; // Once writable is in ops, only removed with dispatch. - } + _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; + _key.interestOps(_interestOps); + _writable = true; // Once writable is in ops, only removed with dispatch. + } - // If dispatched, then deregister interest - if (_state>=STATE_DISPATCHED) - _key.interestOps(0); - else - { - // other wise do the dispatch - dispatch(); - } + // If dispatched, then deregister interest + if (_state>=STATE_DISPATCHED) + _key.interestOps(0); + else + { + // other wise do the dispatch + dispatch(); } } @@ -216,7 +213,6 @@ { _state = STATE_UNDISPATCHED; updateKey(); -// return true; } public void setCheckForIdle(boolean check) @@ -441,7 +437,6 @@ return true; } - /* ------------------------------------------------------------ */ public boolean hasProgressed() { return false; @@ -473,7 +468,7 @@ } catch(Exception e) { - _key=null; + _key = null; LOG.trace("",e); } } @@ -509,7 +504,7 @@ { try { - _key = _selectSet.getSelector().register((SelectableChannel)getChannel(),_interestOps,this); + _key = _selectSet.getSelector().register(sc,_interestOps,this); } catch (Exception e) { @@ -538,7 +533,7 @@ if (_key!=null && _key.isValid()) _key.interestOps(0); else - _key=null; + _key = null; } } else
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Wed Oct 12 19:47:45 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Wed Oct 12 22:16:36 2016 -0600 @@ -61,7 +61,7 @@ private int _maxIdleTime; private long _lowResourcesConnections; private SelectSet[] _selectSet; - private int _selectSets=1; + private int _selectSets = 1; private volatile int _set=0; /* ------------------------------------------------------------ */ @@ -242,9 +242,14 @@ private void addChange(SocketChannel channel) { try { - SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null); +//System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq a"); +// SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null); + SelectionKey key = _selector.register(channel,0,null); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); + key.interestOps(SelectionKey.OP_READ); + _selector.update(); +//System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b"); endpoint.schedule(); } catch(IOException e) { LOG.warn("",e); @@ -261,7 +266,7 @@ * * @throws IOException */ - public void doSelect() throws IOException + private void doSelect() throws IOException { try { @@ -276,8 +281,6 @@ // Look for things to do for (SelectionKey key: selector.selectedKeys()) { - SocketChannel channel=null; - try { if (!key.isValid()) @@ -289,50 +292,9 @@ continue; } - Object att = key.attachment(); - if (att instanceof SelectChannelEndPoint) - { - if (key.isReadable()||key.isWritable()) - ((SelectChannelEndPoint)att).schedule(); - } - else if (key.isConnectable()) - { - // Complete a connection of a registered channel - channel = (SocketChannel)key.channel(); - boolean connected=false; - try - { - connected=channel.finishConnect(); - } - catch(Exception e) - { - LOG.warn(e+","+channel+","+att); - LOG.debug("",e); - } - finally - { - if (connected) - { - key.interestOps(SelectionKey.OP_READ); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else - { - key.cancel(); - channel.close(); - } - } - } - else - { - // Wrap readable registered channel in an endpoint - channel = (SocketChannel)key.channel(); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - if (key.isReadable()) - endpoint.schedule(); + if (key.isReadable()||key.isWritable()) { + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); + endpoint.schedule(); } key = null; } @@ -346,19 +308,6 @@ LOG.warn("",e); else LOG.trace("",e); - - try - { - if (channel!=null) - channel.close(); - } - catch(IOException e2) - { - LOG.debug("",e2); - } - - if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) - key.cancel(); } } @@ -415,56 +364,35 @@ return _selector; } - void stop() throws Exception + synchronized void stop() throws Exception { - // Spin for a while waiting for selector to complete - // to avoid unneccessary closed channel exceptions -/* - try + // close endpoints and selector + for (SelectionKey key : _selector.keys()) { - for (int i=0;i<100 && _selecting!=null;i++) + Object att=key.attachment(); + if (att instanceof EndPoint) { - _selector.wakeup(); - Thread.sleep(10); + EndPoint endpoint = (EndPoint)att; + try + { + endpoint.close(); + } + catch(IOException e) + { + LOG.trace("",e); + } } } - catch(Exception e) - { - LOG.warn("",e); - } -*/ - // close endpoints and selector - synchronized (this) + + try { - for (SelectionKey key : _selector.keys()) - { - if (key==null) - continue; - Object att=key.attachment(); - if (att instanceof EndPoint) - { - EndPoint endpoint = (EndPoint)att; - try - { - endpoint.close(); - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } - - try - { - _selector.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - _selector = null; + _selector.close(); } + catch (IOException e) + { + LOG.trace("",e); + } + _selector = null; } public String dump()