Mercurial Hosting > luan
diff src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 963:4b6216fa9cec
replace SelectChannelEndPoint._state with isDispatched
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 14 Oct 2016 00:15:28 -0600 |
parents | 94498d6daf5b |
children | 768414c16e10 |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Thu Oct 13 22:56:15 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Fri Oct 14 00:15:28 2016 -0600 @@ -60,12 +60,14 @@ * from HTTP to proxy connect or websocket. */ private volatile AsyncConnection _connection; - +/* private static final int STATE_NEEDS_DISPATCH = -1; private static final int STATE_UNDISPATCHED = 0; private static final int STATE_DISPATCHED = 1; private int _state; - +*/ + private boolean isDispatched = false; + /** true if the last write operation succeed and wrote all offered bytes */ private volatile boolean _writable = true; @@ -75,7 +77,7 @@ /** True if a thread has is blocked in {@link #blockWritable(long)} */ private boolean _writeBlocked; - private boolean _ishut; + private boolean _ishut = false; public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException @@ -84,7 +86,6 @@ _manager = selectSet.getManager(); _selectSet = selectSet; - _state = STATE_UNDISPATCHED; _key = key; } @@ -106,12 +107,14 @@ */ public synchronized void schedule() { - // If there is no key, then do nothing if (!_key.isValid()) { +/* _readBlocked = false; _writeBlocked = false; this.notifyAll(); +*/ + _key.cancel(); return; } @@ -120,7 +123,7 @@ { // assert _dispatched; if (_readBlocked && _key.isReadable()) - _readBlocked=false; + _readBlocked = false; if (_writeBlocked && _key.isWritable()) _writeBlocked = false; @@ -129,7 +132,7 @@ // we are not interested in further selecting _key.interestOps(0); - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); return; } @@ -144,7 +147,7 @@ } // If dispatched, then deregister interest - if (_state>=STATE_DISPATCHED) + if (isDispatched) _key.interestOps(0); else { @@ -156,15 +159,15 @@ @Override public synchronized void dispatch() { - if (_state<=STATE_UNDISPATCHED) + if( !isDispatched ) { - _state = STATE_DISPATCHED; + isDispatched = true; try { _manager.execute(_handler); } catch(RejectedExecutionException e) { - _state = STATE_NEEDS_DISPATCH; + isDispatched = false; LOG.warn("Dispatched Failed! "+this+" to "+_manager); - updateKey(); +// updateKey(); } } } @@ -187,7 +190,7 @@ synchronized (this) { _writable = false; - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); } } @@ -209,7 +212,7 @@ synchronized (this) { _writable = false; - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); } } @@ -226,42 +229,39 @@ * Allows thread to block waiting for further events. */ @Override - public boolean blockReadable(long timeoutMs) throws IOException + public synchronized boolean blockReadable(long timeoutMs) throws IOException { - synchronized (this) - { - if (isInputShutdown()) - throw new EofException(); + if (isInputShutdown()) + throw new EofException(); - long now = _selectSet.getNow(); - long end = now+timeoutMs; - try + long now = _selectSet.getNow(); + long end = now+timeoutMs; + try + { + _readBlocked = true; + while (!isInputShutdown() && _readBlocked) { - _readBlocked = true; - while (!isInputShutdown() && _readBlocked) + try { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - } - finally - { - now=_selectSet.getNow(); - } + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + } + finally + { + now = _selectSet.getNow(); + } - if (_readBlocked && timeoutMs>0 && now>=end) - return false; - } + if (_readBlocked && timeoutMs>0 && now>=end) + return false; } - finally - { - _readBlocked = false; - } + } + finally + { + _readBlocked = false; } return true; } @@ -271,41 +271,38 @@ * Allows thread to block waiting for further events. */ @Override - public boolean blockWritable(long timeoutMs) throws IOException + public synchronized boolean blockWritable(long timeoutMs) throws IOException { - synchronized (this) - { - if (isOutputShutdown()) - throw new EofException(); + if (isOutputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - try + long now=_selectSet.getNow(); + long end=now+timeoutMs; + try + { + _writeBlocked = true; + while (_writeBlocked && !isOutputShutdown()) { - _writeBlocked = true; - while (_writeBlocked && !isOutputShutdown()) + try { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - } - finally - { - now=_selectSet.getNow(); - } - if (_writeBlocked && timeoutMs>0 && now>=end) - return false; + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); } + finally + { + now = _selectSet.getNow(); + } + if (_writeBlocked && timeoutMs>0 && now>=end) + return false; } - finally - { - _writeBlocked = false; - } + } + finally + { + _writeBlocked = false; } return true; } @@ -326,8 +323,10 @@ { if( getChannel().isOpen() && _key.isValid()) { - boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); - boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); + boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended()); + boolean write_interest = _writeBlocked || (!isDispatched && !_writable); +// boolean write_interest = _writeBlocked || !isDispatched; +// boolean write_interest = true; int interestOps = ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) @@ -400,7 +399,7 @@ } finally { - _state = STATE_UNDISPATCHED; + isDispatched = false; updateKey(); } } @@ -445,11 +444,11 @@ { keyString += "!"; } - return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", + return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", hashCode(), _socket.getRemoteSocketAddress(), _socket.getLocalSocketAddress(), - _state, + isDispatched, isOpen(), isInputShutdown(), isOutputShutdown(),