Mercurial Hosting > luan
changeset 940:b77d631b9e28
remove scheduleTimeout() and cancelTimeout()
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 11 Oct 2016 00:13:30 -0600 |
parents | 8db5996c8c89 |
children | c948f674a2d5 |
files | src/org/eclipse/jetty/io/AsyncEndPoint.java src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/io/nio/SelectorManager.java src/org/eclipse/jetty/io/nio/SslConnection.java |
diffstat | 4 files changed, 741 insertions(+), 793 deletions(-) [+] |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/AsyncEndPoint.java Mon Oct 10 00:50:06 2016 -0600 +++ b/src/org/eclipse/jetty/io/AsyncEndPoint.java Tue Oct 11 00:13:30 2016 -0600 @@ -18,66 +18,55 @@ package org.eclipse.jetty.io; -import org.eclipse.jetty.util.thread.Timeout; public interface AsyncEndPoint extends ConnectedEndPoint { - /* ------------------------------------------------------------ */ - /** - * Dispatch the endpoint if it is not already dispatched - * - */ - public void dispatch(); - - /* ------------------------------------------------------------ */ - /** - * Dispatch the endpoint. If it is already dispatched, schedule a redispatch - * - */ - public void asyncDispatch(); - - /* ------------------------------------------------------------ */ - /** Schedule a write dispatch. - * Set the endpoint to not be writable and schedule a dispatch when - * it becomes writable. - */ - public void scheduleWrite(); - - /* ------------------------------------------------------------ */ - /** Callback when idle. - * <p>An endpoint is idle if there has been no IO activity for - * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true. - * @param idleForMs TODO - */ - public void onIdleExpired(long idleForMs); + /* ------------------------------------------------------------ */ + /** + * Dispatch the endpoint if it is not already dispatched + * + */ + public void dispatch(); + + /* ------------------------------------------------------------ */ + /** + * Dispatch the endpoint. If it is already dispatched, schedule a redispatch + * + */ + public void asyncDispatch(); + + /* ------------------------------------------------------------ */ + /** Schedule a write dispatch. + * Set the endpoint to not be writable and schedule a dispatch when + * it becomes writable. + */ + public void scheduleWrite(); - /* ------------------------------------------------------------ */ - /** Set if the endpoint should be checked for idleness - */ - public void setCheckForIdle(boolean check); + /* ------------------------------------------------------------ */ + /** Callback when idle. + * <p>An endpoint is idle if there has been no IO activity for + * {@link #getMaxIdleTime()} and {@link #isCheckForIdle()} is true. + * @param idleForMs TODO + */ + public void onIdleExpired(long idleForMs); - /* ------------------------------------------------------------ */ - /** Get if the endpoint should be checked for idleness - */ - public boolean isCheckForIdle(); - - - /* ------------------------------------------------------------ */ - public boolean isWritable(); + /* ------------------------------------------------------------ */ + /** Set if the endpoint should be checked for idleness + */ + public void setCheckForIdle(boolean check); - /* ------------------------------------------------------------ */ - /** - * @return True if IO has been successfully performed since the last call to {@link #hasProgressed()} - */ - public boolean hasProgressed(); - - /* ------------------------------------------------------------ */ - /** - */ - public void scheduleTimeout(Timeout.Task task, long timeoutMs); + /* ------------------------------------------------------------ */ + /** Get if the endpoint should be checked for idleness + */ + public boolean isCheckForIdle(); - /* ------------------------------------------------------------ */ - /** - */ - public void cancelTimeout(Timeout.Task task); + + /* ------------------------------------------------------------ */ + public boolean isWritable(); + + /* ------------------------------------------------------------ */ + /** + * @return True if IO has been successfully performed since the last call to {@link #hasProgressed()} + */ + public boolean hasProgressed(); }
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Mon Oct 10 00:50:06 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Tue Oct 11 00:13:30 2016 -0600 @@ -35,7 +35,7 @@ import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.eclipse.jetty.util.thread.Timeout.Task; + /* ------------------------------------------------------------ */ /** @@ -267,18 +267,6 @@ } /* ------------------------------------------------------------ */ - public void cancelTimeout(Task task) - { - getSelectSet().cancelTimeout(task); - } - - /* ------------------------------------------------------------ */ - public void scheduleTimeout(Task task, long timeoutMs) - { - getSelectSet().scheduleTimeout(task,timeoutMs); - } - - /* ------------------------------------------------------------ */ public void setCheckForIdle(boolean check) { if (check)
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Mon Oct 10 00:50:06 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Tue Oct 11 00:13:30 2016 -0600 @@ -732,25 +732,6 @@ return _timeout.getNow(); } - /* ------------------------------------------------------------ */ - /** - * @param task The task to timeout. If it implements Runnable, then - * expired will be called from a dispatched thread. - * - * @param timeoutMs - */ - public void scheduleTimeout(Timeout.Task task, long timeoutMs) - { - if (!(task instanceof Runnable)) - throw new IllegalArgumentException("!Runnable"); - _timeout.schedule(task, timeoutMs); - } - - public void cancelTimeout(Timeout.Task task) - { - task.cancel(); - } - public void wakeup() { try
--- a/src/org/eclipse/jetty/io/nio/SslConnection.java Mon Oct 10 00:50:06 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SslConnection.java Tue Oct 11 00:13:30 2016 -0600 @@ -34,7 +34,7 @@ import org.eclipse.jetty.io.EndPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.eclipse.jetty.util.thread.Timeout.Task; + /* ------------------------------------------------------------ */ /** SSL Connection. @@ -47,819 +47,809 @@ */ public class SslConnection extends AbstractConnection implements AsyncConnection { - private final Logger _logger = LoggerFactory.getLogger("org.eclipse.jetty.io.nio.ssl"); + private final Logger _logger = LoggerFactory.getLogger("org.eclipse.jetty.io.nio.ssl"); - private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0); + private static final NIOBuffer __ZERO_BUFFER=new IndirectNIOBuffer(0); - private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>(); - private final SSLEngine _engine; - private final SSLSession _session; - private AsyncConnection _connection; - private final SslEndPoint _sslEndPoint; - private int _allocations; - private SslBuffers _buffers; - private NIOBuffer _inbound; - private NIOBuffer _unwrapBuf; - private NIOBuffer _outbound; - private AsyncEndPoint _aEndp; - private boolean _allowRenegotiate=true; - private boolean _handshook; - private boolean _ishut; - private boolean _oshut; - private final AtomicBoolean _progressed = new AtomicBoolean(); + private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>(); + private final SSLEngine _engine; + private final SSLSession _session; + private AsyncConnection _connection; + private final SslEndPoint _sslEndPoint; + private int _allocations; + private SslBuffers _buffers; + private NIOBuffer _inbound; + private NIOBuffer _unwrapBuf; + private NIOBuffer _outbound; + private AsyncEndPoint _aEndp; + private boolean _allowRenegotiate=true; + private boolean _handshook; + private boolean _ishut; + private boolean _oshut; + private final AtomicBoolean _progressed = new AtomicBoolean(); - /* ------------------------------------------------------------ */ - /* this is a half baked buffer pool - */ - private static class SslBuffers - { - final NIOBuffer _in; - final NIOBuffer _out; - final NIOBuffer _unwrap; + /* ------------------------------------------------------------ */ + /* this is a half baked buffer pool + */ + private static class SslBuffers + { + final NIOBuffer _in; + final NIOBuffer _out; + final NIOBuffer _unwrap; - SslBuffers(int packetSize, int appSize) - { - _in=new IndirectNIOBuffer(packetSize); - _out=new IndirectNIOBuffer(packetSize); - _unwrap=new IndirectNIOBuffer(appSize); - } - } + SslBuffers(int packetSize, int appSize) + { + _in=new IndirectNIOBuffer(packetSize); + _out=new IndirectNIOBuffer(packetSize); + _unwrap=new IndirectNIOBuffer(appSize); + } + } - /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,EndPoint endp) - { - this(engine,endp,System.currentTimeMillis()); - } + /* ------------------------------------------------------------ */ + public SslConnection(SSLEngine engine,EndPoint endp) + { + this(engine,endp,System.currentTimeMillis()); + } - /* ------------------------------------------------------------ */ - public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp) - { - super(endp,timeStamp); - _engine=engine; - _session=_engine.getSession(); - _aEndp=(AsyncEndPoint)endp; - _sslEndPoint = newSslEndPoint(); - } + /* ------------------------------------------------------------ */ + public SslConnection(SSLEngine engine,EndPoint endp, long timeStamp) + { + super(endp,timeStamp); + _engine=engine; + _session=_engine.getSession(); + _aEndp=(AsyncEndPoint)endp; + _sslEndPoint = newSslEndPoint(); + } - /* ------------------------------------------------------------ */ - protected SslEndPoint newSslEndPoint() - { - return new SslEndPoint(); - } + /* ------------------------------------------------------------ */ + protected SslEndPoint newSslEndPoint() + { + return new SslEndPoint(); + } - /* ------------------------------------------------------------ */ - /** - * @return True if SSL re-negotiation is allowed (default false) - */ - public boolean isAllowRenegotiate() - { - return _allowRenegotiate; - } + /* ------------------------------------------------------------ */ + /** + * @return True if SSL re-negotiation is allowed (default false) + */ + public boolean isAllowRenegotiate() + { + return _allowRenegotiate; + } - /* ------------------------------------------------------------ */ - /** - * Set if SSL re-negotiation is allowed. CVE-2009-3555 discovered - * a vulnerability in SSL/TLS with re-negotiation. If your JVM - * does not have CVE-2009-3555 fixed, then re-negotiation should - * not be allowed. CVE-2009-3555 was fixed in Sun java 1.6 with a ban - * of renegotiates in u19 and with RFC5746 in u22. - * - * @param allowRenegotiate - * true if re-negotiation is allowed (default false) - */ - public void setAllowRenegotiate(boolean allowRenegotiate) - { - _allowRenegotiate = allowRenegotiate; - } + /* ------------------------------------------------------------ */ + /** + * Set if SSL re-negotiation is allowed. CVE-2009-3555 discovered + * a vulnerability in SSL/TLS with re-negotiation. If your JVM + * does not have CVE-2009-3555 fixed, then re-negotiation should + * not be allowed. CVE-2009-3555 was fixed in Sun java 1.6 with a ban + * of renegotiates in u19 and with RFC5746 in u22. + * + * @param allowRenegotiate + * true if re-negotiation is allowed (default false) + */ + public void setAllowRenegotiate(boolean allowRenegotiate) + { + _allowRenegotiate = allowRenegotiate; + } - /* ------------------------------------------------------------ */ - private void allocateBuffers() - { - synchronized (this) - { - if (_allocations++==0) - { - if (_buffers==null) - { - _buffers=__buffers.get(); - if (_buffers==null) - _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2); - _inbound=_buffers._in; - _outbound=_buffers._out; - _unwrapBuf=_buffers._unwrap; - __buffers.set(null); - } - } - } - } + /* ------------------------------------------------------------ */ + private void allocateBuffers() + { + synchronized (this) + { + if (_allocations++==0) + { + if (_buffers==null) + { + _buffers=__buffers.get(); + if (_buffers==null) + _buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2); + _inbound=_buffers._in; + _outbound=_buffers._out; + _unwrapBuf=_buffers._unwrap; + __buffers.set(null); + } + } + } + } - /* ------------------------------------------------------------ */ - private void releaseBuffers() - { - synchronized (this) - { - if (--_allocations==0) - { - if (_buffers!=null && - _inbound.length()==0 && - _outbound.length()==0 && - _unwrapBuf.length()==0) - { - _inbound=null; - _outbound=null; - _unwrapBuf=null; - __buffers.set(_buffers); - _buffers=null; - } - } - } - } + /* ------------------------------------------------------------ */ + private void releaseBuffers() + { + synchronized (this) + { + if (--_allocations==0) + { + if (_buffers!=null && + _inbound.length()==0 && + _outbound.length()==0 && + _unwrapBuf.length()==0) + { + _inbound=null; + _outbound=null; + _unwrapBuf=null; + __buffers.set(_buffers); + _buffers=null; + } + } + } + } - /* ------------------------------------------------------------ */ - public Connection handle() throws IOException - { - try - { - allocateBuffers(); + /* ------------------------------------------------------------ */ + public Connection handle() throws IOException + { + try + { + allocateBuffers(); - boolean progress=true; + boolean progress=true; - while (progress) - { - progress=false; + while (progress) + { + progress=false; - // If we are handshook let the delegate connection - if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING) - progress=process(null,null); + // If we are handshook let the delegate connection + if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING) + progress=process(null,null); - // handle the delegate connection - AsyncConnection next = (AsyncConnection)_connection.handle(); - if (next!=_connection && next!=null) - { - _connection=next; - progress=true; - } + // handle the delegate connection + AsyncConnection next = (AsyncConnection)_connection.handle(); + if (next!=_connection && next!=null) + { + _connection=next; + progress=true; + } - _logger.debug("{} handle {} progress={}", _session, this, progress); - } - } - finally - { - releaseBuffers(); + _logger.debug("{} handle {} progress={}", _session, this, progress); + } + } + finally + { + releaseBuffers(); - if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen()) - { - _ishut=true; - try - { - _connection.onInputShutdown(); - } - catch(Throwable x) - { - _logger.warn("onInputShutdown failed", x); - try{_sslEndPoint.close();} - catch(IOException e2){ - _logger.trace("",e2);} - } - } - } + if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen()) + { + _ishut=true; + try + { + _connection.onInputShutdown(); + } + catch(Throwable x) + { + _logger.warn("onInputShutdown failed", x); + try{_sslEndPoint.close();} + catch(IOException e2){ + _logger.trace("",e2);} + } + } + } - return this; - } + return this; + } - /* ------------------------------------------------------------ */ - public boolean isIdle() - { - return false; - } + /* ------------------------------------------------------------ */ + public boolean isIdle() + { + return false; + } - /* ------------------------------------------------------------ */ - public boolean isSuspended() - { - return false; - } + /* ------------------------------------------------------------ */ + public boolean isSuspended() + { + return false; + } - /* ------------------------------------------------------------ */ - public void onClose() - { - Connection connection = _sslEndPoint.getConnection(); - if (connection != null && connection != this) - connection.onClose(); - } + /* ------------------------------------------------------------ */ + public void onClose() + { + Connection connection = _sslEndPoint.getConnection(); + if (connection != null && connection != this) + connection.onClose(); + } - /* ------------------------------------------------------------ */ - @Override - public void onIdleExpired(long idleForMs) - { - try - { - _logger.debug("onIdleExpired {}ms on {}",idleForMs,this); - if (_endp.isOutputShutdown()) - _sslEndPoint.close(); - else - _sslEndPoint.shutdownOutput(); - } - catch (IOException e) - { - _logger.warn("",e); - super.onIdleExpired(idleForMs); - } - } + /* ------------------------------------------------------------ */ + @Override + public void onIdleExpired(long idleForMs) + { + try + { + _logger.debug("onIdleExpired {}ms on {}",idleForMs,this); + if (_endp.isOutputShutdown()) + _sslEndPoint.close(); + else + _sslEndPoint.shutdownOutput(); + } + catch (IOException e) + { + _logger.warn("",e); + super.onIdleExpired(idleForMs); + } + } - /* ------------------------------------------------------------ */ - public void onInputShutdown() throws IOException - { + /* ------------------------------------------------------------ */ + public void onInputShutdown() throws IOException + { - } + } - /* ------------------------------------------------------------ */ - private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException - { - boolean some_progress=false; - try - { - // We need buffers to progress - allocateBuffers(); + /* ------------------------------------------------------------ */ + private synchronized boolean process(Buffer toFill, Buffer toFlush) throws IOException + { + boolean some_progress=false; + try + { + // We need buffers to progress + allocateBuffers(); - // if we don't have a buffer to put received data into - if (toFill==null) - { - // use the unwrapbuffer to hold received data. - _unwrapBuf.compact(); - toFill=_unwrapBuf; - } - // Else if the fill buffer is too small for the SSL session - else if (toFill.capacity()<_session.getApplicationBufferSize()) - { - // fill to the temporary unwrapBuffer - boolean progress=process(null,toFlush); + // if we don't have a buffer to put received data into + if (toFill==null) + { + // use the unwrapbuffer to hold received data. + _unwrapBuf.compact(); + toFill=_unwrapBuf; + } + // Else if the fill buffer is too small for the SSL session + else if (toFill.capacity()<_session.getApplicationBufferSize()) + { + // fill to the temporary unwrapBuffer + boolean progress=process(null,toFlush); - // if we received any data, - if (_unwrapBuf!=null && _unwrapBuf.hasContent()) - { - // transfer from temp buffer to fill buffer - _unwrapBuf.skip(toFill.put(_unwrapBuf)); - return true; - } - else - // return progress from recursive call - return progress; - } - // Else if there is some temporary data - else if (_unwrapBuf!=null && _unwrapBuf.hasContent()) - { - // transfer from temp buffer to fill buffer - _unwrapBuf.skip(toFill.put(_unwrapBuf)); - return true; - } + // if we received any data, + if (_unwrapBuf!=null && _unwrapBuf.hasContent()) + { + // transfer from temp buffer to fill buffer + _unwrapBuf.skip(toFill.put(_unwrapBuf)); + return true; + } + else + // return progress from recursive call + return progress; + } + // Else if there is some temporary data + else if (_unwrapBuf!=null && _unwrapBuf.hasContent()) + { + // transfer from temp buffer to fill buffer + _unwrapBuf.skip(toFill.put(_unwrapBuf)); + return true; + } - // If we are here, we have a buffer ready into which we can put some read data. + // If we are here, we have a buffer ready into which we can put some read data. - // If we have no data to flush, flush the empty buffer - if (toFlush==null) - toFlush=__ZERO_BUFFER; + // If we have no data to flush, flush the empty buffer + if (toFlush==null) + toFlush=__ZERO_BUFFER; - // While we are making progress processing SSL engine - boolean progress=true; - while (progress) - { - progress=false; + // While we are making progress processing SSL engine + boolean progress=true; + while (progress) + { + progress=false; - // Do any real IO - int filled=0,flushed=0; - try - { - // Read any available data - if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0) - progress = true; + // Do any real IO + int filled=0,flushed=0; + try + { + // Read any available data + if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0) + progress = true; - // flush any output data - if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0) - progress = true; - } - catch (IOException e) - { - _endp.close(); - throw e; - } - finally - { - _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length()); - } + // flush any output data + if (_outbound.hasContent() && (flushed=_endp.flush(_outbound))>0) + progress = true; + } + catch (IOException e) + { + _endp.close(); + throw e; + } + finally + { + _logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.length(),flushed,_outbound.length()); + } - // handle the current hand share status - switch(_engine.getHandshakeStatus()) - { - case FINISHED: - throw new IllegalStateException(); + // handle the current hand share status + switch(_engine.getHandshakeStatus()) + { + case FINISHED: + throw new IllegalStateException(); - case NOT_HANDSHAKING: - { - // Try unwrapping some application data - if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill)) - progress=true; + case NOT_HANDSHAKING: + { + // Try unwrapping some application data + if (toFill.space()>0 && _inbound.hasContent() && unwrap(toFill)) + progress=true; - // Try wrapping some application data - if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush)) - progress=true; - } - break; + // Try wrapping some application data + if (toFlush.hasContent() && _outbound.space()>0 && wrap(toFlush)) + progress=true; + } + break; - case NEED_TASK: - { - // A task needs to be run, so run it! - Runnable task; - while ((task=_engine.getDelegatedTask())!=null) - { - progress=true; - task.run(); - } + case NEED_TASK: + { + // A task needs to be run, so run it! + Runnable task; + while ((task=_engine.getDelegatedTask())!=null) + { + progress=true; + task.run(); + } - } - break; + } + break; - case NEED_WRAP: - { - // The SSL needs to send some handshake data to the other side - if (_handshook && !_allowRenegotiate) - _endp.close(); - else if (wrap(toFlush)) - progress=true; - } - break; + case NEED_WRAP: + { + // The SSL needs to send some handshake data to the other side + if (_handshook && !_allowRenegotiate) + _endp.close(); + else if (wrap(toFlush)) + progress=true; + } + break; - case NEED_UNWRAP: - { - // The SSL needs to receive some handshake data from the other side - if (_handshook && !_allowRenegotiate) - _endp.close(); - else if (!_inbound.hasContent()&&filled==-1) - { - // No more input coming - _endp.shutdownInput(); - } - else if (unwrap(toFill)) - progress=true; - } - break; - } + case NEED_UNWRAP: + { + // The SSL needs to receive some handshake data from the other side + if (_handshook && !_allowRenegotiate) + _endp.close(); + else if (!_inbound.hasContent()&&filled==-1) + { + // No more input coming + _endp.shutdownInput(); + } + else if (unwrap(toFill)) + progress=true; + } + break; + } - // pass on ishut/oshut state - if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasContent()) - closeInbound(); + // pass on ishut/oshut state + if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasContent()) + closeInbound(); - if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasContent()) - _endp.shutdownOutput(); + if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasContent()) + _endp.shutdownOutput(); - // remember if any progress has been made - some_progress|=progress; - } + // remember if any progress has been made + some_progress|=progress; + } - // If we are reading into the temp buffer and it has some content, then we should be dispatched. - if (toFill==_unwrapBuf && _unwrapBuf.hasContent() && !_connection.isSuspended()) - _aEndp.dispatch(); - } - finally - { - releaseBuffers(); - if (some_progress) - _progressed.set(true); - } - return some_progress; - } + // If we are reading into the temp buffer and it has some content, then we should be dispatched. + if (toFill==_unwrapBuf && _unwrapBuf.hasContent() && !_connection.isSuspended()) + _aEndp.dispatch(); + } + finally + { + releaseBuffers(); + if (some_progress) + _progressed.set(true); + } + return some_progress; + } - private void closeInbound() - { - try - { - _engine.closeInbound(); - } - catch (SSLException x) - { - _logger.debug("",x); - } - } + private void closeInbound() + { + try + { + _engine.closeInbound(); + } + catch (SSLException x) + { + _logger.debug("",x); + } + } - private synchronized boolean wrap(final Buffer buffer) throws IOException - { - ByteBuffer bbuf=extractByteBuffer(buffer); - final SSLEngineResult result; + private synchronized boolean wrap(final Buffer buffer) throws IOException + { + ByteBuffer bbuf=extractByteBuffer(buffer); + final SSLEngineResult result; - synchronized(bbuf) - { - _outbound.compact(); - ByteBuffer out_buffer=_outbound.getByteBuffer(); - synchronized(out_buffer) - { - try - { - bbuf.position(buffer.getIndex()); - bbuf.limit(buffer.putIndex()); - out_buffer.position(_outbound.putIndex()); - out_buffer.limit(out_buffer.capacity()); - result=_engine.wrap(bbuf,out_buffer); - if (_logger.isDebugEnabled()) - _logger.debug("{} wrap {} {} consumed={} produced={}", - _session, - result.getStatus(), - result.getHandshakeStatus(), - result.bytesConsumed(), - result.bytesProduced()); + synchronized(bbuf) + { + _outbound.compact(); + ByteBuffer out_buffer=_outbound.getByteBuffer(); + synchronized(out_buffer) + { + try + { + bbuf.position(buffer.getIndex()); + bbuf.limit(buffer.putIndex()); + out_buffer.position(_outbound.putIndex()); + out_buffer.limit(out_buffer.capacity()); + result=_engine.wrap(bbuf,out_buffer); + if (_logger.isDebugEnabled()) + _logger.debug("{} wrap {} {} consumed={} produced={}", + _session, + result.getStatus(), + result.getHandshakeStatus(), + result.bytesConsumed(), + result.bytesProduced()); - buffer.skip(result.bytesConsumed()); - _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced()); - } - catch(SSLException e) - { - _logger.debug(String.valueOf(_endp), e); - _endp.close(); - throw e; - } - finally - { - out_buffer.position(0); - out_buffer.limit(out_buffer.capacity()); - bbuf.position(0); - bbuf.limit(bbuf.capacity()); - } - } - } + buffer.skip(result.bytesConsumed()); + _outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced()); + } + catch(SSLException e) + { + _logger.debug(String.valueOf(_endp), e); + _endp.close(); + throw e; + } + finally + { + out_buffer.position(0); + out_buffer.limit(out_buffer.capacity()); + bbuf.position(0); + bbuf.limit(bbuf.capacity()); + } + } + } - switch(result.getStatus()) - { - case BUFFER_UNDERFLOW: - throw new IllegalStateException(); + switch(result.getStatus()) + { + case BUFFER_UNDERFLOW: + throw new IllegalStateException(); - case BUFFER_OVERFLOW: - break; + case BUFFER_OVERFLOW: + break; - case OK: - if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) - _handshook=true; - break; + case OK: + if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) + _handshook=true; + break; - case CLOSED: - _logger.debug("wrap CLOSE {} {}",this,result); - if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) - _endp.close(); - break; + case CLOSED: + _logger.debug("wrap CLOSE {} {}",this,result); + if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) + _endp.close(); + break; - default: - _logger.debug("{} wrap default {}",_session,result); - throw new IOException(result.toString()); - } + default: + _logger.debug("{} wrap default {}",_session,result); + throw new IOException(result.toString()); + } - return result.bytesConsumed()>0 || result.bytesProduced()>0; - } + return result.bytesConsumed()>0 || result.bytesProduced()>0; + } - private synchronized boolean unwrap(final Buffer buffer) throws IOException - { - if (!_inbound.hasContent()) - return false; + private synchronized boolean unwrap(final Buffer buffer) throws IOException + { + if (!_inbound.hasContent()) + return false; - ByteBuffer bbuf=extractByteBuffer(buffer); - final SSLEngineResult result; + ByteBuffer bbuf=extractByteBuffer(buffer); + final SSLEngineResult result; - synchronized(bbuf) - { - ByteBuffer in_buffer=_inbound.getByteBuffer(); - synchronized(in_buffer) - { - try - { - bbuf.position(buffer.putIndex()); - bbuf.limit(buffer.capacity()); - in_buffer.position(_inbound.getIndex()); - in_buffer.limit(_inbound.putIndex()); + synchronized(bbuf) + { + ByteBuffer in_buffer=_inbound.getByteBuffer(); + synchronized(in_buffer) + { + try + { + bbuf.position(buffer.putIndex()); + bbuf.limit(buffer.capacity()); + in_buffer.position(_inbound.getIndex()); + in_buffer.limit(_inbound.putIndex()); - result=_engine.unwrap(in_buffer,bbuf); - if (_logger.isDebugEnabled()) - _logger.debug("{} unwrap {} {} consumed={} produced={}", - _session, - result.getStatus(), - result.getHandshakeStatus(), - result.bytesConsumed(), - result.bytesProduced()); + result=_engine.unwrap(in_buffer,bbuf); + if (_logger.isDebugEnabled()) + _logger.debug("{} unwrap {} {} consumed={} produced={}", + _session, + result.getStatus(), + result.getHandshakeStatus(), + result.bytesConsumed(), + result.bytesProduced()); - _inbound.skip(result.bytesConsumed()); - _inbound.compact(); - buffer.setPutIndex(buffer.putIndex()+result.bytesProduced()); - } - catch(SSLException e) - { - _logger.debug(String.valueOf(_endp), e); - _endp.close(); - throw e; - } - finally - { - in_buffer.position(0); - in_buffer.limit(in_buffer.capacity()); - bbuf.position(0); - bbuf.limit(bbuf.capacity()); - } - } - } + _inbound.skip(result.bytesConsumed()); + _inbound.compact(); + buffer.setPutIndex(buffer.putIndex()+result.bytesProduced()); + } + catch(SSLException e) + { + _logger.debug(String.valueOf(_endp), e); + _endp.close(); + throw e; + } + finally + { + in_buffer.position(0); + in_buffer.limit(in_buffer.capacity()); + bbuf.position(0); + bbuf.limit(bbuf.capacity()); + } + } + } - switch(result.getStatus()) - { - case BUFFER_UNDERFLOW: - if (_endp.isInputShutdown()) - _inbound.clear(); - break; + switch(result.getStatus()) + { + case BUFFER_UNDERFLOW: + if (_endp.isInputShutdown()) + _inbound.clear(); + break; - case BUFFER_OVERFLOW: - if (_logger.isDebugEnabled()) _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString()); - break; + case BUFFER_OVERFLOW: + if (_logger.isDebugEnabled()) _logger.debug("{} unwrap {} {}->{}",_session,result.getStatus(),_inbound.toDetailString(),buffer.toDetailString()); + break; - case OK: - if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) - _handshook=true; - break; + case OK: + if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) + _handshook=true; + break; - case CLOSED: - _logger.debug("unwrap CLOSE {} {}",this,result); - if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) - _endp.close(); - break; + case CLOSED: + _logger.debug("unwrap CLOSE {} {}",this,result); + if (result.getHandshakeStatus()==HandshakeStatus.FINISHED) + _endp.close(); + break; - default: - _logger.debug("{} wrap default {}",_session,result); - throw new IOException(result.toString()); - } + default: + _logger.debug("{} wrap default {}",_session,result); + throw new IOException(result.toString()); + } - //if (LOG.isDebugEnabled() && result.bytesProduced()>0) - // LOG.debug("{} unwrapped '{}'",_session,buffer); + //if (LOG.isDebugEnabled() && result.bytesProduced()>0) + // LOG.debug("{} unwrapped '{}'",_session,buffer); - return result.bytesConsumed()>0 || result.bytesProduced()>0; - } + return result.bytesConsumed()>0 || result.bytesProduced()>0; + } - /* ------------------------------------------------------------ */ - private ByteBuffer extractByteBuffer(Buffer buffer) - { - if (buffer.buffer() instanceof NIOBuffer) - return ((NIOBuffer)buffer.buffer()).getByteBuffer(); - return ByteBuffer.wrap(buffer.array()); - } + /* ------------------------------------------------------------ */ + private ByteBuffer extractByteBuffer(Buffer buffer) + { + if (buffer.buffer() instanceof NIOBuffer) + return ((NIOBuffer)buffer.buffer()).getByteBuffer(); + return ByteBuffer.wrap(buffer.array()); + } - /* ------------------------------------------------------------ */ - public AsyncEndPoint getSslEndPoint() - { - return _sslEndPoint; - } + /* ------------------------------------------------------------ */ + public AsyncEndPoint getSslEndPoint() + { + return _sslEndPoint; + } - /* ------------------------------------------------------------ */ - public String toString() - { - return String.format("%s %s", super.toString(), _sslEndPoint); - } + /* ------------------------------------------------------------ */ + public String toString() + { + return String.format("%s %s", super.toString(), _sslEndPoint); + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - public class SslEndPoint implements AsyncEndPoint - { - public SSLEngine getSslEngine() - { - return _engine; - } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public class SslEndPoint implements AsyncEndPoint + { + public SSLEngine getSslEngine() + { + return _engine; + } - public AsyncEndPoint getEndpoint() - { - return _aEndp; - } + public AsyncEndPoint getEndpoint() + { + return _aEndp; + } - public void shutdownOutput() throws IOException - { - synchronized (SslConnection.this) - { - _logger.debug("{} ssl endp.oshut {}",_session,this); - _engine.closeOutbound(); - _oshut=true; - } - flush(); - } + public void shutdownOutput() throws IOException + { + synchronized (SslConnection.this) + { + _logger.debug("{} ssl endp.oshut {}",_session,this); + _engine.closeOutbound(); + _oshut=true; + } + flush(); + } - public boolean isOutputShutdown() - { - synchronized (SslConnection.this) - { - return _oshut||!isOpen()||_engine.isOutboundDone(); - } - } + public boolean isOutputShutdown() + { + synchronized (SslConnection.this) + { + return _oshut||!isOpen()||_engine.isOutboundDone(); + } + } - public void shutdownInput() throws IOException - { - _logger.debug("{} ssl endp.ishut!",_session); - // We do not do a closeInput here, as SSL does not support half close. - // isInputShutdown works it out itself from buffer state and underlying endpoint state. - } + public void shutdownInput() throws IOException + { + _logger.debug("{} ssl endp.ishut!",_session); + // We do not do a closeInput here, as SSL does not support half close. + // isInputShutdown works it out itself from buffer state and underlying endpoint state. + } - public boolean isInputShutdown() - { - synchronized (SslConnection.this) - { - return _endp.isInputShutdown() && - !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) && - !(_inbound!=null&&_inbound.hasContent()); - } - } - - public void close() throws IOException - { - _logger.debug("{} ssl endp.close",_session); - _endp.close(); - } + public boolean isInputShutdown() + { + synchronized (SslConnection.this) + { + return _endp.isInputShutdown() && + !(_unwrapBuf!=null&&_unwrapBuf.hasContent()) && + !(_inbound!=null&&_inbound.hasContent()); + } + } - public int fill(Buffer buffer) throws IOException - { - int size=buffer.length(); - process(buffer, null); - - int filled=buffer.length()-size; + public void close() throws IOException + { + _logger.debug("{} ssl endp.close",_session); + _endp.close(); + } - if (filled==0 && isInputShutdown()) - return -1; - return filled; - } + public int fill(Buffer buffer) throws IOException + { + int size=buffer.length(); + process(buffer, null); - public int flush(Buffer buffer) throws IOException - { - int size = buffer.length(); - process(null, buffer); - return size-buffer.length(); - } + int filled=buffer.length()-size; + + if (filled==0 && isInputShutdown()) + return -1; + return filled; + } - public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException - { - if (header!=null && header.hasContent()) - return flush(header); - if (buffer!=null && buffer.hasContent()) - return flush(buffer); - if (trailer!=null && trailer.hasContent()) - return flush(trailer); - return 0; - } + public int flush(Buffer buffer) throws IOException + { + int size = buffer.length(); + process(null, buffer); + return size-buffer.length(); + } - public boolean blockReadable(long millisecs) throws IOException - { - long now = System.currentTimeMillis(); - long end=millisecs>0?(now+millisecs):Long.MAX_VALUE; + public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException + { + if (header!=null && header.hasContent()) + return flush(header); + if (buffer!=null && buffer.hasContent()) + return flush(buffer); + if (trailer!=null && trailer.hasContent()) + return flush(trailer); + return 0; + } - while (now<end) - { - if (process(null,null)) - break; - _endp.blockReadable(end-now); - now = System.currentTimeMillis(); - } + public boolean blockReadable(long millisecs) throws IOException + { + long now = System.currentTimeMillis(); + long end=millisecs>0?(now+millisecs):Long.MAX_VALUE; - return now<end; - } + while (now<end) + { + if (process(null,null)) + break; + _endp.blockReadable(end-now); + now = System.currentTimeMillis(); + } - public boolean blockWritable(long millisecs) throws IOException - { - return _endp.blockWritable(millisecs); - } + return now<end; + } - public boolean isOpen() - { - return _endp.isOpen(); - } + public boolean blockWritable(long millisecs) throws IOException + { + return _endp.blockWritable(millisecs); + } - public Object getTransport() - { - return _endp; - } + public boolean isOpen() + { + return _endp.isOpen(); + } - public void flush() throws IOException - { - process(null, null); - } + public Object getTransport() + { + return _endp; + } - public void dispatch() - { - _aEndp.dispatch(); - } + public void flush() throws IOException + { + process(null, null); + } - public void asyncDispatch() - { - _aEndp.asyncDispatch(); - } + public void dispatch() + { + _aEndp.dispatch(); + } - public void scheduleWrite() - { - _aEndp.scheduleWrite(); - } + public void asyncDispatch() + { + _aEndp.asyncDispatch(); + } - public void onIdleExpired(long idleForMs) - { - _aEndp.onIdleExpired(idleForMs); - } + public void scheduleWrite() + { + _aEndp.scheduleWrite(); + } - public void setCheckForIdle(boolean check) - { - _aEndp.setCheckForIdle(check); - } + public void onIdleExpired(long idleForMs) + { + _aEndp.onIdleExpired(idleForMs); + } - public boolean isCheckForIdle() - { - return _aEndp.isCheckForIdle(); - } + public void setCheckForIdle(boolean check) + { + _aEndp.setCheckForIdle(check); + } - public void scheduleTimeout(Task task, long timeoutMs) - { - _aEndp.scheduleTimeout(task,timeoutMs); - } + public boolean isCheckForIdle() + { + return _aEndp.isCheckForIdle(); + } - public void cancelTimeout(Task task) - { - _aEndp.cancelTimeout(task); - } + public boolean isWritable() + { + return _aEndp.isWritable(); + } - public boolean isWritable() - { - return _aEndp.isWritable(); - } + public boolean hasProgressed() + { + return _progressed.getAndSet(false); + } - public boolean hasProgressed() - { - return _progressed.getAndSet(false); - } + public String getLocalAddr() + { + return _aEndp.getLocalAddr(); + } - public String getLocalAddr() - { - return _aEndp.getLocalAddr(); - } + public String getLocalHost() + { + return _aEndp.getLocalHost(); + } - public String getLocalHost() - { - return _aEndp.getLocalHost(); - } + public int getLocalPort() + { + return _aEndp.getLocalPort(); + } - public int getLocalPort() - { - return _aEndp.getLocalPort(); - } + public String getRemoteAddr() + { + return _aEndp.getRemoteAddr(); + } - public String getRemoteAddr() - { - return _aEndp.getRemoteAddr(); - } + public String getRemoteHost() + { + return _aEndp.getRemoteHost(); + } - public String getRemoteHost() - { - return _aEndp.getRemoteHost(); - } - - public int getRemotePort() - { - return _aEndp.getRemotePort(); - } + public int getRemotePort() + { + return _aEndp.getRemotePort(); + } - public boolean isBlocking() - { - return false; - } + public boolean isBlocking() + { + return false; + } - public int getMaxIdleTime() - { - return _aEndp.getMaxIdleTime(); - } + public int getMaxIdleTime() + { + return _aEndp.getMaxIdleTime(); + } - public void setMaxIdleTime(int timeMs) throws IOException - { - _aEndp.setMaxIdleTime(timeMs); - } + public void setMaxIdleTime(int timeMs) throws IOException + { + _aEndp.setMaxIdleTime(timeMs); + } - public Connection getConnection() - { - return _connection; - } + public Connection getConnection() + { + return _connection; + } - public void setConnection(Connection connection) - { - _connection=(AsyncConnection)connection; - } + public void setConnection(Connection connection) + { + _connection=(AsyncConnection)connection; + } - public String toString() - { - // Do NOT use synchronized (SslConnection.this) - // because it's very easy to deadlock when debugging is enabled. - // We do a best effort to print the right toString() and that's it. - Buffer inbound = _inbound; - Buffer outbound = _outbound; - Buffer unwrap = _unwrapBuf; - int i = inbound == null? -1 : inbound.length(); - int o = outbound == null ? -1 : outbound.length(); - int u = unwrap == null ? -1 : unwrap.length(); - return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", - _engine.getHandshakeStatus(), - i, o, u, - _ishut, _oshut, - _connection); - } + public String toString() + { + // Do NOT use synchronized (SslConnection.this) + // because it's very easy to deadlock when debugging is enabled. + // We do a best effort to print the right toString() and that's it. + Buffer inbound = _inbound; + Buffer outbound = _outbound; + Buffer unwrap = _unwrapBuf; + int i = inbound == null? -1 : inbound.length(); + int o = outbound == null ? -1 : outbound.length(); + int u = unwrap == null ? -1 : unwrap.length(); + return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}", + _engine.getHandshakeStatus(), + i, o, u, + _ishut, _oshut, + _connection); + } - } + } }