Mercurial Hosting > luan
diff src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 865:6b210bb66c63
remove ThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 20:38:06 -0600 |
parents | 8e9db0bbf4f9 |
children | 54308d65265a |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Sun Oct 02 20:38:06 2016 -0600 @@ -25,6 +25,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Locale; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; @@ -42,827 +43,827 @@ */ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint { - public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); - private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); - private final SelectorManager.SelectSet _selectSet; - private final SelectorManager _manager; - private SelectionKey _key; - private final Runnable _handler = new Runnable() - { - public void run() { handle(); } - }; + private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); + private final SelectorManager.SelectSet _selectSet; + private final SelectorManager _manager; + private SelectionKey _key; + private final Runnable _handler = new Runnable() + { + public void run() { handle(); } + }; - /** The desired value for {@link SelectionKey#interestOps()} */ - private int _interestOps; + /** The desired value for {@link SelectionKey#interestOps()} */ + private int _interestOps; - /** - * The connection instance is the handler for any IO activity on the endpoint. - * There is a different type of connection for HTTP, AJP, WebSocket and - * ProxyConnect. The connection may change for an SCEP as it is upgraded - * from HTTP to proxy connect or websocket. - */ - private volatile AsyncConnection _connection; + /** + * The connection instance is the handler for any IO activity on the endpoint. + * There is a different type of connection for HTTP, AJP, WebSocket and + * ProxyConnect. The connection may change for an SCEP as it is upgraded + * from HTTP to proxy connect or websocket. + */ + private volatile AsyncConnection _connection; - private static final int STATE_NEEDS_DISPATCH=-1; - private static final int STATE_UNDISPATCHED=0; - private static final int STATE_DISPATCHED=1; - private static final int STATE_ASYNC=2; - private int _state; - - private boolean _onIdle; + private static final int STATE_NEEDS_DISPATCH=-1; + private static final int STATE_UNDISPATCHED=0; + private static final int STATE_DISPATCHED=1; + private static final int STATE_ASYNC=2; + private int _state; + + private boolean _onIdle; - /** true if the last write operation succeed and wrote all offered bytes */ - private volatile boolean _writable = true; + /** true if the last write operation succeed and wrote all offered bytes */ + private volatile boolean _writable = true; - /** True if a thread has is blocked in {@link #blockReadable(long)} */ - private boolean _readBlocked; + /** True if a thread has is blocked in {@link #blockReadable(long)} */ + private boolean _readBlocked; - /** True if a thread has is blocked in {@link #blockWritable(long)} */ - private boolean _writeBlocked; + /** True if a thread has is blocked in {@link #blockWritable(long)} */ + private boolean _writeBlocked; - /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ - private boolean _open; + /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ + private boolean _open; - private volatile long _idleTimestamp; - private volatile boolean _checkIdle; - - private boolean _interruptable; + private volatile long _idleTimestamp; + private volatile boolean _checkIdle; + + private boolean _interruptable; - private boolean _ishut; + private boolean _ishut; - /* ------------------------------------------------------------ */ - public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) - throws IOException - { - super(channel, maxIdleTime); + /* ------------------------------------------------------------ */ + public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) + throws IOException + { + super(channel, maxIdleTime); - _manager = selectSet.getManager(); - _selectSet = selectSet; - _state=STATE_UNDISPATCHED; - _onIdle=false; - _open=true; - _key = key; + _manager = selectSet.getManager(); + _selectSet = selectSet; + _state=STATE_UNDISPATCHED; + _onIdle=false; + _open=true; + _key = key; - setCheckForIdle(true); - } + setCheckForIdle(true); + } - /* ------------------------------------------------------------ */ - public SelectionKey getSelectionKey() - { - synchronized (this) - { - return _key; - } - } + /* ------------------------------------------------------------ */ + public SelectionKey getSelectionKey() + { + synchronized (this) + { + return _key; + } + } - /* ------------------------------------------------------------ */ - public SelectorManager getSelectManager() - { - return _manager; - } + /* ------------------------------------------------------------ */ + public SelectorManager getSelectManager() + { + return _manager; + } - /* ------------------------------------------------------------ */ - public Connection getConnection() - { - return _connection; - } + /* ------------------------------------------------------------ */ + public Connection getConnection() + { + return _connection; + } - /* ------------------------------------------------------------ */ - public void setConnection(Connection connection) - { - Connection old=_connection; - _connection=(AsyncConnection)connection; - if (old!=null && old!=_connection) - _manager.endPointUpgraded(this,old); - } + /* ------------------------------------------------------------ */ + public void setConnection(Connection connection) + { + Connection old=_connection; + _connection=(AsyncConnection)connection; + if (old!=null && old!=_connection) + _manager.endPointUpgraded(this,old); + } - /* ------------------------------------------------------------ */ - public long getIdleTimestamp() - { - return _idleTimestamp; - } + /* ------------------------------------------------------------ */ + public long getIdleTimestamp() + { + return _idleTimestamp; + } - /* ------------------------------------------------------------ */ - /** Called by selectSet to schedule handling - * - */ - public void schedule() - { - synchronized (this) - { - // If there is no key, then do nothing - if (_key == null || !_key.isValid()) - { - _readBlocked=false; - _writeBlocked=false; - this.notifyAll(); - return; - } + /* ------------------------------------------------------------ */ + /** Called by selectSet to schedule handling + * + */ + public void schedule() + { + synchronized (this) + { + // If there is no key, then do nothing + if (_key == null || !_key.isValid()) + { + _readBlocked=false; + _writeBlocked=false; + this.notifyAll(); + return; + } - // If there are threads dispatched reading and writing - if (_readBlocked || _writeBlocked) - { - // assert _dispatched; - if (_readBlocked && _key.isReadable()) - _readBlocked=false; - if (_writeBlocked && _key.isWritable()) - _writeBlocked=false; + // If there are threads dispatched reading and writing + if (_readBlocked || _writeBlocked) + { + // assert _dispatched; + if (_readBlocked && _key.isReadable()) + _readBlocked=false; + if (_writeBlocked && _key.isWritable()) + _writeBlocked=false; - // wake them up is as good as a dispatched. - this.notifyAll(); + // wake them up is as good as a dispatched. + this.notifyAll(); - // we are not interested in further selecting - _key.interestOps(0); - if (_state<STATE_DISPATCHED) - updateKey(); - return; - } + // we are not interested in further selecting + _key.interestOps(0); + if (_state<STATE_DISPATCHED) + updateKey(); + return; + } - // Remove writeable op - if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) - { - // Remove writeable op - _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; - _key.interestOps(_interestOps); - _writable = true; // Once writable is in ops, only removed with dispatch. - } + // Remove writeable op + if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) + { + // Remove writeable op + _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; + _key.interestOps(_interestOps); + _writable = true; // Once writable is in ops, only removed with dispatch. + } - // If dispatched, then deregister interest - if (_state>=STATE_DISPATCHED) - _key.interestOps(0); - else - { - // other wise do the dispatch - dispatch(); - if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) - { - _key.interestOps(0); - } - } - } - } + // If dispatched, then deregister interest + if (_state>=STATE_DISPATCHED) + _key.interestOps(0); + else + { + // other wise do the dispatch + dispatch(); + if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) + { + _key.interestOps(0); + } + } + } + } - /* ------------------------------------------------------------ */ - public void asyncDispatch() - { - synchronized(this) - { - switch(_state) - { - case STATE_NEEDS_DISPATCH: - case STATE_UNDISPATCHED: - dispatch(); - break; - - case STATE_DISPATCHED: - case STATE_ASYNC: - _state=STATE_ASYNC; - break; - } - } - } + /* ------------------------------------------------------------ */ + public void asyncDispatch() + { + synchronized(this) + { + switch(_state) + { + case STATE_NEEDS_DISPATCH: + case STATE_UNDISPATCHED: + dispatch(); + break; + + case STATE_DISPATCHED: + case STATE_ASYNC: + _state=STATE_ASYNC; + break; + } + } + } - /* ------------------------------------------------------------ */ - public void dispatch() - { - synchronized(this) - { - if (_state<=STATE_UNDISPATCHED) - { - if (_onIdle) - _state = STATE_NEEDS_DISPATCH; - else - { - _state = STATE_DISPATCHED; - boolean dispatched = _manager.dispatch(_handler); - if(!dispatched) - { - _state = STATE_NEEDS_DISPATCH; - LOG.warn("Dispatched Failed! "+this+" to "+_manager); - updateKey(); - } - } - } - } - } + /* ------------------------------------------------------------ */ + public void dispatch() + { + synchronized(this) + { + if (_state<=STATE_UNDISPATCHED) + { + if (_onIdle) + _state = STATE_NEEDS_DISPATCH; + else + { + _state = STATE_DISPATCHED; + try { + _manager.execute(_handler); + } catch(RejectedExecutionException e) { + _state = STATE_NEEDS_DISPATCH; + LOG.warn("Dispatched Failed! "+this+" to "+_manager); + updateKey(); + } + } + } + } + } - /* ------------------------------------------------------------ */ - /** - * Called when a dispatched thread is no longer handling the endpoint. - * The selection key operations are updated. - * @return If false is returned, the endpoint has been redispatched and - * thread must keep handling the endpoint. - */ - protected boolean undispatch() - { - synchronized (this) - { - switch(_state) - { - case STATE_ASYNC: - _state=STATE_DISPATCHED; - return false; + /* ------------------------------------------------------------ */ + /** + * Called when a dispatched thread is no longer handling the endpoint. + * The selection key operations are updated. + * @return If false is returned, the endpoint has been redispatched and + * thread must keep handling the endpoint. + */ + protected boolean undispatch() + { + synchronized (this) + { + switch(_state) + { + case STATE_ASYNC: + _state=STATE_DISPATCHED; + return false; - default: - _state=STATE_UNDISPATCHED; - updateKey(); - return true; - } - } - } + default: + _state=STATE_UNDISPATCHED; + updateKey(); + return true; + } + } + } - /* ------------------------------------------------------------ */ - public void cancelTimeout(Task task) - { - getSelectSet().cancelTimeout(task); - } + /* ------------------------------------------------------------ */ + public void cancelTimeout(Task task) + { + getSelectSet().cancelTimeout(task); + } - /* ------------------------------------------------------------ */ - public void scheduleTimeout(Task task, long timeoutMs) - { - getSelectSet().scheduleTimeout(task,timeoutMs); - } + /* ------------------------------------------------------------ */ + public void scheduleTimeout(Task task, long timeoutMs) + { + getSelectSet().scheduleTimeout(task,timeoutMs); + } - /* ------------------------------------------------------------ */ - public void setCheckForIdle(boolean check) - { - if (check) - { - _idleTimestamp=System.currentTimeMillis(); - _checkIdle=true; - } - else - _checkIdle=false; - } + /* ------------------------------------------------------------ */ + public void setCheckForIdle(boolean check) + { + if (check) + { + _idleTimestamp=System.currentTimeMillis(); + _checkIdle=true; + } + else + _checkIdle=false; + } - /* ------------------------------------------------------------ */ - public boolean isCheckForIdle() - { - return _checkIdle; - } + /* ------------------------------------------------------------ */ + public boolean isCheckForIdle() + { + return _checkIdle; + } - /* ------------------------------------------------------------ */ - protected void notIdle() - { - _idleTimestamp=System.currentTimeMillis(); - } + /* ------------------------------------------------------------ */ + protected void notIdle() + { + _idleTimestamp=System.currentTimeMillis(); + } - /* ------------------------------------------------------------ */ - public void checkIdleTimestamp(long now) - { - if (isCheckForIdle() && _maxIdleTime>0) - { - final long idleForMs=now-_idleTimestamp; + /* ------------------------------------------------------------ */ + public void checkIdleTimestamp(long now) + { + if (isCheckForIdle() && _maxIdleTime>0) + { + final long idleForMs=now-_idleTimestamp; - if (idleForMs>_maxIdleTime) - { - // Don't idle out again until onIdleExpired task completes. - setCheckForIdle(false); - _manager.dispatch(new Runnable() - { - public void run() - { - try - { - onIdleExpired(idleForMs); - } - finally - { - setCheckForIdle(true); - } - } - }); - } - } - } + if (idleForMs>_maxIdleTime) + { + // Don't idle out again until onIdleExpired task completes. + setCheckForIdle(false); + _manager.execute(new Runnable() + { + public void run() + { + try + { + onIdleExpired(idleForMs); + } + finally + { + setCheckForIdle(true); + } + } + }); + } + } + } - /* ------------------------------------------------------------ */ - public void onIdleExpired(long idleForMs) - { - try - { - synchronized (this) - { - _onIdle=true; - } + /* ------------------------------------------------------------ */ + public void onIdleExpired(long idleForMs) + { + try + { + synchronized (this) + { + _onIdle=true; + } - _connection.onIdleExpired(idleForMs); - } - finally - { - synchronized (this) - { - _onIdle=false; - if (_state==STATE_NEEDS_DISPATCH) - dispatch(); - } - } - } + _connection.onIdleExpired(idleForMs); + } + finally + { + synchronized (this) + { + _onIdle=false; + if (_state==STATE_NEEDS_DISPATCH) + dispatch(); + } + } + } - /* ------------------------------------------------------------ */ - @Override - public int fill(Buffer buffer) throws IOException - { - int fill=super.fill(buffer); - if (fill>0) - notIdle(); - return fill; - } + /* ------------------------------------------------------------ */ + @Override + public int fill(Buffer buffer) throws IOException + { + int fill=super.fill(buffer); + if (fill>0) + notIdle(); + return fill; + } - /* ------------------------------------------------------------ */ - @Override - public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException - { - int l = super.flush(header, buffer, trailer); + /* ------------------------------------------------------------ */ + @Override + public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException + { + int l = super.flush(header, buffer, trailer); - // If there was something to write and it wasn't written, then we are not writable. - if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) - { - synchronized (this) - { - _writable=false; - if (_state<STATE_DISPATCHED) - updateKey(); - } - } - else if (l>0) - { - _writable=true; - notIdle(); - } - return l; - } + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) + { + synchronized (this) + { + _writable=false; + if (_state<STATE_DISPATCHED) + updateKey(); + } + } + else if (l>0) + { + _writable=true; + notIdle(); + } + return l; + } - /* ------------------------------------------------------------ */ - /* - */ - @Override - public int flush(Buffer buffer) throws IOException - { - int l = super.flush(buffer); + /* ------------------------------------------------------------ */ + /* + */ + @Override + public int flush(Buffer buffer) throws IOException + { + int l = super.flush(buffer); - // If there was something to write and it wasn't written, then we are not writable. - if (l==0 && buffer!=null && buffer.hasContent()) - { - synchronized (this) - { - _writable=false; - if (_state<STATE_DISPATCHED) - updateKey(); - } - } - else if (l>0) - { - _writable=true; - notIdle(); - } + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && buffer!=null && buffer.hasContent()) + { + synchronized (this) + { + _writable=false; + if (_state<STATE_DISPATCHED) + updateKey(); + } + } + else if (l>0) + { + _writable=true; + notIdle(); + } - return l; - } + return l; + } - /* ------------------------------------------------------------ */ - /* - * Allows thread to block waiting for further events. - */ - @Override - public boolean blockReadable(long timeoutMs) throws IOException - { - synchronized (this) - { - if (isInputShutdown()) - throw new EofException(); + /* ------------------------------------------------------------ */ + /* + * Allows thread to block waiting for further events. + */ + @Override + public boolean blockReadable(long timeoutMs) throws IOException + { + synchronized (this) + { + if (isInputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - boolean check=isCheckForIdle(); - setCheckForIdle(true); - try - { - _readBlocked=true; - while (!isInputShutdown() && _readBlocked) - { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - if (_interruptable) - throw new InterruptedIOException(){{this.initCause(e);}}; - } - finally - { - now=_selectSet.getNow(); - } + long now=_selectSet.getNow(); + long end=now+timeoutMs; + boolean check=isCheckForIdle(); + setCheckForIdle(true); + try + { + _readBlocked=true; + while (!isInputShutdown() && _readBlocked) + { + try + { + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + if (_interruptable) + throw new InterruptedIOException(){{this.initCause(e);}}; + } + finally + { + now=_selectSet.getNow(); + } - if (_readBlocked && timeoutMs>0 && now>=end) - return false; - } - } - finally - { - _readBlocked=false; - setCheckForIdle(check); - } - } - return true; - } + if (_readBlocked && timeoutMs>0 && now>=end) + return false; + } + } + finally + { + _readBlocked=false; + setCheckForIdle(check); + } + } + return true; + } - /* ------------------------------------------------------------ */ - /* - * Allows thread to block waiting for further events. - */ - @Override - public boolean blockWritable(long timeoutMs) throws IOException - { - synchronized (this) - { - if (isOutputShutdown()) - throw new EofException(); + /* ------------------------------------------------------------ */ + /* + * Allows thread to block waiting for further events. + */ + @Override + public boolean blockWritable(long timeoutMs) throws IOException + { + synchronized (this) + { + if (isOutputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - boolean check=isCheckForIdle(); - setCheckForIdle(true); - try - { - _writeBlocked=true; - while (_writeBlocked && !isOutputShutdown()) - { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - if (_interruptable) - throw new InterruptedIOException(){{this.initCause(e);}}; - } - finally - { - now=_selectSet.getNow(); - } - if (_writeBlocked && timeoutMs>0 && now>=end) - return false; - } - } - finally - { - _writeBlocked=false; - setCheckForIdle(check); - } - } - return true; - } + long now=_selectSet.getNow(); + long end=now+timeoutMs; + boolean check=isCheckForIdle(); + setCheckForIdle(true); + try + { + _writeBlocked=true; + while (_writeBlocked && !isOutputShutdown()) + { + try + { + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + if (_interruptable) + throw new InterruptedIOException(){{this.initCause(e);}}; + } + finally + { + now=_selectSet.getNow(); + } + if (_writeBlocked && timeoutMs>0 && now>=end) + return false; + } + } + finally + { + _writeBlocked=false; + setCheckForIdle(check); + } + } + return true; + } - /* ------------------------------------------------------------ */ - /** Set the interruptable mode of the endpoint. - * If set to false (default), then interrupts are assumed to be spurious - * and blocking operations continue unless the endpoint has been closed. - * If true, then interrupts of blocking operations result in InterruptedIOExceptions - * being thrown. - * @param interupable - */ - public void setInterruptable(boolean interupable) - { - synchronized (this) - { - _interruptable=interupable; - } - } + /* ------------------------------------------------------------ */ + /** Set the interruptable mode of the endpoint. + * If set to false (default), then interrupts are assumed to be spurious + * and blocking operations continue unless the endpoint has been closed. + * If true, then interrupts of blocking operations result in InterruptedIOExceptions + * being thrown. + * @param interupable + */ + public void setInterruptable(boolean interupable) + { + synchronized (this) + { + _interruptable=interupable; + } + } - /* ------------------------------------------------------------ */ - public boolean isInterruptable() - { - return _interruptable; - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() - */ - public void scheduleWrite() - { - if (_writable) - LOG.debug("Required scheduleWrite {}",this); + /* ------------------------------------------------------------ */ + public boolean isInterruptable() + { + return _interruptable; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() + */ + public void scheduleWrite() + { + if (_writable) + LOG.debug("Required scheduleWrite {}",this); - _writable=false; - updateKey(); - } + _writable=false; + updateKey(); + } - /* ------------------------------------------------------------ */ - public boolean isWritable() - { - return _writable; - } + /* ------------------------------------------------------------ */ + public boolean isWritable() + { + return _writable; + } - /* ------------------------------------------------------------ */ - public boolean hasProgressed() - { - return false; - } + /* ------------------------------------------------------------ */ + public boolean hasProgressed() + { + return false; + } - /* ------------------------------------------------------------ */ - /** - * Updates selection key. Adds operations types to the selection key as needed. No operations - * are removed as this is only done during dispatch. This method records the new key and - * schedules a call to doUpdateKey to do the keyChange - */ - private void updateKey() - { - final boolean changed; - synchronized (this) - { - int current_ops=-1; - if (getChannel().isOpen()) - { - boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); - boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); + /* ------------------------------------------------------------ */ + /** + * Updates selection key. Adds operations types to the selection key as needed. No operations + * are removed as this is only done during dispatch. This method records the new key and + * schedules a call to doUpdateKey to do the keyChange + */ + private void updateKey() + { + final boolean changed; + synchronized (this) + { + int current_ops=-1; + if (getChannel().isOpen()) + { + boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); + boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); - _interestOps = - ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) - | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); - try - { - current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); - } - catch(Exception e) - { - _key=null; - LOG.trace("",e); - } - } - changed=_interestOps!=current_ops; - } + _interestOps = + ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) + | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); + try + { + current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + } + catch(Exception e) + { + _key=null; + LOG.trace("",e); + } + } + changed=_interestOps!=current_ops; + } - if(changed) - { - _selectSet.addChange(this); - _selectSet.wakeup(); - } - } + if(changed) + { + _selectSet.addChange(this); + _selectSet.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** - * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey - */ - void doUpdateKey() - { - synchronized (this) - { - if (getChannel().isOpen()) - { - if (_interestOps>0) - { - if (_key==null || !_key.isValid()) - { - SelectableChannel sc = (SelectableChannel)getChannel(); - if (sc.isRegistered()) - { - updateKey(); - } - else - { - try - { - _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); - } - catch (Exception e) - { - LOG.trace("",e); - if (_key!=null && _key.isValid()) - { - _key.cancel(); - } + /* ------------------------------------------------------------ */ + /** + * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey + */ + void doUpdateKey() + { + synchronized (this) + { + if (getChannel().isOpen()) + { + if (_interestOps>0) + { + if (_key==null || !_key.isValid()) + { + SelectableChannel sc = (SelectableChannel)getChannel(); + if (sc.isRegistered()) + { + updateKey(); + } + else + { + try + { + _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); + } + catch (Exception e) + { + LOG.trace("",e); + if (_key!=null && _key.isValid()) + { + _key.cancel(); + } - if (_open) - { - _selectSet.destroyEndPoint(this); - } - _open=false; - _key = null; - } - } - } - else - { - _key.interestOps(_interestOps); - } - } - else - { - if (_key!=null && _key.isValid()) - _key.interestOps(0); - else - _key=null; - } - } - else - { - if (_key!=null && _key.isValid()) - _key.cancel(); + if (_open) + { + _selectSet.destroyEndPoint(this); + } + _open=false; + _key = null; + } + } + } + else + { + _key.interestOps(_interestOps); + } + } + else + { + if (_key!=null && _key.isValid()) + _key.interestOps(0); + else + _key=null; + } + } + else + { + if (_key!=null && _key.isValid()) + _key.cancel(); - if (_open) - { - _open=false; - _selectSet.destroyEndPoint(this); - } - _key = null; - } - } - } + if (_open) + { + _open=false; + _selectSet.destroyEndPoint(this); + } + _key = null; + } + } + } - /* ------------------------------------------------------------ */ - /* - */ - protected void handle() - { - boolean dispatched=true; - try - { - while(dispatched) - { - try - { - while(true) - { - final AsyncConnection next = (AsyncConnection)_connection.handle(); - if (next!=_connection) - { - LOG.debug("{} replaced {}",next,_connection); - Connection old=_connection; - _connection=next; - _manager.endPointUpgraded(this,old); - continue; - } - break; - } - } - catch (ClosedChannelException e) - { - LOG.trace("",e); - } - catch (EofException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (IOException e) - { - LOG.warn(e.toString()); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (Throwable e) - { - LOG.warn("handle failed", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - if (!_ishut && isInputShutdown() && isOpen()) - { - _ishut=true; - try - { - _connection.onInputShutdown(); - } - catch(Throwable x) - { - LOG.warn("onInputShutdown failed", x); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - updateKey(); - } - } - dispatched=!undispatch(); - } - } - } - finally - { - if (dispatched) - { - dispatched=!undispatch(); - while (dispatched) - { - LOG.warn("SCEP.run() finally DISPATCHED"); - dispatched=!undispatch(); - } - } - } - } + /* ------------------------------------------------------------ */ + /* + */ + protected void handle() + { + boolean dispatched=true; + try + { + while(dispatched) + { + try + { + while(true) + { + final AsyncConnection next = (AsyncConnection)_connection.handle(); + if (next!=_connection) + { + LOG.debug("{} replaced {}",next,_connection); + Connection old=_connection; + _connection=next; + _manager.endPointUpgraded(this,old); + continue; + } + break; + } + } + catch (ClosedChannelException e) + { + LOG.trace("",e); + } + catch (EofException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (IOException e) + { + LOG.warn(e.toString()); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (Throwable e) + { + LOG.warn("handle failed", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + if (!_ishut && isInputShutdown() && isOpen()) + { + _ishut=true; + try + { + _connection.onInputShutdown(); + } + catch(Throwable x) + { + LOG.warn("onInputShutdown failed", x); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + updateKey(); + } + } + dispatched=!undispatch(); + } + } + } + finally + { + if (dispatched) + { + dispatched=!undispatch(); + while (dispatched) + { + LOG.warn("SCEP.run() finally DISPATCHED"); + dispatched=!undispatch(); + } + } + } + } - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.nio.ChannelEndPoint#close() - */ - @Override - public void close() throws IOException - { - // On unix systems there is a JVM issue that if you cancel before closing, it can - // cause the selector to block waiting for a channel to close and that channel can - // block waiting for the remote end. But on windows, if you don't cancel before a - // close, then the selector can block anyway! - // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 - if (WORK_AROUND_JVM_BUG_6346658) - { - try - { - SelectionKey key = _key; - if (key!=null) - key.cancel(); - } - catch (Throwable e) - { - LOG.trace("",e); - } - } + /* ------------------------------------------------------------ */ + /* + * @see org.eclipse.io.nio.ChannelEndPoint#close() + */ + @Override + public void close() throws IOException + { + // On unix systems there is a JVM issue that if you cancel before closing, it can + // cause the selector to block waiting for a channel to close and that channel can + // block waiting for the remote end. But on windows, if you don't cancel before a + // close, then the selector can block anyway! + // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 + if (WORK_AROUND_JVM_BUG_6346658) + { + try + { + SelectionKey key = _key; + if (key!=null) + key.cancel(); + } + catch (Throwable e) + { + LOG.trace("",e); + } + } - try - { - super.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - finally - { - updateKey(); - } - } + try + { + super.close(); + } + catch (IOException e) + { + LOG.trace("",e); + } + finally + { + updateKey(); + } + } - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - // Do NOT use synchronized (this) - // because it's very easy to deadlock when debugging is enabled. - // We do a best effort to print the right toString() and that's it. - SelectionKey key = _key; - String keyString = ""; - if (key != null) - { - if (key.isValid()) - { - if (key.isReadable()) - keyString += "r"; - if (key.isWritable()) - keyString += "w"; - } - else - { - keyString += "!"; - } - } - else - { - keyString += "-"; - } - return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", - hashCode(), - _socket.getRemoteSocketAddress(), - _socket.getLocalSocketAddress(), - _state, - isOpen(), - isInputShutdown(), - isOutputShutdown(), - _readBlocked, - _writeBlocked, - _writable, - _interestOps, - keyString, - _connection); - } + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + // Do NOT use synchronized (this) + // because it's very easy to deadlock when debugging is enabled. + // We do a best effort to print the right toString() and that's it. + SelectionKey key = _key; + String keyString = ""; + if (key != null) + { + if (key.isValid()) + { + if (key.isReadable()) + keyString += "r"; + if (key.isWritable()) + keyString += "w"; + } + else + { + keyString += "!"; + } + } + else + { + keyString += "-"; + } + return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", + hashCode(), + _socket.getRemoteSocketAddress(), + _socket.getLocalSocketAddress(), + _state, + isOpen(), + isInputShutdown(), + isOutputShutdown(), + _readBlocked, + _writeBlocked, + _writable, + _interestOps, + keyString, + _connection); + } - /* ------------------------------------------------------------ */ - public SelectSet getSelectSet() - { - return _selectSet; - } + /* ------------------------------------------------------------ */ + public SelectSet getSelectSet() + { + return _selectSet; + } - /* ------------------------------------------------------------ */ - /** - * Don't set the SoTimeout - * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) - */ - @Override - public void setMaxIdleTime(int timeMs) throws IOException - { - _maxIdleTime=timeMs; - } + /* ------------------------------------------------------------ */ + /** + * Don't set the SoTimeout + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) + */ + @Override + public void setMaxIdleTime(int timeMs) throws IOException + { + _maxIdleTime=timeMs; + } }