Mercurial Hosting > luan
changeset 963:4b6216fa9cec
replace SelectChannelEndPoint._state with isDispatched
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 14 Oct 2016 00:15:28 -0600 |
parents | 94498d6daf5b |
children | 768414c16e10 |
files | src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/server/AsyncHttpConnection.java |
diffstat | 2 files changed, 215 insertions(+), 216 deletions(-) [+] |
line wrap: on
line diff
diff -r 94498d6daf5b -r 4b6216fa9cec src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java --- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Thu Oct 13 22:56:15 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Fri Oct 14 00:15:28 2016 -0600 @@ -60,12 +60,14 @@ * from HTTP to proxy connect or websocket. */ private volatile AsyncConnection _connection; - +/* private static final int STATE_NEEDS_DISPATCH = -1; private static final int STATE_UNDISPATCHED = 0; private static final int STATE_DISPATCHED = 1; private int _state; - +*/ + private boolean isDispatched = false; + /** true if the last write operation succeed and wrote all offered bytes */ private volatile boolean _writable = true; @@ -75,7 +77,7 @@ /** True if a thread has is blocked in {@link #blockWritable(long)} */ private boolean _writeBlocked; - private boolean _ishut; + private boolean _ishut = false; public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException @@ -84,7 +86,6 @@ _manager = selectSet.getManager(); _selectSet = selectSet; - _state = STATE_UNDISPATCHED; _key = key; } @@ -106,12 +107,14 @@ */ public synchronized void schedule() { - // If there is no key, then do nothing if (!_key.isValid()) { +/* _readBlocked = false; _writeBlocked = false; this.notifyAll(); +*/ + _key.cancel(); return; } @@ -120,7 +123,7 @@ { // assert _dispatched; if (_readBlocked && _key.isReadable()) - _readBlocked=false; + _readBlocked = false; if (_writeBlocked && _key.isWritable()) _writeBlocked = false; @@ -129,7 +132,7 @@ // we are not interested in further selecting _key.interestOps(0); - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); return; } @@ -144,7 +147,7 @@ } // If dispatched, then deregister interest - if (_state>=STATE_DISPATCHED) + if (isDispatched) _key.interestOps(0); else { @@ -156,15 +159,15 @@ @Override public synchronized void dispatch() { - if (_state<=STATE_UNDISPATCHED) + if( !isDispatched ) { - _state = STATE_DISPATCHED; + isDispatched = true; try { _manager.execute(_handler); } catch(RejectedExecutionException e) { - _state = STATE_NEEDS_DISPATCH; + isDispatched = false; LOG.warn("Dispatched Failed! "+this+" to "+_manager); - updateKey(); +// updateKey(); } } } @@ -187,7 +190,7 @@ synchronized (this) { _writable = false; - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); } } @@ -209,7 +212,7 @@ synchronized (this) { _writable = false; - if (_state<STATE_DISPATCHED) + if( !isDispatched ) updateKey(); } } @@ -226,42 +229,39 @@ * Allows thread to block waiting for further events. */ @Override - public boolean blockReadable(long timeoutMs) throws IOException + public synchronized boolean blockReadable(long timeoutMs) throws IOException { - synchronized (this) - { - if (isInputShutdown()) - throw new EofException(); + if (isInputShutdown()) + throw new EofException(); - long now = _selectSet.getNow(); - long end = now+timeoutMs; - try + long now = _selectSet.getNow(); + long end = now+timeoutMs; + try + { + _readBlocked = true; + while (!isInputShutdown() && _readBlocked) { - _readBlocked = true; - while (!isInputShutdown() && _readBlocked) + try { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - } - finally - { - now=_selectSet.getNow(); - } + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + } + finally + { + now = _selectSet.getNow(); + } - if (_readBlocked && timeoutMs>0 && now>=end) - return false; - } + if (_readBlocked && timeoutMs>0 && now>=end) + return false; } - finally - { - _readBlocked = false; - } + } + finally + { + _readBlocked = false; } return true; } @@ -271,41 +271,38 @@ * Allows thread to block waiting for further events. */ @Override - public boolean blockWritable(long timeoutMs) throws IOException + public synchronized boolean blockWritable(long timeoutMs) throws IOException { - synchronized (this) - { - if (isOutputShutdown()) - throw new EofException(); + if (isOutputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - try + long now=_selectSet.getNow(); + long end=now+timeoutMs; + try + { + _writeBlocked = true; + while (_writeBlocked && !isOutputShutdown()) { - _writeBlocked = true; - while (_writeBlocked && !isOutputShutdown()) + try { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - } - finally - { - now=_selectSet.getNow(); - } - if (_writeBlocked && timeoutMs>0 && now>=end) - return false; + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); } + finally + { + now = _selectSet.getNow(); + } + if (_writeBlocked && timeoutMs>0 && now>=end) + return false; } - finally - { - _writeBlocked = false; - } + } + finally + { + _writeBlocked = false; } return true; } @@ -326,8 +323,10 @@ { if( getChannel().isOpen() && _key.isValid()) { - boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); - boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); + boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended()); + boolean write_interest = _writeBlocked || (!isDispatched && !_writable); +// boolean write_interest = _writeBlocked || !isDispatched; +// boolean write_interest = true; int interestOps = ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) @@ -400,7 +399,7 @@ } finally { - _state = STATE_UNDISPATCHED; + isDispatched = false; updateKey(); } } @@ -445,11 +444,11 @@ { keyString += "!"; } - return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", + return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", hashCode(), _socket.getRemoteSocketAddress(), _socket.getLocalSocketAddress(), - _state, + isDispatched, isOpen(), isInputShutdown(), isOutputShutdown(),
diff -r 94498d6daf5b -r 4b6216fa9cec src/org/eclipse/jetty/server/AsyncHttpConnection.java --- a/src/org/eclipse/jetty/server/AsyncHttpConnection.java Thu Oct 13 22:56:15 2016 -0600 +++ b/src/org/eclipse/jetty/server/AsyncHttpConnection.java Fri Oct 14 00:15:28 2016 -0600 @@ -37,159 +37,159 @@ */ public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection { - private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100); - private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200); + private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100); + private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200); - private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class); - private int _total_no_progress; - private final AsyncEndPoint _asyncEndp; - private boolean _readInterested = true; + private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class); + private int _total_no_progress; + private final AsyncEndPoint _asyncEndp; + private boolean _readInterested = true; - public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server) - { - super(connector,endpoint,server); - _asyncEndp=(AsyncEndPoint)endpoint; - } + public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server) + { + super(connector,endpoint,server); + _asyncEndp=(AsyncEndPoint)endpoint; + } - @Override - public Connection handle() throws IOException - { - Connection connection = this; - boolean some_progress=false; - boolean progress=true; + @Override + public Connection handle() throws IOException + { + Connection connection = this; + boolean some_progress = false; + boolean progress = true; - try - { - setCurrentConnection(this); + try + { + setCurrentConnection(this); - // While progress and the connection has not changed - while (progress && connection==this) - { - progress=false; - try - { - // Parse more input - if (!_parser.isComplete() && _parser.parseAvailable()) - progress=true; + // While progress and the connection has not changed + while (progress && connection==this) + { + progress=false; + try + { + // Parse more input + if (!_parser.isComplete() && _parser.parseAvailable()) + progress = true; - // Generate more output - if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown()) - if (_generator.flushBuffer()>0) - progress=true; + // Generate more output + if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown()) + if (_generator.flushBuffer()>0) + progress = true; - // Flush output - _endp.flush(); + // Flush output + _endp.flush(); - // Has any IO been done by the endpoint itself since last loop - if (_asyncEndp.hasProgressed()) - progress=true; - } - catch (HttpException e) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("uri="+_uri); - LOG.debug("fields="+_requestFields); - LOG.debug("",e); - } - progress=true; - _generator.sendError(e.getStatus(), e.getReason(), null, true); - } - finally - { - some_progress|=progress; - // Is this request/response round complete and are fully flushed? - boolean parserComplete = _parser.isComplete(); - boolean generatorComplete = _generator.isComplete(); - boolean complete = parserComplete && generatorComplete; - if (parserComplete) - { - if (generatorComplete) - { - // Reset the parser/generator - progress=true; + // Has any IO been done by the endpoint itself since last loop + if (_asyncEndp.hasProgressed()) + progress = true; + } + catch (HttpException e) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("uri="+_uri); + LOG.debug("fields="+_requestFields); + LOG.debug("",e); + } + progress = true; + _generator.sendError(e.getStatus(), e.getReason(), null, true); + } + finally + { + some_progress |= progress; + // Is this request/response round complete and are fully flushed? + boolean parserComplete = _parser.isComplete(); + boolean generatorComplete = _generator.isComplete(); + boolean complete = parserComplete && generatorComplete; + if (parserComplete) + { + if (generatorComplete) + { + // Reset the parser/generator + progress=true; - // look for a switched connection instance? - if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) - { - Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection"); - if (switched!=null) - connection=switched; - } + // look for a switched connection instance? + if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) + { + Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection"); + if (switched!=null) + connection=switched; + } - reset(); + reset(); - // TODO Is this still required? - if (!_generator.isPersistent() && !_endp.isOutputShutdown()) - { - LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA"); - _endp.shutdownOutput(); - } - } - else - { - // We have finished parsing, but not generating so - // we must not be interested in reading until we - // have finished generating and we reset the generator - _readInterested = false; - LOG.debug("Disabled read interest while writing response {}", _endp); - } - } - } - } - } - finally - { - setCurrentConnection(null); + // TODO Is this still required? + if (!_generator.isPersistent() && !_endp.isOutputShutdown()) + { + LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA"); + _endp.shutdownOutput(); + } + } + else + { + // We have finished parsing, but not generating so + // we must not be interested in reading until we + // have finished generating and we reset the generator + _readInterested = false; + LOG.debug("Disabled read interest while writing response {}", _endp); + } + } + } + } + } + finally + { + setCurrentConnection(null); - // return buffers - _parser.returnBuffers(); - _generator.returnBuffers(); + // return buffers + _parser.returnBuffers(); + _generator.returnBuffers(); - // Safety net to catch spinning - if (some_progress) - _total_no_progress=0; - else - { - _total_no_progress++; - if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE)) - LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this); - if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE) - { - LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this); - if (_endp instanceof SelectChannelEndPoint) - ((SelectChannelEndPoint)_endp).getChannel().close(); - } - } - } - return connection; - } + // Safety net to catch spinning + if (some_progress) + _total_no_progress = 0; + else + { + _total_no_progress++; + if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE)) + LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this); + if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE) + { + LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this); + if (_endp instanceof SelectChannelEndPoint) + ((SelectChannelEndPoint)_endp).getChannel().close(); + } + } + } + return connection; + } - public void onInputShutdown() throws IOException - { - // If we don't have a committed response and we are not suspended - if (_generator.isIdle()) - { - // then no more can happen, so close. - _endp.close(); - } + public void onInputShutdown() throws IOException + { + // If we don't have a committed response and we are not suspended + if (_generator.isIdle()) + { + // then no more can happen, so close. + _endp.close(); + } - // Make idle parser seek EOF - if (_parser.isIdle()) - _parser.setPersistent(false); - } + // Make idle parser seek EOF + if (_parser.isIdle()) + _parser.setPersistent(false); + } - @Override - public void reset() - { - _readInterested = true; - LOG.debug("Enabled read interest {}", _endp); - super.reset(); - } + @Override + public void reset() + { + _readInterested = true; + LOG.debug("Enabled read interest {}", _endp); + super.reset(); + } - @Override - public boolean isSuspended() - { - return !_readInterested || super.isSuspended(); - } + @Override + public boolean isSuspended() + { + return !_readInterested || super.isSuspended(); + } }