Mercurial Hosting > luan
changeset 865:6b210bb66c63
remove ThreadPool
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Sun Oct 02 20:38:06 2016 -0600 @@ -25,6 +25,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Locale; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; @@ -42,827 +43,827 @@ */ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint { - public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); - private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); - private final SelectorManager.SelectSet _selectSet; - private final SelectorManager _manager; - private SelectionKey _key; - private final Runnable _handler = new Runnable() - { - public void run() { handle(); } - }; + private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); + private final SelectorManager.SelectSet _selectSet; + private final SelectorManager _manager; + private SelectionKey _key; + private final Runnable _handler = new Runnable() + { + public void run() { handle(); } + }; - /** The desired value for {@link SelectionKey#interestOps()} */ - private int _interestOps; + /** The desired value for {@link SelectionKey#interestOps()} */ + private int _interestOps; - /** - * The connection instance is the handler for any IO activity on the endpoint. - * There is a different type of connection for HTTP, AJP, WebSocket and - * ProxyConnect. The connection may change for an SCEP as it is upgraded - * from HTTP to proxy connect or websocket. - */ - private volatile AsyncConnection _connection; + /** + * The connection instance is the handler for any IO activity on the endpoint. + * There is a different type of connection for HTTP, AJP, WebSocket and + * ProxyConnect. The connection may change for an SCEP as it is upgraded + * from HTTP to proxy connect or websocket. + */ + private volatile AsyncConnection _connection; - private static final int STATE_NEEDS_DISPATCH=-1; - private static final int STATE_UNDISPATCHED=0; - private static final int STATE_DISPATCHED=1; - private static final int STATE_ASYNC=2; - private int _state; - - private boolean _onIdle; + private static final int STATE_NEEDS_DISPATCH=-1; + private static final int STATE_UNDISPATCHED=0; + private static final int STATE_DISPATCHED=1; + private static final int STATE_ASYNC=2; + private int _state; + + private boolean _onIdle; - /** true if the last write operation succeed and wrote all offered bytes */ - private volatile boolean _writable = true; + /** true if the last write operation succeed and wrote all offered bytes */ + private volatile boolean _writable = true; - /** True if a thread has is blocked in {@link #blockReadable(long)} */ - private boolean _readBlocked; + /** True if a thread has is blocked in {@link #blockReadable(long)} */ + private boolean _readBlocked; - /** True if a thread has is blocked in {@link #blockWritable(long)} */ - private boolean _writeBlocked; + /** True if a thread has is blocked in {@link #blockWritable(long)} */ + private boolean _writeBlocked; - /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ - private boolean _open; + /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ + private boolean _open; - private volatile long _idleTimestamp; - private volatile boolean _checkIdle; - - private boolean _interruptable; + private volatile long _idleTimestamp; + private volatile boolean _checkIdle; + + private boolean _interruptable; - private boolean _ishut; + private boolean _ishut; - /* ------------------------------------------------------------ */ - public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) - throws IOException - { - super(channel, maxIdleTime); + /* ------------------------------------------------------------ */ + public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) + throws IOException + { + super(channel, maxIdleTime); - _manager = selectSet.getManager(); - _selectSet = selectSet; - _state=STATE_UNDISPATCHED; - _onIdle=false; - _open=true; - _key = key; + _manager = selectSet.getManager(); + _selectSet = selectSet; + _state=STATE_UNDISPATCHED; + _onIdle=false; + _open=true; + _key = key; - setCheckForIdle(true); - } + setCheckForIdle(true); + } - /* ------------------------------------------------------------ */ - public SelectionKey getSelectionKey() - { - synchronized (this) - { - return _key; - } - } + /* ------------------------------------------------------------ */ + public SelectionKey getSelectionKey() + { + synchronized (this) + { + return _key; + } + } - /* ------------------------------------------------------------ */ - public SelectorManager getSelectManager() - { - return _manager; - } + /* ------------------------------------------------------------ */ + public SelectorManager getSelectManager() + { + return _manager; + } - /* ------------------------------------------------------------ */ - public Connection getConnection() - { - return _connection; - } + /* ------------------------------------------------------------ */ + public Connection getConnection() + { + return _connection; + } - /* ------------------------------------------------------------ */ - public void setConnection(Connection connection) - { - Connection old=_connection; - _connection=(AsyncConnection)connection; - if (old!=null && old!=_connection) - _manager.endPointUpgraded(this,old); - } + /* ------------------------------------------------------------ */ + public void setConnection(Connection connection) + { + Connection old=_connection; + _connection=(AsyncConnection)connection; + if (old!=null && old!=_connection) + _manager.endPointUpgraded(this,old); + } - /* ------------------------------------------------------------ */ - public long getIdleTimestamp() - { - return _idleTimestamp; - } + /* ------------------------------------------------------------ */ + public long getIdleTimestamp() + { + return _idleTimestamp; + } - /* ------------------------------------------------------------ */ - /** Called by selectSet to schedule handling - * - */ - public void schedule() - { - synchronized (this) - { - // If there is no key, then do nothing - if (_key == null || !_key.isValid()) - { - _readBlocked=false; - _writeBlocked=false; - this.notifyAll(); - return; - } + /* ------------------------------------------------------------ */ + /** Called by selectSet to schedule handling + * + */ + public void schedule() + { + synchronized (this) + { + // If there is no key, then do nothing + if (_key == null || !_key.isValid()) + { + _readBlocked=false; + _writeBlocked=false; + this.notifyAll(); + return; + } - // If there are threads dispatched reading and writing - if (_readBlocked || _writeBlocked) - { - // assert _dispatched; - if (_readBlocked && _key.isReadable()) - _readBlocked=false; - if (_writeBlocked && _key.isWritable()) - _writeBlocked=false; + // If there are threads dispatched reading and writing + if (_readBlocked || _writeBlocked) + { + // assert _dispatched; + if (_readBlocked && _key.isReadable()) + _readBlocked=false; + if (_writeBlocked && _key.isWritable()) + _writeBlocked=false; - // wake them up is as good as a dispatched. - this.notifyAll(); + // wake them up is as good as a dispatched. + this.notifyAll(); - // we are not interested in further selecting - _key.interestOps(0); - if (_state<STATE_DISPATCHED) - updateKey(); - return; - } + // we are not interested in further selecting + _key.interestOps(0); + if (_state<STATE_DISPATCHED) + updateKey(); + return; + } - // Remove writeable op - if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) - { - // Remove writeable op - _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; - _key.interestOps(_interestOps); - _writable = true; // Once writable is in ops, only removed with dispatch. - } + // Remove writeable op + if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) + { + // Remove writeable op + _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; + _key.interestOps(_interestOps); + _writable = true; // Once writable is in ops, only removed with dispatch. + } - // If dispatched, then deregister interest - if (_state>=STATE_DISPATCHED) - _key.interestOps(0); - else - { - // other wise do the dispatch - dispatch(); - if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) - { - _key.interestOps(0); - } - } - } - } + // If dispatched, then deregister interest + if (_state>=STATE_DISPATCHED) + _key.interestOps(0); + else + { + // other wise do the dispatch + dispatch(); + if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) + { + _key.interestOps(0); + } + } + } + } - /* ------------------------------------------------------------ */ - public void asyncDispatch() - { - synchronized(this) - { - switch(_state) - { - case STATE_NEEDS_DISPATCH: - case STATE_UNDISPATCHED: - dispatch(); - break; - - case STATE_DISPATCHED: - case STATE_ASYNC: - _state=STATE_ASYNC; - break; - } - } - } + /* ------------------------------------------------------------ */ + public void asyncDispatch() + { + synchronized(this) + { + switch(_state) + { + case STATE_NEEDS_DISPATCH: + case STATE_UNDISPATCHED: + dispatch(); + break; + + case STATE_DISPATCHED: + case STATE_ASYNC: + _state=STATE_ASYNC; + break; + } + } + } - /* ------------------------------------------------------------ */ - public void dispatch() - { - synchronized(this) - { - if (_state<=STATE_UNDISPATCHED) - { - if (_onIdle) - _state = STATE_NEEDS_DISPATCH; - else - { - _state = STATE_DISPATCHED; - boolean dispatched = _manager.dispatch(_handler); - if(!dispatched) - { - _state = STATE_NEEDS_DISPATCH; - LOG.warn("Dispatched Failed! "+this+" to "+_manager); - updateKey(); - } - } - } - } - } + /* ------------------------------------------------------------ */ + public void dispatch() + { + synchronized(this) + { + if (_state<=STATE_UNDISPATCHED) + { + if (_onIdle) + _state = STATE_NEEDS_DISPATCH; + else + { + _state = STATE_DISPATCHED; + try { + _manager.execute(_handler); + } catch(RejectedExecutionException e) { + _state = STATE_NEEDS_DISPATCH; + LOG.warn("Dispatched Failed! "+this+" to "+_manager); + updateKey(); + } + } + } + } + } - /* ------------------------------------------------------------ */ - /** - * Called when a dispatched thread is no longer handling the endpoint. - * The selection key operations are updated. - * @return If false is returned, the endpoint has been redispatched and - * thread must keep handling the endpoint. - */ - protected boolean undispatch() - { - synchronized (this) - { - switch(_state) - { - case STATE_ASYNC: - _state=STATE_DISPATCHED; - return false; + /* ------------------------------------------------------------ */ + /** + * Called when a dispatched thread is no longer handling the endpoint. + * The selection key operations are updated. + * @return If false is returned, the endpoint has been redispatched and + * thread must keep handling the endpoint. + */ + protected boolean undispatch() + { + synchronized (this) + { + switch(_state) + { + case STATE_ASYNC: + _state=STATE_DISPATCHED; + return false; - default: - _state=STATE_UNDISPATCHED; - updateKey(); - return true; - } - } - } + default: + _state=STATE_UNDISPATCHED; + updateKey(); + return true; + } + } + } - /* ------------------------------------------------------------ */ - public void cancelTimeout(Task task) - { - getSelectSet().cancelTimeout(task); - } + /* ------------------------------------------------------------ */ + public void cancelTimeout(Task task) + { + getSelectSet().cancelTimeout(task); + } - /* ------------------------------------------------------------ */ - public void scheduleTimeout(Task task, long timeoutMs) - { - getSelectSet().scheduleTimeout(task,timeoutMs); - } + /* ------------------------------------------------------------ */ + public void scheduleTimeout(Task task, long timeoutMs) + { + getSelectSet().scheduleTimeout(task,timeoutMs); + } - /* ------------------------------------------------------------ */ - public void setCheckForIdle(boolean check) - { - if (check) - { - _idleTimestamp=System.currentTimeMillis(); - _checkIdle=true; - } - else - _checkIdle=false; - } + /* ------------------------------------------------------------ */ + public void setCheckForIdle(boolean check) + { + if (check) + { + _idleTimestamp=System.currentTimeMillis(); + _checkIdle=true; + } + else + _checkIdle=false; + } - /* ------------------------------------------------------------ */ - public boolean isCheckForIdle() - { - return _checkIdle; - } + /* ------------------------------------------------------------ */ + public boolean isCheckForIdle() + { + return _checkIdle; + } - /* ------------------------------------------------------------ */ - protected void notIdle() - { - _idleTimestamp=System.currentTimeMillis(); - } + /* ------------------------------------------------------------ */ + protected void notIdle() + { + _idleTimestamp=System.currentTimeMillis(); + } - /* ------------------------------------------------------------ */ - public void checkIdleTimestamp(long now) - { - if (isCheckForIdle() && _maxIdleTime>0) - { - final long idleForMs=now-_idleTimestamp; + /* ------------------------------------------------------------ */ + public void checkIdleTimestamp(long now) + { + if (isCheckForIdle() && _maxIdleTime>0) + { + final long idleForMs=now-_idleTimestamp; - if (idleForMs>_maxIdleTime) - { - // Don't idle out again until onIdleExpired task completes. - setCheckForIdle(false); - _manager.dispatch(new Runnable() - { - public void run() - { - try - { - onIdleExpired(idleForMs); - } - finally - { - setCheckForIdle(true); - } - } - }); - } - } - } + if (idleForMs>_maxIdleTime) + { + // Don't idle out again until onIdleExpired task completes. + setCheckForIdle(false); + _manager.execute(new Runnable() + { + public void run() + { + try + { + onIdleExpired(idleForMs); + } + finally + { + setCheckForIdle(true); + } + } + }); + } + } + } - /* ------------------------------------------------------------ */ - public void onIdleExpired(long idleForMs) - { - try - { - synchronized (this) - { - _onIdle=true; - } + /* ------------------------------------------------------------ */ + public void onIdleExpired(long idleForMs) + { + try + { + synchronized (this) + { + _onIdle=true; + } - _connection.onIdleExpired(idleForMs); - } - finally - { - synchronized (this) - { - _onIdle=false; - if (_state==STATE_NEEDS_DISPATCH) - dispatch(); - } - } - } + _connection.onIdleExpired(idleForMs); + } + finally + { + synchronized (this) + { + _onIdle=false; + if (_state==STATE_NEEDS_DISPATCH) + dispatch(); + } + } + } - /* ------------------------------------------------------------ */ - @Override - public int fill(Buffer buffer) throws IOException - { - int fill=super.fill(buffer); - if (fill>0) - notIdle(); - return fill; - } + /* ------------------------------------------------------------ */ + @Override + public int fill(Buffer buffer) throws IOException + { + int fill=super.fill(buffer); + if (fill>0) + notIdle(); + return fill; + } - /* ------------------------------------------------------------ */ - @Override - public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException - { - int l = super.flush(header, buffer, trailer); + /* ------------------------------------------------------------ */ + @Override + public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException + { + int l = super.flush(header, buffer, trailer); - // If there was something to write and it wasn't written, then we are not writable. - if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) - { - synchronized (this) - { - _writable=false; - if (_state<STATE_DISPATCHED) - updateKey(); - } - } - else if (l>0) - { - _writable=true; - notIdle(); - } - return l; - } + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) + { + synchronized (this) + { + _writable=false; + if (_state<STATE_DISPATCHED) + updateKey(); + } + } + else if (l>0) + { + _writable=true; + notIdle(); + } + return l; + } - /* ------------------------------------------------------------ */ - /* - */ - @Override - public int flush(Buffer buffer) throws IOException - { - int l = super.flush(buffer); + /* ------------------------------------------------------------ */ + /* + */ + @Override + public int flush(Buffer buffer) throws IOException + { + int l = super.flush(buffer); - // If there was something to write and it wasn't written, then we are not writable. - if (l==0 && buffer!=null && buffer.hasContent()) - { - synchronized (this) - { - _writable=false; - if (_state<STATE_DISPATCHED) - updateKey(); - } - } - else if (l>0) - { - _writable=true; - notIdle(); - } + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && buffer!=null && buffer.hasContent()) + { + synchronized (this) + { + _writable=false; + if (_state<STATE_DISPATCHED) + updateKey(); + } + } + else if (l>0) + { + _writable=true; + notIdle(); + } - return l; - } + return l; + } - /* ------------------------------------------------------------ */ - /* - * Allows thread to block waiting for further events. - */ - @Override - public boolean blockReadable(long timeoutMs) throws IOException - { - synchronized (this) - { - if (isInputShutdown()) - throw new EofException(); + /* ------------------------------------------------------------ */ + /* + * Allows thread to block waiting for further events. + */ + @Override + public boolean blockReadable(long timeoutMs) throws IOException + { + synchronized (this) + { + if (isInputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - boolean check=isCheckForIdle(); - setCheckForIdle(true); - try - { - _readBlocked=true; - while (!isInputShutdown() && _readBlocked) - { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - if (_interruptable) - throw new InterruptedIOException(){{this.initCause(e);}}; - } - finally - { - now=_selectSet.getNow(); - } + long now=_selectSet.getNow(); + long end=now+timeoutMs; + boolean check=isCheckForIdle(); + setCheckForIdle(true); + try + { + _readBlocked=true; + while (!isInputShutdown() && _readBlocked) + { + try + { + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + if (_interruptable) + throw new InterruptedIOException(){{this.initCause(e);}}; + } + finally + { + now=_selectSet.getNow(); + } - if (_readBlocked && timeoutMs>0 && now>=end) - return false; - } - } - finally - { - _readBlocked=false; - setCheckForIdle(check); - } - } - return true; - } + if (_readBlocked && timeoutMs>0 && now>=end) + return false; + } + } + finally + { + _readBlocked=false; + setCheckForIdle(check); + } + } + return true; + } - /* ------------------------------------------------------------ */ - /* - * Allows thread to block waiting for further events. - */ - @Override - public boolean blockWritable(long timeoutMs) throws IOException - { - synchronized (this) - { - if (isOutputShutdown()) - throw new EofException(); + /* ------------------------------------------------------------ */ + /* + * Allows thread to block waiting for further events. + */ + @Override + public boolean blockWritable(long timeoutMs) throws IOException + { + synchronized (this) + { + if (isOutputShutdown()) + throw new EofException(); - long now=_selectSet.getNow(); - long end=now+timeoutMs; - boolean check=isCheckForIdle(); - setCheckForIdle(true); - try - { - _writeBlocked=true; - while (_writeBlocked && !isOutputShutdown()) - { - try - { - updateKey(); - this.wait(timeoutMs>0?(end-now):10000); - } - catch (final InterruptedException e) - { - LOG.warn("",e); - if (_interruptable) - throw new InterruptedIOException(){{this.initCause(e);}}; - } - finally - { - now=_selectSet.getNow(); - } - if (_writeBlocked && timeoutMs>0 && now>=end) - return false; - } - } - finally - { - _writeBlocked=false; - setCheckForIdle(check); - } - } - return true; - } + long now=_selectSet.getNow(); + long end=now+timeoutMs; + boolean check=isCheckForIdle(); + setCheckForIdle(true); + try + { + _writeBlocked=true; + while (_writeBlocked && !isOutputShutdown()) + { + try + { + updateKey(); + this.wait(timeoutMs>0?(end-now):10000); + } + catch (final InterruptedException e) + { + LOG.warn("",e); + if (_interruptable) + throw new InterruptedIOException(){{this.initCause(e);}}; + } + finally + { + now=_selectSet.getNow(); + } + if (_writeBlocked && timeoutMs>0 && now>=end) + return false; + } + } + finally + { + _writeBlocked=false; + setCheckForIdle(check); + } + } + return true; + } - /* ------------------------------------------------------------ */ - /** Set the interruptable mode of the endpoint. - * If set to false (default), then interrupts are assumed to be spurious - * and blocking operations continue unless the endpoint has been closed. - * If true, then interrupts of blocking operations result in InterruptedIOExceptions - * being thrown. - * @param interupable - */ - public void setInterruptable(boolean interupable) - { - synchronized (this) - { - _interruptable=interupable; - } - } + /* ------------------------------------------------------------ */ + /** Set the interruptable mode of the endpoint. + * If set to false (default), then interrupts are assumed to be spurious + * and blocking operations continue unless the endpoint has been closed. + * If true, then interrupts of blocking operations result in InterruptedIOExceptions + * being thrown. + * @param interupable + */ + public void setInterruptable(boolean interupable) + { + synchronized (this) + { + _interruptable=interupable; + } + } - /* ------------------------------------------------------------ */ - public boolean isInterruptable() - { - return _interruptable; - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() - */ - public void scheduleWrite() - { - if (_writable) - LOG.debug("Required scheduleWrite {}",this); + /* ------------------------------------------------------------ */ + public boolean isInterruptable() + { + return _interruptable; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() + */ + public void scheduleWrite() + { + if (_writable) + LOG.debug("Required scheduleWrite {}",this); - _writable=false; - updateKey(); - } + _writable=false; + updateKey(); + } - /* ------------------------------------------------------------ */ - public boolean isWritable() - { - return _writable; - } + /* ------------------------------------------------------------ */ + public boolean isWritable() + { + return _writable; + } - /* ------------------------------------------------------------ */ - public boolean hasProgressed() - { - return false; - } + /* ------------------------------------------------------------ */ + public boolean hasProgressed() + { + return false; + } - /* ------------------------------------------------------------ */ - /** - * Updates selection key. Adds operations types to the selection key as needed. No operations - * are removed as this is only done during dispatch. This method records the new key and - * schedules a call to doUpdateKey to do the keyChange - */ - private void updateKey() - { - final boolean changed; - synchronized (this) - { - int current_ops=-1; - if (getChannel().isOpen()) - { - boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); - boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); + /* ------------------------------------------------------------ */ + /** + * Updates selection key. Adds operations types to the selection key as needed. No operations + * are removed as this is only done during dispatch. This method records the new key and + * schedules a call to doUpdateKey to do the keyChange + */ + private void updateKey() + { + final boolean changed; + synchronized (this) + { + int current_ops=-1; + if (getChannel().isOpen()) + { + boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); + boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); - _interestOps = - ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) - | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); - try - { - current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); - } - catch(Exception e) - { - _key=null; - LOG.trace("",e); - } - } - changed=_interestOps!=current_ops; - } + _interestOps = + ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) + | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); + try + { + current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); + } + catch(Exception e) + { + _key=null; + LOG.trace("",e); + } + } + changed=_interestOps!=current_ops; + } - if(changed) - { - _selectSet.addChange(this); - _selectSet.wakeup(); - } - } + if(changed) + { + _selectSet.addChange(this); + _selectSet.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** - * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey - */ - void doUpdateKey() - { - synchronized (this) - { - if (getChannel().isOpen()) - { - if (_interestOps>0) - { - if (_key==null || !_key.isValid()) - { - SelectableChannel sc = (SelectableChannel)getChannel(); - if (sc.isRegistered()) - { - updateKey(); - } - else - { - try - { - _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); - } - catch (Exception e) - { - LOG.trace("",e); - if (_key!=null && _key.isValid()) - { - _key.cancel(); - } + /* ------------------------------------------------------------ */ + /** + * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey + */ + void doUpdateKey() + { + synchronized (this) + { + if (getChannel().isOpen()) + { + if (_interestOps>0) + { + if (_key==null || !_key.isValid()) + { + SelectableChannel sc = (SelectableChannel)getChannel(); + if (sc.isRegistered()) + { + updateKey(); + } + else + { + try + { + _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); + } + catch (Exception e) + { + LOG.trace("",e); + if (_key!=null && _key.isValid()) + { + _key.cancel(); + } - if (_open) - { - _selectSet.destroyEndPoint(this); - } - _open=false; - _key = null; - } - } - } - else - { - _key.interestOps(_interestOps); - } - } - else - { - if (_key!=null && _key.isValid()) - _key.interestOps(0); - else - _key=null; - } - } - else - { - if (_key!=null && _key.isValid()) - _key.cancel(); + if (_open) + { + _selectSet.destroyEndPoint(this); + } + _open=false; + _key = null; + } + } + } + else + { + _key.interestOps(_interestOps); + } + } + else + { + if (_key!=null && _key.isValid()) + _key.interestOps(0); + else + _key=null; + } + } + else + { + if (_key!=null && _key.isValid()) + _key.cancel(); - if (_open) - { - _open=false; - _selectSet.destroyEndPoint(this); - } - _key = null; - } - } - } + if (_open) + { + _open=false; + _selectSet.destroyEndPoint(this); + } + _key = null; + } + } + } - /* ------------------------------------------------------------ */ - /* - */ - protected void handle() - { - boolean dispatched=true; - try - { - while(dispatched) - { - try - { - while(true) - { - final AsyncConnection next = (AsyncConnection)_connection.handle(); - if (next!=_connection) - { - LOG.debug("{} replaced {}",next,_connection); - Connection old=_connection; - _connection=next; - _manager.endPointUpgraded(this,old); - continue; - } - break; - } - } - catch (ClosedChannelException e) - { - LOG.trace("",e); - } - catch (EofException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (IOException e) - { - LOG.warn(e.toString()); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (Throwable e) - { - LOG.warn("handle failed", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - if (!_ishut && isInputShutdown() && isOpen()) - { - _ishut=true; - try - { - _connection.onInputShutdown(); - } - catch(Throwable x) - { - LOG.warn("onInputShutdown failed", x); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - updateKey(); - } - } - dispatched=!undispatch(); - } - } - } - finally - { - if (dispatched) - { - dispatched=!undispatch(); - while (dispatched) - { - LOG.warn("SCEP.run() finally DISPATCHED"); - dispatched=!undispatch(); - } - } - } - } + /* ------------------------------------------------------------ */ + /* + */ + protected void handle() + { + boolean dispatched=true; + try + { + while(dispatched) + { + try + { + while(true) + { + final AsyncConnection next = (AsyncConnection)_connection.handle(); + if (next!=_connection) + { + LOG.debug("{} replaced {}",next,_connection); + Connection old=_connection; + _connection=next; + _manager.endPointUpgraded(this,old); + continue; + } + break; + } + } + catch (ClosedChannelException e) + { + LOG.trace("",e); + } + catch (EofException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (IOException e) + { + LOG.warn(e.toString()); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (Throwable e) + { + LOG.warn("handle failed", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + if (!_ishut && isInputShutdown() && isOpen()) + { + _ishut=true; + try + { + _connection.onInputShutdown(); + } + catch(Throwable x) + { + LOG.warn("onInputShutdown failed", x); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + updateKey(); + } + } + dispatched=!undispatch(); + } + } + } + finally + { + if (dispatched) + { + dispatched=!undispatch(); + while (dispatched) + { + LOG.warn("SCEP.run() finally DISPATCHED"); + dispatched=!undispatch(); + } + } + } + } - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.nio.ChannelEndPoint#close() - */ - @Override - public void close() throws IOException - { - // On unix systems there is a JVM issue that if you cancel before closing, it can - // cause the selector to block waiting for a channel to close and that channel can - // block waiting for the remote end. But on windows, if you don't cancel before a - // close, then the selector can block anyway! - // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 - if (WORK_AROUND_JVM_BUG_6346658) - { - try - { - SelectionKey key = _key; - if (key!=null) - key.cancel(); - } - catch (Throwable e) - { - LOG.trace("",e); - } - } + /* ------------------------------------------------------------ */ + /* + * @see org.eclipse.io.nio.ChannelEndPoint#close() + */ + @Override + public void close() throws IOException + { + // On unix systems there is a JVM issue that if you cancel before closing, it can + // cause the selector to block waiting for a channel to close and that channel can + // block waiting for the remote end. But on windows, if you don't cancel before a + // close, then the selector can block anyway! + // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 + if (WORK_AROUND_JVM_BUG_6346658) + { + try + { + SelectionKey key = _key; + if (key!=null) + key.cancel(); + } + catch (Throwable e) + { + LOG.trace("",e); + } + } - try - { - super.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - finally - { - updateKey(); - } - } + try + { + super.close(); + } + catch (IOException e) + { + LOG.trace("",e); + } + finally + { + updateKey(); + } + } - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - // Do NOT use synchronized (this) - // because it's very easy to deadlock when debugging is enabled. - // We do a best effort to print the right toString() and that's it. - SelectionKey key = _key; - String keyString = ""; - if (key != null) - { - if (key.isValid()) - { - if (key.isReadable()) - keyString += "r"; - if (key.isWritable()) - keyString += "w"; - } - else - { - keyString += "!"; - } - } - else - { - keyString += "-"; - } - return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", - hashCode(), - _socket.getRemoteSocketAddress(), - _socket.getLocalSocketAddress(), - _state, - isOpen(), - isInputShutdown(), - isOutputShutdown(), - _readBlocked, - _writeBlocked, - _writable, - _interestOps, - keyString, - _connection); - } + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + // Do NOT use synchronized (this) + // because it's very easy to deadlock when debugging is enabled. + // We do a best effort to print the right toString() and that's it. + SelectionKey key = _key; + String keyString = ""; + if (key != null) + { + if (key.isValid()) + { + if (key.isReadable()) + keyString += "r"; + if (key.isWritable()) + keyString += "w"; + } + else + { + keyString += "!"; + } + } + else + { + keyString += "-"; + } + return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", + hashCode(), + _socket.getRemoteSocketAddress(), + _socket.getLocalSocketAddress(), + _state, + isOpen(), + isInputShutdown(), + isOutputShutdown(), + _readBlocked, + _writeBlocked, + _writable, + _interestOps, + keyString, + _connection); + } - /* ------------------------------------------------------------ */ - public SelectSet getSelectSet() - { - return _selectSet; - } + /* ------------------------------------------------------------ */ + public SelectSet getSelectSet() + { + return _selectSet; + } - /* ------------------------------------------------------------ */ - /** - * Don't set the SoTimeout - * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) - */ - @Override - public void setMaxIdleTime(int timeMs) throws IOException - { - _maxIdleTime=timeMs; - } + /* ------------------------------------------------------------ */ + /** + * Don't set the SoTimeout + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) + */ + @Override + public void setMaxIdleTime(int timeMs) throws IOException + { + _maxIdleTime=timeMs; + } }
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Sun Oct 02 20:38:06 2016 -0600 @@ -58,977 +58,974 @@ */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); - private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); - private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); - private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); - private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); + private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); + private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); + private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); + private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); - private int _maxIdleTime; - private int _lowResourcesMaxIdleTime; - private long _lowResourcesConnections; - private SelectSet[] _selectSet; - private int _selectSets=1; - private volatile int _set=0; - private boolean _deferringInterestedOps0=true; - private int _selectorPriorityDelta=0; + private int _maxIdleTime; + private int _lowResourcesMaxIdleTime; + private long _lowResourcesConnections; + private SelectSet[] _selectSet; + private int _selectSets=1; + private volatile int _set=0; + private boolean _deferringInterestedOps0=true; + private int _selectorPriorityDelta=0; - /* ------------------------------------------------------------ */ - /** - * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. - * @see #setLowResourcesMaxIdleTime(long) - */ - public void setMaxIdleTime(long maxIdleTime) - { - _maxIdleTime=(int)maxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. + * @see #setLowResourcesMaxIdleTime(long) + */ + public void setMaxIdleTime(long maxIdleTime) + { + _maxIdleTime=(int)maxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @param selectSets number of select sets to create - */ - public void setSelectSets(int selectSets) - { - long lrc = _lowResourcesConnections * _selectSets; - _selectSets=selectSets; - _lowResourcesConnections=lrc/_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @param selectSets number of select sets to create + */ + public void setSelectSets(int selectSets) + { + long lrc = _lowResourcesConnections * _selectSets; + _selectSets=selectSets; + _lowResourcesConnections=lrc/_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @return the max idle time - */ - public long getMaxIdleTime() - { - return _maxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @return the max idle time + */ + public long getMaxIdleTime() + { + return _maxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @return the number of select sets in use - */ - public int getSelectSets() - { - return _selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @return the number of select sets in use + */ + public int getSelectSets() + { + return _selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @param i - * @return The select set - */ - public SelectSet getSelectSet(int i) - { - return _selectSet[i]; - } + /* ------------------------------------------------------------ */ + /** + * @param i + * @return The select set + */ + public SelectSet getSelectSet(int i) + { + return _selectSet[i]; + } - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - * @param att Attached Object - */ - public void register(SocketChannel channel, Object att) - { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. + /* ------------------------------------------------------------ */ + /** Register a channel + * @param channel + * @param att Attached Object + */ + public void register(SocketChannel channel, Object att) + { + // The ++ increment here is not atomic, but it does not matter. + // so long as the value changes sometimes, then connections will + // be distributed over the available sets. - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel,att); - set.wakeup(); - } - } + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet[] sets=_selectSet; + if (sets!=null) + { + SelectSet set=sets[s]; + set.addChange(channel,att); + set.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - */ - public void register(SocketChannel channel) - { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. + /* ------------------------------------------------------------ */ + /** Register a channel + * @param channel + */ + public void register(SocketChannel channel) + { + // The ++ increment here is not atomic, but it does not matter. + // so long as the value changes sometimes, then connections will + // be distributed over the available sets. - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel); - set.wakeup(); - } - } + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet[] sets=_selectSet; + if (sets!=null) + { + SelectSet set=sets[s]; + set.addChange(channel); + set.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** Register a {@link ServerSocketChannel} - * @param acceptChannel - */ - public void register(ServerSocketChannel acceptChannel) - { - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet set=_selectSet[s]; - set.addChange(acceptChannel); - set.wakeup(); - } + /* ------------------------------------------------------------ */ + /** Register a {@link ServerSocketChannel} + * @param acceptChannel + */ + public void register(ServerSocketChannel acceptChannel) + { + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet set=_selectSet[s]; + set.addChange(acceptChannel); + set.wakeup(); + } - /* ------------------------------------------------------------ */ - /** - * @return delta The value to add to the selector thread priority. - */ - public int getSelectorPriorityDelta() - { - return _selectorPriorityDelta; - } + /* ------------------------------------------------------------ */ + /** + * @return delta The value to add to the selector thread priority. + */ + public int getSelectorPriorityDelta() + { + return _selectorPriorityDelta; + } - /* ------------------------------------------------------------ */ - /** Set the selector thread priorty delta. - * @param delta The value to add to the selector thread priority. - */ - public void setSelectorPriorityDelta(int delta) - { - _selectorPriorityDelta=delta; - } + /* ------------------------------------------------------------ */ + /** Set the selector thread priorty delta. + * @param delta The value to add to the selector thread priority. + */ + public void setSelectorPriorityDelta(int delta) + { + _selectorPriorityDelta=delta; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesConnections - */ - public long getLowResourcesConnections() - { - return _lowResourcesConnections*_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesConnections + */ + public long getLowResourcesConnections() + { + return _lowResourcesConnections*_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * Set the number of connections, which if exceeded places this manager in low resources state. - * This is not an exact measure as the connection count is averaged over the select sets. - * @param lowResourcesConnections the number of connections - * @see #setLowResourcesMaxIdleTime(long) - */ - public void setLowResourcesConnections(long lowResourcesConnections) - { - _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * Set the number of connections, which if exceeded places this manager in low resources state. + * This is not an exact measure as the connection count is averaged over the select sets. + * @param lowResourcesConnections the number of connections + * @see #setLowResourcesMaxIdleTime(long) + */ + public void setLowResourcesConnections(long lowResourcesConnections) + { + _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesMaxIdleTime - */ - public long getLowResourcesMaxIdleTime() - { - return _lowResourcesMaxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesMaxIdleTime + */ + public long getLowResourcesMaxIdleTime() + { + return _lowResourcesMaxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} - * @see #setMaxIdleTime(long) - */ - public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) - { - _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} + * @see #setMaxIdleTime(long) + */ + public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) + { + _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; + } - /* ------------------------------------------------------------------------------- */ - public abstract boolean dispatch(Runnable task); + /* ------------------------------------------------------------------------------- */ + public abstract void execute(Runnable task); - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see org.eclipse.component.AbstractLifeCycle#doStart() - */ - @Override - protected void doStart() throws Exception - { - _selectSet = new SelectSet[_selectSets]; - for (int i=0;i<_selectSet.length;i++) - _selectSet[i]= new SelectSet(i); + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see org.eclipse.component.AbstractLifeCycle#doStart() + */ + @Override + protected void doStart() throws Exception + { + _selectSet = new SelectSet[_selectSets]; + for (int i=0;i<_selectSet.length;i++) + _selectSet[i]= new SelectSet(i); - super.doStart(); + super.doStart(); - // start a thread to Select - for (int i=0;i<getSelectSets();i++) - { - final int id=i; - boolean selecting=dispatch(new Runnable() - { - public void run() - { - String name=Thread.currentThread().getName(); - int priority=Thread.currentThread().getPriority(); - try - { - SelectSet[] sets=_selectSet; - if (sets==null) - return; - SelectSet set=sets[id]; + // start a thread to Select + for (int i=0;i<getSelectSets();i++) + { + final int id=i; + execute(new Runnable() + { + public void run() + { + String name=Thread.currentThread().getName(); + int priority=Thread.currentThread().getPriority(); + try + { + SelectSet[] sets=_selectSet; + if (sets==null) + return; + SelectSet set=sets[id]; - Thread.currentThread().setName(name+" Selector"+id); - if (getSelectorPriorityDelta()!=0) - Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); - LOG.debug("Starting {} on {}",Thread.currentThread(),this); - while (isRunning()) - { - try - { - set.doSelect(); - } - catch(IOException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - finally - { - LOG.debug("Stopped {} on {}",Thread.currentThread(),this); - Thread.currentThread().setName(name); - if (getSelectorPriorityDelta()!=0) - Thread.currentThread().setPriority(priority); - } - } + Thread.currentThread().setName(name+" Selector"+id); + if (getSelectorPriorityDelta()!=0) + Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); + LOG.debug("Starting {} on {}",Thread.currentThread(),this); + while (isRunning()) + { + try + { + set.doSelect(); + } + catch(IOException e) + { + LOG.trace("",e); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + finally + { + LOG.debug("Stopped {} on {}",Thread.currentThread(),this); + Thread.currentThread().setName(name); + if (getSelectorPriorityDelta()!=0) + Thread.currentThread().setPriority(priority); + } + } - }); - - if (!selecting) - throw new IllegalStateException("!Selecting"); - } - } + }); + } + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStop() throws Exception - { - SelectSet[] sets= _selectSet; - _selectSet=null; - if (sets!=null) - { - for (SelectSet set : sets) - { - if (set!=null) - set.stop(); - } - } - super.doStop(); - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStop() throws Exception + { + SelectSet[] sets= _selectSet; + _selectSet=null; + if (sets!=null) + { + for (SelectSet set : sets) + { + if (set!=null) + set.stop(); + } + } + super.doStop(); + } - /* ------------------------------------------------------------ */ - /** - * @param endpoint - */ - protected abstract void endPointClosed(SelectChannelEndPoint endpoint); + /* ------------------------------------------------------------ */ + /** + * @param endpoint + */ + protected abstract void endPointClosed(SelectChannelEndPoint endpoint); - /* ------------------------------------------------------------ */ - /** - * @param endpoint - */ - protected abstract void endPointOpened(SelectChannelEndPoint endpoint); + /* ------------------------------------------------------------ */ + /** + * @param endpoint + */ + protected abstract void endPointOpened(SelectChannelEndPoint endpoint); - /* ------------------------------------------------------------ */ - protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); + /* ------------------------------------------------------------ */ + protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); - /* ------------------------------------------------------------------------------- */ - public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); + /* ------------------------------------------------------------------------------- */ + public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); - /* ------------------------------------------------------------ */ - /** - * Create a new end point - * @param channel - * @param selectSet - * @param sKey the selection key - * @return the new endpoint {@link SelectChannelEndPoint} - * @throws IOException - */ - protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; + /* ------------------------------------------------------------ */ + /** + * Create a new end point + * @param channel + * @param selectSet + * @param sKey the selection key + * @return the new endpoint {@link SelectChannelEndPoint} + * @throws IOException + */ + protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; - /* ------------------------------------------------------------------------------- */ - protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) - { - LOG.warn(ex+","+channel+","+attachment); - LOG.debug("",ex); - } + /* ------------------------------------------------------------------------------- */ + protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) + { + LOG.warn(ex+","+channel+","+attachment); + LOG.debug("",ex); + } - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - AggregateLifeCycle.dumpObject(out,this); - AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); - } + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + AggregateLifeCycle.dumpObject(out,this); + AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); + } - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - public class SelectSet implements Dumpable - { - private final int _setID; - private final Timeout _timeout; + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + public class SelectSet implements Dumpable + { + private final int _setID; + private final Timeout _timeout; - private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); + private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); - private volatile Selector _selector; + private volatile Selector _selector; - private volatile Thread _selecting; - private int _busySelects; - private long _monitorNext; - private boolean _pausing; - private boolean _paused; - private volatile long _idleTick; - private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); + private volatile Thread _selecting; + private int _busySelects; + private long _monitorNext; + private boolean _pausing; + private boolean _paused; + private volatile long _idleTick; + private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); - /* ------------------------------------------------------------ */ - SelectSet(int acceptorID) throws Exception - { - _setID=acceptorID; + /* ------------------------------------------------------------ */ + SelectSet(int acceptorID) throws Exception + { + _setID=acceptorID; - _idleTick = System.currentTimeMillis(); - _timeout = new Timeout(this); - _timeout.setDuration(0L); + _idleTick = System.currentTimeMillis(); + _timeout = new Timeout(this); + _timeout.setDuration(0L); - // create a selector; - _selector = Selector.open(); - _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; - } + // create a selector; + _selector = Selector.open(); + _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; + } - /* ------------------------------------------------------------ */ - public void addChange(Object change) - { - _changes.add(change); - } + /* ------------------------------------------------------------ */ + public void addChange(Object change) + { + _changes.add(change); + } - /* ------------------------------------------------------------ */ - public void addChange(SelectableChannel channel, Object att) - { - if (att==null) - addChange(channel); - else if (att instanceof EndPoint) - addChange(att); - else - addChange(new ChannelAndAttachment(channel,att)); - } + /* ------------------------------------------------------------ */ + public void addChange(SelectableChannel channel, Object att) + { + if (att==null) + addChange(channel); + else if (att instanceof EndPoint) + addChange(att); + else + addChange(new ChannelAndAttachment(channel,att)); + } - /* ------------------------------------------------------------ */ - /** - * Select and dispatch tasks found from changes and the selector. - * - * @throws IOException - */ - public void doSelect() throws IOException - { - try - { - _selecting=Thread.currentThread(); - final Selector selector=_selector; - // Stopped concurrently ? - if (selector == null) - return; + /* ------------------------------------------------------------ */ + /** + * Select and dispatch tasks found from changes and the selector. + * + * @throws IOException + */ + public void doSelect() throws IOException + { + try + { + _selecting=Thread.currentThread(); + final Selector selector=_selector; + // Stopped concurrently ? + if (selector == null) + return; - // Make any key changes required - Object change; - int changes=_changes.size(); - while (changes-->0 && (change=_changes.poll())!=null) - { - Channel ch=null; - SelectionKey key=null; + // Make any key changes required + Object change; + int changes=_changes.size(); + while (changes-->0 && (change=_changes.poll())!=null) + { + Channel ch=null; + SelectionKey key=null; - try - { - if (change instanceof EndPoint) - { - // Update the operations for a key. - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; - ch=endpoint.getChannel(); - endpoint.doUpdateKey(); - } - else if (change instanceof ChannelAndAttachment) - { - // finish accepting/connecting this connection - final ChannelAndAttachment asc = (ChannelAndAttachment)change; - final SelectableChannel channel=asc._channel; - ch=channel; - final Object att = asc._attachment; + try + { + if (change instanceof EndPoint) + { + // Update the operations for a key. + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; + ch=endpoint.getChannel(); + endpoint.doUpdateKey(); + } + else if (change instanceof ChannelAndAttachment) + { + // finish accepting/connecting this connection + final ChannelAndAttachment asc = (ChannelAndAttachment)change; + final SelectableChannel channel=asc._channel; + ch=channel; + final Object att = asc._attachment; - if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) - { - key = channel.register(selector,SelectionKey.OP_READ,att); - SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (channel.isOpen()) - { - key = channel.register(selector,SelectionKey.OP_CONNECT,att); - } - } - else if (change instanceof SocketChannel) - { - // Newly registered channel - final SocketChannel channel=(SocketChannel)change; - ch=channel; - key = channel.register(selector,SelectionKey.OP_READ,null); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (change instanceof ChangeTask) - { - ((Runnable)change).run(); - } - else if (change instanceof Runnable) - { - dispatch((Runnable)change); - } - else - throw new IllegalArgumentException(change.toString()); - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - catch (Throwable e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.debug("",e); + if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) + { + key = channel.register(selector,SelectionKey.OP_READ,att); + SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else if (channel.isOpen()) + { + key = channel.register(selector,SelectionKey.OP_CONNECT,att); + } + } + else if (change instanceof SocketChannel) + { + // Newly registered channel + final SocketChannel channel=(SocketChannel)change; + ch=channel; + key = channel.register(selector,SelectionKey.OP_READ,null); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else if (change instanceof ChangeTask) + { + ((Runnable)change).run(); + } + else if (change instanceof Runnable) + { + execute((Runnable)change); + } + else + throw new IllegalArgumentException(change.toString()); + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + catch (Throwable e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.debug("",e); - try - { - if (ch!=null) - ch.close(); - } - catch(IOException e2) - { - LOG.debug("",e2); - } - } - } + try + { + if (ch!=null) + ch.close(); + } + catch(IOException e2) + { + LOG.debug("",e2); + } + } + } - // Do and instant select to see if any connections can be handled. - int selected=selector.selectNow(); + // Do and instant select to see if any connections can be handled. + int selected=selector.selectNow(); - long now=System.currentTimeMillis(); + long now=System.currentTimeMillis(); - // if no immediate things to do - if (selected==0 && selector.selectedKeys().isEmpty()) - { - // If we are in pausing mode - if (_pausing) - { - try - { - Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop - } - catch(InterruptedException e) - { - LOG.trace("",e); - } - now=System.currentTimeMillis(); - } + // if no immediate things to do + if (selected==0 && selector.selectedKeys().isEmpty()) + { + // If we are in pausing mode + if (_pausing) + { + try + { + Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop + } + catch(InterruptedException e) + { + LOG.trace("",e); + } + now=System.currentTimeMillis(); + } - // workout how long to wait in select - _timeout.setNow(now); - long to_next_timeout=_timeout.getTimeToNext(); + // workout how long to wait in select + _timeout.setNow(now); + long to_next_timeout=_timeout.getTimeToNext(); - long wait = _changes.size()==0?__IDLE_TICK:0L; - if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) - wait = to_next_timeout; + long wait = _changes.size()==0?__IDLE_TICK:0L; + if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) + wait = to_next_timeout; - // If we should wait with a select - if (wait>0) - { - long before=now; - selector.select(wait); - now = System.currentTimeMillis(); - _timeout.setNow(now); + // If we should wait with a select + if (wait>0) + { + long before=now; + selector.select(wait); + now = System.currentTimeMillis(); + _timeout.setNow(now); - // If we are monitoring for busy selector - // and this select did not wait more than 1ms - if (__MONITOR_PERIOD>0 && now-before <=1) - { - // count this as a busy select and if there have been too many this monitor cycle - if (++_busySelects>__MAX_SELECTS) - { - // Start injecting pauses - _pausing=true; + // If we are monitoring for busy selector + // and this select did not wait more than 1ms + if (__MONITOR_PERIOD>0 && now-before <=1) + { + // count this as a busy select and if there have been too many this monitor cycle + if (++_busySelects>__MAX_SELECTS) + { + // Start injecting pauses + _pausing=true; - // if this is the first pause - if (!_paused) - { - // Log and dump some status - _paused=true; - LOG.warn("Selector {} is too busy, pausing!",this); - } - } - } - } - } + // if this is the first pause + if (!_paused) + { + // Log and dump some status + _paused=true; + LOG.warn("Selector {} is too busy, pausing!",this); + } + } + } + } + } - // have we been destroyed while sleeping - if (_selector==null || !selector.isOpen()) - return; + // have we been destroyed while sleeping + if (_selector==null || !selector.isOpen()) + return; - // Look for things to do - for (SelectionKey key: selector.selectedKeys()) - { - SocketChannel channel=null; + // Look for things to do + for (SelectionKey key: selector.selectedKeys()) + { + SocketChannel channel=null; - try - { - if (!key.isValid()) - { - key.cancel(); - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); - if (endpoint != null) - endpoint.doUpdateKey(); - continue; - } + try + { + if (!key.isValid()) + { + key.cancel(); + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); + if (endpoint != null) + endpoint.doUpdateKey(); + continue; + } - Object att = key.attachment(); - if (att instanceof SelectChannelEndPoint) - { - if (key.isReadable()||key.isWritable()) - ((SelectChannelEndPoint)att).schedule(); - } - else if (key.isConnectable()) - { - // Complete a connection of a registered channel - channel = (SocketChannel)key.channel(); - boolean connected=false; - try - { - connected=channel.finishConnect(); - } - catch(Exception e) - { - connectionFailed(channel,e,att); - } - finally - { - if (connected) - { - key.interestOps(SelectionKey.OP_READ); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else - { - key.cancel(); - channel.close(); - } - } - } - else - { - // Wrap readable registered channel in an endpoint - channel = (SocketChannel)key.channel(); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - if (key.isReadable()) - endpoint.schedule(); - } - key = null; - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - catch (Exception e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.trace("",e); + Object att = key.attachment(); + if (att instanceof SelectChannelEndPoint) + { + if (key.isReadable()||key.isWritable()) + ((SelectChannelEndPoint)att).schedule(); + } + else if (key.isConnectable()) + { + // Complete a connection of a registered channel + channel = (SocketChannel)key.channel(); + boolean connected=false; + try + { + connected=channel.finishConnect(); + } + catch(Exception e) + { + connectionFailed(channel,e,att); + } + finally + { + if (connected) + { + key.interestOps(SelectionKey.OP_READ); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else + { + key.cancel(); + channel.close(); + } + } + } + else + { + // Wrap readable registered channel in an endpoint + channel = (SocketChannel)key.channel(); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + if (key.isReadable()) + endpoint.schedule(); + } + key = null; + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + catch (Exception e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.trace("",e); - try - { - if (channel!=null) - channel.close(); - } - catch(IOException e2) - { - LOG.debug("",e2); - } + try + { + if (channel!=null) + channel.close(); + } + catch(IOException e2) + { + LOG.debug("",e2); + } - if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) - key.cancel(); - } - } + if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) + key.cancel(); + } + } - // Everything always handled - selector.selectedKeys().clear(); + // Everything always handled + selector.selectedKeys().clear(); - now=System.currentTimeMillis(); - _timeout.setNow(now); - Task task = _timeout.expired(); - while (task!=null) - { - if (task instanceof Runnable) - dispatch((Runnable)task); - task = _timeout.expired(); - } + now=System.currentTimeMillis(); + _timeout.setNow(now); + Task task = _timeout.expired(); + while (task!=null) + { + if (task instanceof Runnable) + execute((Runnable)task); + task = _timeout.expired(); + } - // Idle tick - if (now-_idleTick>__IDLE_TICK) - { - _idleTick=now; + // Idle tick + if (now-_idleTick>__IDLE_TICK) + { + _idleTick=now; - final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) - ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) - :now; + final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) + ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) + :now; - dispatch(new Runnable() - { - public void run() - { - for (SelectChannelEndPoint endp:_endPoints.keySet()) - { - endp.checkIdleTimestamp(idle_now); - } - } - public String toString() {return "Idle-"+super.toString();} - }); + execute(new Runnable() + { + public void run() + { + for (SelectChannelEndPoint endp:_endPoints.keySet()) + { + endp.checkIdleTimestamp(idle_now); + } + } + public String toString() {return "Idle-"+super.toString();} + }); - } + } - // Reset busy select monitor counts - if (__MONITOR_PERIOD>0 && now>_monitorNext) - { - _busySelects=0; - _pausing=false; - _monitorNext=now+__MONITOR_PERIOD; + // Reset busy select monitor counts + if (__MONITOR_PERIOD>0 && now>_monitorNext) + { + _busySelects=0; + _pausing=false; + _monitorNext=now+__MONITOR_PERIOD; - } - } - catch (ClosedSelectorException e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.trace("",e); - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - finally - { - _selecting=null; - } - } + } + } + catch (ClosedSelectorException e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.trace("",e); + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + finally + { + _selecting=null; + } + } - /* ------------------------------------------------------------ */ - private void renewSelector() - { - try - { - synchronized (this) - { - Selector selector=_selector; - if (selector==null) - return; - final Selector new_selector = Selector.open(); - for (SelectionKey k: selector.keys()) - { - if (!k.isValid() || k.interestOps()==0) - continue; + /* ------------------------------------------------------------ */ + private void renewSelector() + { + try + { + synchronized (this) + { + Selector selector=_selector; + if (selector==null) + return; + final Selector new_selector = Selector.open(); + for (SelectionKey k: selector.keys()) + { + if (!k.isValid() || k.interestOps()==0) + continue; - final SelectableChannel channel = k.channel(); - final Object attachment = k.attachment(); + final SelectableChannel channel = k.channel(); + final Object attachment = k.attachment(); - if (attachment==null) - addChange(channel); - else - addChange(channel,attachment); - } - _selector.close(); - _selector=new_selector; - } - } - catch(IOException e) - { - throw new RuntimeException("recreating selector",e); - } - } + if (attachment==null) + addChange(channel); + else + addChange(channel,attachment); + } + _selector.close(); + _selector=new_selector; + } + } + catch(IOException e) + { + throw new RuntimeException("recreating selector",e); + } + } - /* ------------------------------------------------------------ */ - public SelectorManager getManager() - { - return SelectorManager.this; - } + /* ------------------------------------------------------------ */ + public SelectorManager getManager() + { + return SelectorManager.this; + } - /* ------------------------------------------------------------ */ - public long getNow() - { - return _timeout.getNow(); - } + /* ------------------------------------------------------------ */ + public long getNow() + { + return _timeout.getNow(); + } - /* ------------------------------------------------------------ */ - /** - * @param task The task to timeout. If it implements Runnable, then - * expired will be called from a dispatched thread. - * - * @param timeoutMs - */ - public void scheduleTimeout(Timeout.Task task, long timeoutMs) - { - if (!(task instanceof Runnable)) - throw new IllegalArgumentException("!Runnable"); - _timeout.schedule(task, timeoutMs); - } + /* ------------------------------------------------------------ */ + /** + * @param task The task to timeout. If it implements Runnable, then + * expired will be called from a dispatched thread. + * + * @param timeoutMs + */ + public void scheduleTimeout(Timeout.Task task, long timeoutMs) + { + if (!(task instanceof Runnable)) + throw new IllegalArgumentException("!Runnable"); + _timeout.schedule(task, timeoutMs); + } - /* ------------------------------------------------------------ */ - public void cancelTimeout(Timeout.Task task) - { - task.cancel(); - } + /* ------------------------------------------------------------ */ + public void cancelTimeout(Timeout.Task task) + { + task.cancel(); + } - /* ------------------------------------------------------------ */ - public void wakeup() - { - try - { - Selector selector = _selector; - if (selector!=null) - selector.wakeup(); - } - catch(Exception e) - { - addChange(new ChangeTask() - { - public void run() - { - renewSelector(); - } - }); + /* ------------------------------------------------------------ */ + public void wakeup() + { + try + { + Selector selector = _selector; + if (selector!=null) + selector.wakeup(); + } + catch(Exception e) + { + addChange(new ChangeTask() + { + public void run() + { + renewSelector(); + } + }); - renewSelector(); - } - } + renewSelector(); + } + } - /* ------------------------------------------------------------ */ - private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException - { - SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); - LOG.debug("created {}",endp); - endPointOpened(endp); - _endPoints.put(endp,this); - return endp; - } + /* ------------------------------------------------------------ */ + private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException + { + SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); + LOG.debug("created {}",endp); + endPointOpened(endp); + _endPoints.put(endp,this); + return endp; + } - /* ------------------------------------------------------------ */ - public void destroyEndPoint(SelectChannelEndPoint endp) - { - LOG.debug("destroyEndPoint {}",endp); - _endPoints.remove(endp); - endPointClosed(endp); - } + /* ------------------------------------------------------------ */ + public void destroyEndPoint(SelectChannelEndPoint endp) + { + LOG.debug("destroyEndPoint {}",endp); + _endPoints.remove(endp); + endPointClosed(endp); + } - /* ------------------------------------------------------------ */ - Selector getSelector() - { - return _selector; - } + /* ------------------------------------------------------------ */ + Selector getSelector() + { + return _selector; + } - /* ------------------------------------------------------------ */ - void stop() throws Exception - { - // Spin for a while waiting for selector to complete - // to avoid unneccessary closed channel exceptions - try - { - for (int i=0;i<100 && _selecting!=null;i++) - { - wakeup(); - Thread.sleep(10); - } - } - catch(Exception e) - { - LOG.trace("",e); - } + /* ------------------------------------------------------------ */ + void stop() throws Exception + { + // Spin for a while waiting for selector to complete + // to avoid unneccessary closed channel exceptions + try + { + for (int i=0;i<100 && _selecting!=null;i++) + { + wakeup(); + Thread.sleep(10); + } + } + catch(Exception e) + { + LOG.trace("",e); + } - // close endpoints and selector - synchronized (this) - { - Selector selector=_selector; - for (SelectionKey key:selector.keys()) - { - if (key==null) - continue; - Object att=key.attachment(); - if (att instanceof EndPoint) - { - EndPoint endpoint = (EndPoint)att; - try - { - endpoint.close(); - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } + // close endpoints and selector + synchronized (this) + { + Selector selector=_selector; + for (SelectionKey key:selector.keys()) + { + if (key==null) + continue; + Object att=key.attachment(); + if (att instanceof EndPoint) + { + EndPoint endpoint = (EndPoint)att; + try + { + endpoint.close(); + } + catch(IOException e) + { + LOG.trace("",e); + } + } + } - _timeout.cancelAll(); - try - { - selector=_selector; - if (selector != null) - selector.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - _selector=null; - } - } + _timeout.cancelAll(); + try + { + selector=_selector; + if (selector != null) + selector.close(); + } + catch (IOException e) + { + LOG.trace("",e); + } + _selector=null; + } + } - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); - Thread selecting = _selecting; + Thread selecting = _selecting; - Object where = "not selecting"; - StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); - if (trace!=null) - { - for (StackTraceElement t:trace) - if (t.getClassName().startsWith("org.eclipse.jetty.")) - { - where=t; - break; - } - } + Object where = "not selecting"; + StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); + if (trace!=null) + { + for (StackTraceElement t:trace) + if (t.getClassName().startsWith("org.eclipse.jetty.")) + { + where=t; + break; + } + } - Selector selector=_selector; - if (selector!=null) - { - final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); - dump.add(where); + Selector selector=_selector; + if (selector!=null) + { + final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); + dump.add(where); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - addChange(new ChangeTask() - { - public void run() - { - dumpKeyState(dump); - latch.countDown(); - } - }); + addChange(new ChangeTask() + { + public void run() + { + dumpKeyState(dump); + latch.countDown(); + } + }); - try - { - latch.await(5,TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.trace("",e); - } + try + { + latch.await(5,TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.trace("",e); + } - AggregateLifeCycle.dump(out,indent,dump); - } - } + AggregateLifeCycle.dump(out,indent,dump); + } + } - /* ------------------------------------------------------------ */ - public void dumpKeyState(List<Object> dumpto) - { - Selector selector=_selector; - Set<SelectionKey> keys = selector.keys(); - dumpto.add(selector + " keys=" + keys.size()); - for (SelectionKey key: keys) - { - if (key.isValid()) - dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); - else - dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); - } - } + /* ------------------------------------------------------------ */ + public void dumpKeyState(List<Object> dumpto) + { + Selector selector=_selector; + Set<SelectionKey> keys = selector.keys(); + dumpto.add(selector + " keys=" + keys.size()); + for (SelectionKey key: keys) + { + if (key.isValid()) + dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); + else + dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); + } + } - /* ------------------------------------------------------------ */ - public String toString() - { - Selector selector=_selector; - return String.format("%s keys=%d selected=%d", - super.toString(), - selector != null && selector.isOpen() ? selector.keys().size() : -1, - selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); - } - } + /* ------------------------------------------------------------ */ + public String toString() + { + Selector selector=_selector; + return String.format("%s keys=%d selected=%d", + super.toString(), + selector != null && selector.isOpen() ? selector.keys().size() : -1, + selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); + } + } - /* ------------------------------------------------------------ */ - private static class ChannelAndAttachment - { - final SelectableChannel _channel; - final Object _attachment; + /* ------------------------------------------------------------ */ + private static class ChannelAndAttachment + { + final SelectableChannel _channel; + final Object _attachment; - public ChannelAndAttachment(SelectableChannel channel, Object attachment) - { - super(); - _channel = channel; - _attachment = attachment; - } - } + public ChannelAndAttachment(SelectableChannel channel, Object attachment) + { + super(); + _channel = channel; + _attachment = attachment; + } + } - /* ------------------------------------------------------------ */ - public boolean isDeferringInterestedOps0() - { - return _deferringInterestedOps0; - } + /* ------------------------------------------------------------ */ + public boolean isDeferringInterestedOps0() + { + return _deferringInterestedOps0; + } - /* ------------------------------------------------------------ */ - public void setDeferringInterestedOps0(boolean deferringInterestedOps0) - { - _deferringInterestedOps0 = deferringInterestedOps0; - } + /* ------------------------------------------------------------ */ + public void setDeferringInterestedOps0(boolean deferringInterestedOps0) + { + _deferringInterestedOps0 = deferringInterestedOps0; + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private interface ChangeTask extends Runnable - {} + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private interface ChangeTask extends Runnable + {} }
--- a/src/org/eclipse/jetty/server/AbstractConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/AbstractConnector.java Sun Oct 02 20:38:06 2016 -0600 @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import javax.servlet.ServletRequest; @@ -40,7 +41,6 @@ import org.eclipse.jetty.util.component.Dumpable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.eclipse.jetty.util.thread.ThreadPool; /** * Abstract Connector implementation. This abstract implementation of the Connector interface provides: @@ -112,7 +112,7 @@ } /* ------------------------------------------------------------ */ - public ThreadPool getThreadPool() + public ThreadPoolExecutor getThreadPool() { return _server.getThreadPool(); } @@ -297,11 +297,10 @@ { _acceptorThreads = new Thread[getAcceptors()]; - ThreadPool _threadPool = getThreadPool(); + ThreadPoolExecutor _threadPool = getThreadPool(); for (int i = 0; i < _acceptorThreads.length; i++) - if (!_threadPool.dispatch(new Acceptor(i))) - throw new IllegalStateException("!accepting"); - if (_threadPool.isLowOnThreads()) + _threadPool.execute(new Acceptor(i)); + if (_server.isLowOnThreads()) LOG.warn("insufficient threads configured for {}",this); } @@ -1012,9 +1011,9 @@ } /* ------------------------------------------------------------ */ - public boolean isLowResources() + public final boolean isLowResources() { - return getThreadPool().isLowOnThreads(); + return _server.isLowOnThreads(); } /* ------------------------------------------------------------ */
--- a/src/org/eclipse/jetty/server/AsyncContinuation.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/AsyncContinuation.java Sun Oct 02 20:38:06 2016 -0600 @@ -50,1111 +50,1111 @@ */ public class AsyncContinuation implements AsyncContext, Continuation { - private static final Logger LOG = LoggerFactory.getLogger(AsyncContinuation.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncContinuation.class); - private final static long DEFAULT_TIMEOUT=30000L; - - private final static ContinuationThrowable __exception = new ContinuationThrowable(); - - // STATES: - // handling() suspend() unhandle() resume() complete() doComplete() - // startAsync() dispatch() - // IDLE DISPATCHED - // DISPATCHED ASYNCSTARTED UNCOMPLETED - // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETING - // REDISPATCHING REDISPATCHED - // ASYNCWAIT REDISPATCH COMPLETING - // REDISPATCH REDISPATCHED - // REDISPATCHED ASYNCSTARTED UNCOMPLETED - // COMPLETING UNCOMPLETED UNCOMPLETED - // UNCOMPLETED COMPLETED - // COMPLETED - private static final int __IDLE=0; // Idle request - private static final int __DISPATCHED=1; // Request dispatched to filter/servlet - private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container - private static final int __REDISPATCHING=3;// resumed while dispatched - private static final int __ASYNCWAIT=4; // Suspended and parked - private static final int __REDISPATCH=5; // Has been scheduled - private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet - private static final int __COMPLETING=7; // complete while dispatched - private static final int __UNCOMPLETED=8; // Request is completable - private static final int __COMPLETED=9; // Request is complete - - /* ------------------------------------------------------------ */ - protected AbstractHttpConnection _connection; - private List<AsyncListener> _lastAsyncListeners; - private List<AsyncListener> _asyncListeners; - private List<ContinuationListener> _continuationListeners; + private final static long DEFAULT_TIMEOUT=30000L; + + private final static ContinuationThrowable __exception = new ContinuationThrowable(); + + // STATES: + // handling() suspend() unhandle() resume() complete() doComplete() + // startAsync() dispatch() + // IDLE DISPATCHED + // DISPATCHED ASYNCSTARTED UNCOMPLETED + // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETING + // REDISPATCHING REDISPATCHED + // ASYNCWAIT REDISPATCH COMPLETING + // REDISPATCH REDISPATCHED + // REDISPATCHED ASYNCSTARTED UNCOMPLETED + // COMPLETING UNCOMPLETED UNCOMPLETED + // UNCOMPLETED COMPLETED + // COMPLETED + private static final int __IDLE=0; // Idle request + private static final int __DISPATCHED=1; // Request dispatched to filter/servlet + private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container + private static final int __REDISPATCHING=3;// resumed while dispatched + private static final int __ASYNCWAIT=4; // Suspended and parked + private static final int __REDISPATCH=5; // Has been scheduled + private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet + private static final int __COMPLETING=7; // complete while dispatched + private static final int __UNCOMPLETED=8; // Request is completable + private static final int __COMPLETED=9; // Request is complete + + /* ------------------------------------------------------------ */ + protected AbstractHttpConnection _connection; + private List<AsyncListener> _lastAsyncListeners; + private List<AsyncListener> _asyncListeners; + private List<ContinuationListener> _continuationListeners; - /* ------------------------------------------------------------ */ - private int _state; - private boolean _initial; - private boolean _resumed; - private boolean _expired; - private volatile boolean _responseWrapped; - private long _timeoutMs=DEFAULT_TIMEOUT; - private AsyncEventState _event; - private volatile long _expireAt; - private volatile boolean _continuation; - - /* ------------------------------------------------------------ */ - protected AsyncContinuation() - { - _state=__IDLE; - _initial=true; - } + /* ------------------------------------------------------------ */ + private int _state; + private boolean _initial; + private boolean _resumed; + private boolean _expired; + private volatile boolean _responseWrapped; + private long _timeoutMs=DEFAULT_TIMEOUT; + private AsyncEventState _event; + private volatile long _expireAt; + private volatile boolean _continuation; + + /* ------------------------------------------------------------ */ + protected AsyncContinuation() + { + _state=__IDLE; + _initial=true; + } - /* ------------------------------------------------------------ */ - protected void setConnection(final AbstractHttpConnection connection) - { - synchronized(this) - { - _connection=connection; - } - } + /* ------------------------------------------------------------ */ + protected void setConnection(final AbstractHttpConnection connection) + { + synchronized(this) + { + _connection=connection; + } + } - /* ------------------------------------------------------------ */ - public void addListener(AsyncListener listener) - { - synchronized(this) - { - if (_asyncListeners==null) - _asyncListeners=new ArrayList<AsyncListener>(); - _asyncListeners.add(listener); - } - } + /* ------------------------------------------------------------ */ + public void addListener(AsyncListener listener) + { + synchronized(this) + { + if (_asyncListeners==null) + _asyncListeners=new ArrayList<AsyncListener>(); + _asyncListeners.add(listener); + } + } - /* ------------------------------------------------------------ */ - public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response) - { - synchronized(this) - { - // TODO handle the request/response ??? - if (_asyncListeners==null) - _asyncListeners=new ArrayList<AsyncListener>(); - _asyncListeners.add(listener); - } - } + /* ------------------------------------------------------------ */ + public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response) + { + synchronized(this) + { + // TODO handle the request/response ??? + if (_asyncListeners==null) + _asyncListeners=new ArrayList<AsyncListener>(); + _asyncListeners.add(listener); + } + } - /* ------------------------------------------------------------ */ - public void addContinuationListener(ContinuationListener listener) - { - synchronized(this) - { - if (_continuationListeners==null) - _continuationListeners=new ArrayList<ContinuationListener>(); - _continuationListeners.add(listener); - } - } + /* ------------------------------------------------------------ */ + public void addContinuationListener(ContinuationListener listener) + { + synchronized(this) + { + if (_continuationListeners==null) + _continuationListeners=new ArrayList<ContinuationListener>(); + _continuationListeners.add(listener); + } + } - /* ------------------------------------------------------------ */ - public void setTimeout(long ms) - { - synchronized(this) - { - _timeoutMs=ms; - } - } + /* ------------------------------------------------------------ */ + public void setTimeout(long ms) + { + synchronized(this) + { + _timeoutMs=ms; + } + } - /* ------------------------------------------------------------ */ - public long getTimeout() - { - synchronized(this) - { - return _timeoutMs; - } - } + /* ------------------------------------------------------------ */ + public long getTimeout() + { + synchronized(this) + { + return _timeoutMs; + } + } - /* ------------------------------------------------------------ */ - public AsyncEventState getAsyncEventState() - { - synchronized(this) - { - return _event; - } - } + /* ------------------------------------------------------------ */ + public AsyncEventState getAsyncEventState() + { + synchronized(this) + { + return _event; + } + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#keepWrappers() - */ + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#keepWrappers() + */ - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped() - */ - public boolean isResponseWrapped() - { - return _responseWrapped; - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped() + */ + public boolean isResponseWrapped() + { + return _responseWrapped; + } - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#isInitial() - */ - public boolean isInitial() - { - synchronized(this) - { - return _initial; - } - } - - public boolean isContinuation() - { - return _continuation; - } - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#isSuspended() - */ - public boolean isSuspended() - { - synchronized(this) - { - switch(_state) - { - case __ASYNCSTARTED: - case __REDISPATCHING: - case __COMPLETING: - case __ASYNCWAIT: - return true; - - default: - return false; - } - } - } - - /* ------------------------------------------------------------ */ - public boolean isSuspending() - { - synchronized(this) - { - switch(_state) - { - case __ASYNCSTARTED: - case __ASYNCWAIT: - return true; - - default: - return false; - } - } - } - - /* ------------------------------------------------------------ */ - public boolean isDispatchable() - { - synchronized(this) - { - switch(_state) - { - case __REDISPATCH: - case __REDISPATCHED: - case __REDISPATCHING: - case __COMPLETING: - return true; - - default: - return false; - } - } - } + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#isInitial() + */ + public boolean isInitial() + { + synchronized(this) + { + return _initial; + } + } + + public boolean isContinuation() + { + return _continuation; + } + + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#isSuspended() + */ + public boolean isSuspended() + { + synchronized(this) + { + switch(_state) + { + case __ASYNCSTARTED: + case __REDISPATCHING: + case __COMPLETING: + case __ASYNCWAIT: + return true; + + default: + return false; + } + } + } + + /* ------------------------------------------------------------ */ + public boolean isSuspending() + { + synchronized(this) + { + switch(_state) + { + case __ASYNCSTARTED: + case __ASYNCWAIT: + return true; + + default: + return false; + } + } + } + + /* ------------------------------------------------------------ */ + public boolean isDispatchable() + { + synchronized(this) + { + switch(_state) + { + case __REDISPATCH: + case __REDISPATCHED: + case __REDISPATCHING: + case __COMPLETING: + return true; + + default: + return false; + } + } + } - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - synchronized (this) - { - return super.toString()+"@"+getStatusString(); - } - } + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + synchronized (this) + { + return super.toString()+"@"+getStatusString(); + } + } - /* ------------------------------------------------------------ */ - public String getStatusString() - { - synchronized (this) - { - return - ((_state==__IDLE)?"IDLE": - (_state==__DISPATCHED)?"DISPATCHED": - (_state==__ASYNCSTARTED)?"ASYNCSTARTED": - (_state==__ASYNCWAIT)?"ASYNCWAIT": - (_state==__REDISPATCHING)?"REDISPATCHING": - (_state==__REDISPATCH)?"REDISPATCH": - (_state==__REDISPATCHED)?"REDISPATCHED": - (_state==__COMPLETING)?"COMPLETING": - (_state==__UNCOMPLETED)?"UNCOMPLETED": - (_state==__COMPLETED)?"COMPLETE": - ("UNKNOWN?"+_state))+ - (_initial?",initial":"")+ - (_resumed?",resumed":"")+ - (_expired?",expired":""); - } - } + /* ------------------------------------------------------------ */ + public String getStatusString() + { + synchronized (this) + { + return + ((_state==__IDLE)?"IDLE": + (_state==__DISPATCHED)?"DISPATCHED": + (_state==__ASYNCSTARTED)?"ASYNCSTARTED": + (_state==__ASYNCWAIT)?"ASYNCWAIT": + (_state==__REDISPATCHING)?"REDISPATCHING": + (_state==__REDISPATCH)?"REDISPATCH": + (_state==__REDISPATCHED)?"REDISPATCHED": + (_state==__COMPLETING)?"COMPLETING": + (_state==__UNCOMPLETED)?"UNCOMPLETED": + (_state==__COMPLETED)?"COMPLETE": + ("UNKNOWN?"+_state))+ + (_initial?",initial":"")+ + (_resumed?",resumed":"")+ + (_expired?",expired":""); + } + } - /* ------------------------------------------------------------ */ - /** - * @return false if the handling of the request should not proceed - */ - protected boolean handling() - { - synchronized (this) - { - _continuation=false; - - switch(_state) - { - case __IDLE: - _initial=true; - _state=__DISPATCHED; - if (_lastAsyncListeners!=null) - _lastAsyncListeners.clear(); - if (_asyncListeners!=null) - _asyncListeners.clear(); - else - { - _asyncListeners=_lastAsyncListeners; - _lastAsyncListeners=null; - } - return true; - - case __COMPLETING: - _state=__UNCOMPLETED; - return false; + /* ------------------------------------------------------------ */ + /** + * @return false if the handling of the request should not proceed + */ + protected boolean handling() + { + synchronized (this) + { + _continuation=false; + + switch(_state) + { + case __IDLE: + _initial=true; + _state=__DISPATCHED; + if (_lastAsyncListeners!=null) + _lastAsyncListeners.clear(); + if (_asyncListeners!=null) + _asyncListeners.clear(); + else + { + _asyncListeners=_lastAsyncListeners; + _lastAsyncListeners=null; + } + return true; + + case __COMPLETING: + _state=__UNCOMPLETED; + return false; - case __ASYNCWAIT: - return false; - - case __REDISPATCH: - _state=__REDISPATCHED; - return true; + case __ASYNCWAIT: + return false; + + case __REDISPATCH: + _state=__REDISPATCHED; + return true; - default: - throw new IllegalStateException(this.getStatusString()); - } - } - } + default: + throw new IllegalStateException(this.getStatusString()); + } + } + } - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#suspend(long) - */ - private void doSuspend(final ServletContext context, - final ServletRequest request, - final ServletResponse response) - { - synchronized (this) - { - switch(_state) - { - case __DISPATCHED: - case __REDISPATCHED: - _resumed=false; - _expired=false; + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#suspend(long) + */ + private void doSuspend(final ServletContext context, + final ServletRequest request, + final ServletResponse response) + { + synchronized (this) + { + switch(_state) + { + case __DISPATCHED: + case __REDISPATCHED: + _resumed=false; + _expired=false; - if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext()) - _event=new AsyncEventState(context,request,response); - else - { - _event._dispatchContext=null; - _event._pathInContext=null; - } - _state=__ASYNCSTARTED; - List<AsyncListener> recycle=_lastAsyncListeners; - _lastAsyncListeners=_asyncListeners; - _asyncListeners=recycle; - if (_asyncListeners!=null) - _asyncListeners.clear(); - break; + if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext()) + _event=new AsyncEventState(context,request,response); + else + { + _event._dispatchContext=null; + _event._pathInContext=null; + } + _state=__ASYNCSTARTED; + List<AsyncListener> recycle=_lastAsyncListeners; + _lastAsyncListeners=_asyncListeners; + _asyncListeners=recycle; + if (_asyncListeners!=null) + _asyncListeners.clear(); + break; - default: - throw new IllegalStateException(this.getStatusString()); - } - } - - if (_lastAsyncListeners!=null) - { - for (AsyncListener listener : _lastAsyncListeners) - { - try - { - listener.onStartAsync(_event); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - } + default: + throw new IllegalStateException(this.getStatusString()); + } + } + + if (_lastAsyncListeners!=null) + { + for (AsyncListener listener : _lastAsyncListeners) + { + try + { + listener.onStartAsync(_event); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + } - /* ------------------------------------------------------------ */ - /** - * Signal that the HttpConnection has finished handling the request. - * For blocking connectors, this call may block if the request has - * been suspended (startAsync called). - * @return true if handling is complete, false if the request should - * be handled again (eg because of a resume that happened before unhandle was called) - */ - protected boolean unhandle() - { - synchronized (this) - { - switch(_state) - { - case __REDISPATCHED: - case __DISPATCHED: - _state=__UNCOMPLETED; - return true; + /* ------------------------------------------------------------ */ + /** + * Signal that the HttpConnection has finished handling the request. + * For blocking connectors, this call may block if the request has + * been suspended (startAsync called). + * @return true if handling is complete, false if the request should + * be handled again (eg because of a resume that happened before unhandle was called) + */ + protected boolean unhandle() + { + synchronized (this) + { + switch(_state) + { + case __REDISPATCHED: + case __DISPATCHED: + _state=__UNCOMPLETED; + return true; - case __IDLE: - throw new IllegalStateException(this.getStatusString()); + case __IDLE: + throw new IllegalStateException(this.getStatusString()); - case __ASYNCSTARTED: - _initial=false; - _state=__ASYNCWAIT; - scheduleTimeout(); // could block and change state. - if (_state==__ASYNCWAIT) - return true; - else if (_state==__COMPLETING) - { - _state=__UNCOMPLETED; - return true; - } - _initial=false; - _state=__REDISPATCHED; - return false; + case __ASYNCSTARTED: + _initial=false; + _state=__ASYNCWAIT; + scheduleTimeout(); // could block and change state. + if (_state==__ASYNCWAIT) + return true; + else if (_state==__COMPLETING) + { + _state=__UNCOMPLETED; + return true; + } + _initial=false; + _state=__REDISPATCHED; + return false; - case __REDISPATCHING: - _initial=false; - _state=__REDISPATCHED; - return false; + case __REDISPATCHING: + _initial=false; + _state=__REDISPATCHED; + return false; - case __COMPLETING: - _initial=false; - _state=__UNCOMPLETED; - return true; + case __COMPLETING: + _initial=false; + _state=__UNCOMPLETED; + return true; - default: - throw new IllegalStateException(this.getStatusString()); - } - } - } + default: + throw new IllegalStateException(this.getStatusString()); + } + } + } - /* ------------------------------------------------------------ */ - public void dispatch() - { - boolean dispatch=false; - synchronized (this) - { - switch(_state) - { - case __ASYNCSTARTED: - _state=__REDISPATCHING; - _resumed=true; - return; + /* ------------------------------------------------------------ */ + public void dispatch() + { + boolean dispatch=false; + synchronized (this) + { + switch(_state) + { + case __ASYNCSTARTED: + _state=__REDISPATCHING; + _resumed=true; + return; - case __ASYNCWAIT: - dispatch=!_expired; - _state=__REDISPATCH; - _resumed=true; - break; - - case __REDISPATCH: - return; - - default: - throw new IllegalStateException(this.getStatusString()); - } - } - - if (dispatch) - { - cancelTimeout(); - scheduleDispatch(); - } - } + case __ASYNCWAIT: + dispatch=!_expired; + _state=__REDISPATCH; + _resumed=true; + break; + + case __REDISPATCH: + return; + + default: + throw new IllegalStateException(this.getStatusString()); + } + } + + if (dispatch) + { + cancelTimeout(); + scheduleDispatch(); + } + } - /* ------------------------------------------------------------ */ - protected void expired() - { - final List<ContinuationListener> cListeners; - final List<AsyncListener> aListeners; - synchronized (this) - { - switch(_state) - { - case __ASYNCSTARTED: - case __ASYNCWAIT: - cListeners=_continuationListeners; - aListeners=_asyncListeners; - break; - default: - cListeners=null; - aListeners=null; - return; - } - _expired=true; - } - - if (aListeners!=null) - { - for (AsyncListener listener : aListeners) - { - try - { - listener.onTimeout(_event); - } - catch(Exception e) - { - LOG.debug("",e); - _connection.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,e); - break; - } - } - } - if (cListeners!=null) - { - for (ContinuationListener listener : cListeners) - { - try - { - listener.onTimeout(this); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - - synchronized (this) - { - switch(_state) - { - case __ASYNCSTARTED: - case __ASYNCWAIT: - dispatch(); - break; - - default: - if (!_continuation) - _expired=false; - } - } + /* ------------------------------------------------------------ */ + protected void expired() + { + final List<ContinuationListener> cListeners; + final List<AsyncListener> aListeners; + synchronized (this) + { + switch(_state) + { + case __ASYNCSTARTED: + case __ASYNCWAIT: + cListeners=_continuationListeners; + aListeners=_asyncListeners; + break; + default: + cListeners=null; + aListeners=null; + return; + } + _expired=true; + } + + if (aListeners!=null) + { + for (AsyncListener listener : aListeners) + { + try + { + listener.onTimeout(_event); + } + catch(Exception e) + { + LOG.debug("",e); + _connection.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,e); + break; + } + } + } + if (cListeners!=null) + { + for (ContinuationListener listener : cListeners) + { + try + { + listener.onTimeout(this); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + + synchronized (this) + { + switch(_state) + { + case __ASYNCSTARTED: + case __ASYNCWAIT: + dispatch(); + break; + + default: + if (!_continuation) + _expired=false; + } + } - scheduleDispatch(); - } - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#complete() - */ - public void complete() - { - // just like resume, except don't set _resumed=true; - boolean dispatch=false; - synchronized (this) - { - switch(_state) - { - case __DISPATCHED: - case __REDISPATCHED: - throw new IllegalStateException(this.getStatusString()); + scheduleDispatch(); + } + + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#complete() + */ + public void complete() + { + // just like resume, except don't set _resumed=true; + boolean dispatch=false; + synchronized (this) + { + switch(_state) + { + case __DISPATCHED: + case __REDISPATCHED: + throw new IllegalStateException(this.getStatusString()); - case __ASYNCSTARTED: - _state=__COMPLETING; - return; - - case __ASYNCWAIT: - _state=__COMPLETING; - dispatch=!_expired; - break; - - default: - throw new IllegalStateException(this.getStatusString()); - } - } - - if (dispatch) - { - cancelTimeout(); - scheduleDispatch(); - } - } - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#complete() - */ - public void errorComplete() - { - // just like complete except can overrule a prior dispatch call; - synchronized (this) - { - switch(_state) - { - case __REDISPATCHING: - case __ASYNCSTARTED: - _state=__COMPLETING; - _resumed=false; - return; - - case __COMPLETING: - return; - - default: - throw new IllegalStateException(this.getStatusString()); - } - } - } + case __ASYNCSTARTED: + _state=__COMPLETING; + return; + + case __ASYNCWAIT: + _state=__COMPLETING; + dispatch=!_expired; + break; + + default: + throw new IllegalStateException(this.getStatusString()); + } + } + + if (dispatch) + { + cancelTimeout(); + scheduleDispatch(); + } + } + + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#complete() + */ + public void errorComplete() + { + // just like complete except can overrule a prior dispatch call; + synchronized (this) + { + switch(_state) + { + case __REDISPATCHING: + case __ASYNCSTARTED: + _state=__COMPLETING; + _resumed=false; + return; + + case __COMPLETING: + return; + + default: + throw new IllegalStateException(this.getStatusString()); + } + } + } - /* ------------------------------------------------------------ */ - @Override - public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException - { - try - { - // TODO inject - return clazz.newInstance(); - } - catch(Exception e) - { - throw new ServletException(e); - } - } + /* ------------------------------------------------------------ */ + @Override + public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException + { + try + { + // TODO inject + return clazz.newInstance(); + } + catch(Exception e) + { + throw new ServletException(e); + } + } - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see javax.servlet.ServletRequest#complete() - */ - protected void doComplete(Throwable ex) - { - final List<ContinuationListener> cListeners; - final List<AsyncListener> aListeners; - synchronized (this) - { - switch(_state) - { - case __UNCOMPLETED: - _state=__COMPLETED; - cListeners=_continuationListeners; - aListeners=_asyncListeners; - break; - - default: - cListeners=null; - aListeners=null; - throw new IllegalStateException(this.getStatusString()); - } - } - - if (aListeners!=null) - { - for (AsyncListener listener : aListeners) - { - try - { - if (ex!=null) - { - _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex); - _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,ex.getMessage()); - listener.onError(_event); - } - else - listener.onComplete(_event); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - if (cListeners!=null) - { - for (ContinuationListener listener : cListeners) - { - try - { - listener.onComplete(this); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - } + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see javax.servlet.ServletRequest#complete() + */ + protected void doComplete(Throwable ex) + { + final List<ContinuationListener> cListeners; + final List<AsyncListener> aListeners; + synchronized (this) + { + switch(_state) + { + case __UNCOMPLETED: + _state=__COMPLETED; + cListeners=_continuationListeners; + aListeners=_asyncListeners; + break; + + default: + cListeners=null; + aListeners=null; + throw new IllegalStateException(this.getStatusString()); + } + } + + if (aListeners!=null) + { + for (AsyncListener listener : aListeners) + { + try + { + if (ex!=null) + { + _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex); + _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,ex.getMessage()); + listener.onError(_event); + } + else + listener.onComplete(_event); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + if (cListeners!=null) + { + for (ContinuationListener listener : cListeners) + { + try + { + listener.onComplete(this); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + } - /* ------------------------------------------------------------ */ - protected void recycle() - { - synchronized (this) - { - switch(_state) - { - case __DISPATCHED: - case __REDISPATCHED: - throw new IllegalStateException(getStatusString()); - default: - _state=__IDLE; - } - _initial = true; - _resumed=false; - _expired=false; - _responseWrapped=false; - cancelTimeout(); - _timeoutMs=DEFAULT_TIMEOUT; - _continuationListeners=null; - } - } - - /* ------------------------------------------------------------ */ - public void cancel() - { - synchronized (this) - { - cancelTimeout(); - _continuationListeners=null; - } - } + /* ------------------------------------------------------------ */ + protected void recycle() + { + synchronized (this) + { + switch(_state) + { + case __DISPATCHED: + case __REDISPATCHED: + throw new IllegalStateException(getStatusString()); + default: + _state=__IDLE; + } + _initial = true; + _resumed=false; + _expired=false; + _responseWrapped=false; + cancelTimeout(); + _timeoutMs=DEFAULT_TIMEOUT; + _continuationListeners=null; + } + } + + /* ------------------------------------------------------------ */ + public void cancel() + { + synchronized (this) + { + cancelTimeout(); + _continuationListeners=null; + } + } - /* ------------------------------------------------------------ */ - protected void scheduleDispatch() - { - EndPoint endp=_connection.getEndPoint(); - if (!endp.isBlocking()) - { - ((AsyncEndPoint)endp).asyncDispatch(); - } - } + /* ------------------------------------------------------------ */ + protected void scheduleDispatch() + { + EndPoint endp=_connection.getEndPoint(); + if (!endp.isBlocking()) + { + ((AsyncEndPoint)endp).asyncDispatch(); + } + } - /* ------------------------------------------------------------ */ - protected void scheduleTimeout() - { - EndPoint endp=_connection.getEndPoint(); - if (_timeoutMs>0) - { - if (endp.isBlocking()) - { - synchronized(this) - { - _expireAt = System.currentTimeMillis()+_timeoutMs; - long wait=_timeoutMs; - while (_expireAt>0 && wait>0 && _connection.getServer().isRunning()) - { - try - { - this.wait(wait); - } - catch (InterruptedException e) - { - LOG.trace("",e); - } - wait=_expireAt-System.currentTimeMillis(); - } + /* ------------------------------------------------------------ */ + protected void scheduleTimeout() + { + EndPoint endp=_connection.getEndPoint(); + if (_timeoutMs>0) + { + if (endp.isBlocking()) + { + synchronized(this) + { + _expireAt = System.currentTimeMillis()+_timeoutMs; + long wait=_timeoutMs; + while (_expireAt>0 && wait>0 && _connection.getServer().isRunning()) + { + try + { + this.wait(wait); + } + catch (InterruptedException e) + { + LOG.trace("",e); + } + wait=_expireAt-System.currentTimeMillis(); + } - if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning()) - { - expired(); - } - } - } - else - { - ((AsyncEndPoint)endp).scheduleTimeout(_event._timeout,_timeoutMs); - } - } - } + if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning()) + { + expired(); + } + } + } + else + { + ((AsyncEndPoint)endp).scheduleTimeout(_event._timeout,_timeoutMs); + } + } + } - /* ------------------------------------------------------------ */ - protected void cancelTimeout() - { - EndPoint endp=_connection.getEndPoint(); - if (endp.isBlocking()) - { - synchronized(this) - { - _expireAt=0; - this.notifyAll(); - } - } - else - { - final AsyncEventState event=_event; - if (event!=null) - { - ((AsyncEndPoint)endp).cancelTimeout(event._timeout); - } - } - } + /* ------------------------------------------------------------ */ + protected void cancelTimeout() + { + EndPoint endp=_connection.getEndPoint(); + if (endp.isBlocking()) + { + synchronized(this) + { + _expireAt=0; + this.notifyAll(); + } + } + else + { + final AsyncEventState event=_event; + if (event!=null) + { + ((AsyncEndPoint)endp).cancelTimeout(event._timeout); + } + } + } - /* ------------------------------------------------------------ */ - public boolean isCompleting() - { - synchronized (this) - { - return _state==__COMPLETING; - } - } - - /* ------------------------------------------------------------ */ - boolean isUncompleted() - { - synchronized (this) - { - return _state==__UNCOMPLETED; - } - } - - /* ------------------------------------------------------------ */ - public boolean isComplete() - { - synchronized (this) - { - return _state==__COMPLETED; - } - } + /* ------------------------------------------------------------ */ + public boolean isCompleting() + { + synchronized (this) + { + return _state==__COMPLETING; + } + } + + /* ------------------------------------------------------------ */ + boolean isUncompleted() + { + synchronized (this) + { + return _state==__UNCOMPLETED; + } + } + + /* ------------------------------------------------------------ */ + public boolean isComplete() + { + synchronized (this) + { + return _state==__COMPLETED; + } + } - /* ------------------------------------------------------------ */ - public boolean isAsyncStarted() - { - synchronized (this) - { - switch(_state) - { - case __ASYNCSTARTED: - case __REDISPATCHING: - case __REDISPATCH: - case __ASYNCWAIT: - return true; + /* ------------------------------------------------------------ */ + public boolean isAsyncStarted() + { + synchronized (this) + { + switch(_state) + { + case __ASYNCSTARTED: + case __REDISPATCHING: + case __REDISPATCH: + case __ASYNCWAIT: + return true; - default: - return false; - } - } - } + default: + return false; + } + } + } - /* ------------------------------------------------------------ */ - public boolean isAsync() - { - synchronized (this) - { - switch(_state) - { - case __IDLE: - case __DISPATCHED: - case __UNCOMPLETED: - case __COMPLETED: - return false; + /* ------------------------------------------------------------ */ + public boolean isAsync() + { + synchronized (this) + { + switch(_state) + { + case __IDLE: + case __DISPATCHED: + case __UNCOMPLETED: + case __COMPLETED: + return false; - default: - return true; - } - } - } + default: + return true; + } + } + } - /* ------------------------------------------------------------ */ - public void dispatch(ServletContext context, String path) - { - _event._dispatchContext=context; - _event.setPath(path); - dispatch(); - } + /* ------------------------------------------------------------ */ + public void dispatch(ServletContext context, String path) + { + _event._dispatchContext=context; + _event.setPath(path); + dispatch(); + } - /* ------------------------------------------------------------ */ - public void dispatch(String path) - { - _event.setPath(path); - dispatch(); - } + /* ------------------------------------------------------------ */ + public void dispatch(String path) + { + _event.setPath(path); + dispatch(); + } - /* ------------------------------------------------------------ */ - public Request getBaseRequest() - { - return _connection.getRequest(); - } - - /* ------------------------------------------------------------ */ - public ServletRequest getRequest() - { - if (_event!=null) - return _event.getSuppliedRequest(); - return _connection.getRequest(); - } + /* ------------------------------------------------------------ */ + public Request getBaseRequest() + { + return _connection.getRequest(); + } + + /* ------------------------------------------------------------ */ + public ServletRequest getRequest() + { + if (_event!=null) + return _event.getSuppliedRequest(); + return _connection.getRequest(); + } - /* ------------------------------------------------------------ */ - public ServletResponse getResponse() - { - if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) - return _event.getSuppliedResponse(); - return _connection.getResponse(); - } + /* ------------------------------------------------------------ */ + public ServletResponse getResponse() + { + if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) + return _event.getSuppliedResponse(); + return _connection.getResponse(); + } - /* ------------------------------------------------------------ */ - public void start(final Runnable run) - { - final AsyncEventState event=_event; - if (event!=null) - { - _connection.getServer().getThreadPool().dispatch(new Runnable() - { - public void run() - { - ((Context)event.getServletContext()).getContextHandler().handle(run); - } - }); - } - } + /* ------------------------------------------------------------ */ + public void start(final Runnable run) + { + final AsyncEventState event=_event; + if (event!=null) + { + _connection.getServer().getThreadPool().execute(new Runnable() + { + public void run() + { + ((Context)event.getServletContext()).getContextHandler().handle(run); + } + }); + } + } - /* ------------------------------------------------------------ */ - public boolean hasOriginalRequestAndResponse() - { - synchronized (this) - { - return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response); - } - } + /* ------------------------------------------------------------ */ + public boolean hasOriginalRequestAndResponse() + { + synchronized (this) + { + return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response); + } + } - /* ------------------------------------------------------------ */ - public ContextHandler getContextHandler() - { - final AsyncEventState event=_event; - if (event!=null) - return ((Context)event.getServletContext()).getContextHandler(); - return null; - } + /* ------------------------------------------------------------ */ + public ContextHandler getContextHandler() + { + final AsyncEventState event=_event; + if (event!=null) + return ((Context)event.getServletContext()).getContextHandler(); + return null; + } - /* ------------------------------------------------------------ */ - /** - * @see Continuation#isResumed() - */ - public boolean isResumed() - { - synchronized (this) - { - return _resumed; - } - } - /* ------------------------------------------------------------ */ - /** - * @see Continuation#isExpired() - */ - public boolean isExpired() - { - synchronized (this) - { - return _expired; - } - } + /* ------------------------------------------------------------ */ + /** + * @see Continuation#isResumed() + */ + public boolean isResumed() + { + synchronized (this) + { + return _resumed; + } + } + /* ------------------------------------------------------------ */ + /** + * @see Continuation#isExpired() + */ + public boolean isExpired() + { + synchronized (this) + { + return _expired; + } + } - /* ------------------------------------------------------------ */ - /** - * @see Continuation#resume() - */ - public void resume() - { - dispatch(); - } - + /* ------------------------------------------------------------ */ + /** + * @see Continuation#resume() + */ + public void resume() + { + dispatch(); + } + - /* ------------------------------------------------------------ */ - protected void startAsync(final ServletContext context, - final ServletRequest request, - final ServletResponse response) - { - synchronized (this) - { - _responseWrapped=!(response instanceof Response); - doSuspend(context,request,response); - if (request instanceof HttpServletRequest) - { - _event._pathInContext = URIUtil.addPaths(((HttpServletRequest)request).getServletPath(),((HttpServletRequest)request).getPathInfo()); - } - } - } + /* ------------------------------------------------------------ */ + protected void startAsync(final ServletContext context, + final ServletRequest request, + final ServletResponse response) + { + synchronized (this) + { + _responseWrapped=!(response instanceof Response); + doSuspend(context,request,response); + if (request instanceof HttpServletRequest) + { + _event._pathInContext = URIUtil.addPaths(((HttpServletRequest)request).getServletPath(),((HttpServletRequest)request).getPathInfo()); + } + } + } - /* ------------------------------------------------------------ */ - protected void startAsync() - { - _responseWrapped=false; - _continuation=false; - doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse()); - } + /* ------------------------------------------------------------ */ + protected void startAsync() + { + _responseWrapped=false; + _continuation=false; + doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse()); + } - - /* ------------------------------------------------------------ */ - /** - * @see Continuation#suspend() - */ - public void suspend(ServletResponse response) - { - _continuation=true; - _responseWrapped=!(response instanceof Response); - doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); - } + + /* ------------------------------------------------------------ */ + /** + * @see Continuation#suspend() + */ + public void suspend(ServletResponse response) + { + _continuation=true; + _responseWrapped=!(response instanceof Response); + doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); + } - /* ------------------------------------------------------------ */ - /** - * @see Continuation#suspend() - */ - public void suspend() - { - _responseWrapped=false; - _continuation=true; - doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse()); - } + /* ------------------------------------------------------------ */ + /** + * @see Continuation#suspend() + */ + public void suspend() + { + _responseWrapped=false; + _continuation=true; + doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse()); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#getServletResponse() - */ - public ServletResponse getServletResponse() - { - if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) - return _event.getSuppliedResponse(); - return _connection.getResponse(); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#getServletResponse() + */ + public ServletResponse getServletResponse() + { + if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) + return _event.getSuppliedResponse(); + return _connection.getResponse(); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String) - */ - public Object getAttribute(String name) - { - return _connection.getRequest().getAttribute(name); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String) + */ + public Object getAttribute(String name) + { + return _connection.getRequest().getAttribute(name); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String) - */ - public void removeAttribute(String name) - { - _connection.getRequest().removeAttribute(name); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String) + */ + public void removeAttribute(String name) + { + _connection.getRequest().removeAttribute(name); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object) - */ - public void setAttribute(String name, Object attribute) - { - _connection.getRequest().setAttribute(name,attribute); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object) + */ + public void setAttribute(String name, Object attribute) + { + _connection.getRequest().setAttribute(name,attribute); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#undispatch() - */ - public void undispatch() - { - if (isSuspended()) - { - if (LOG.isDebugEnabled()) - throw new ContinuationThrowable(); - else - throw __exception; - } - throw new IllegalStateException("!suspended"); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#undispatch() + */ + public void undispatch() + { + if (isSuspended()) + { + if (LOG.isDebugEnabled()) + throw new ContinuationThrowable(); + else + throw __exception; + } + throw new IllegalStateException("!suspended"); + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - public class AsyncTimeout extends Timeout.Task implements Runnable - { - @Override - public void expired() - { - AsyncContinuation.this.expired(); - } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public class AsyncTimeout extends Timeout.Task implements Runnable + { + @Override + public void expired() + { + AsyncContinuation.this.expired(); + } - @Override - public void run() - { - AsyncContinuation.this.expired(); - } - } + @Override + public void run() + { + AsyncContinuation.this.expired(); + } + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - public class AsyncEventState extends AsyncEvent - { - private final ServletContext _suspendedContext; - private ServletContext _dispatchContext; - private String _pathInContext; - private Timeout.Task _timeout= new AsyncTimeout(); - - public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response) - { - super(AsyncContinuation.this, request,response); - _suspendedContext=context; - // Get the base request So we can remember the initial paths - Request r=_connection.getRequest(); + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public class AsyncEventState extends AsyncEvent + { + private final ServletContext _suspendedContext; + private ServletContext _dispatchContext; + private String _pathInContext; + private Timeout.Task _timeout= new AsyncTimeout(); + + public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response) + { + super(AsyncContinuation.this, request,response); + _suspendedContext=context; + // Get the base request So we can remember the initial paths + Request r=_connection.getRequest(); - // If we haven't been async dispatched before - if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null) - { - // We are setting these attributes during startAsync, when the spec implies that - // they are only available after a call to AsyncContext.dispatch(...); - - // have we been forwarded before? - String uri=(String)r.getAttribute(RequestDispatcher.FORWARD_REQUEST_URI); - if (uri!=null) - { - r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,uri); - r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getAttribute(RequestDispatcher.FORWARD_CONTEXT_PATH)); - r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getAttribute(RequestDispatcher.FORWARD_SERVLET_PATH)); - r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getAttribute(RequestDispatcher.FORWARD_PATH_INFO)); - r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getAttribute(RequestDispatcher.FORWARD_QUERY_STRING)); - } - else - { - r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,r.getRequestURI()); - r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getContextPath()); - r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getServletPath()); - r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getPathInfo()); - r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getQueryString()); - } - } - } - - public ServletContext getSuspendedContext() - { - return _suspendedContext; - } - - public ServletContext getDispatchContext() - { - return _dispatchContext; - } - - public ServletContext getServletContext() - { - return _dispatchContext==null?_suspendedContext:_dispatchContext; - } - - public void setPath(String path) - { - _pathInContext=path; - } - - /* ------------------------------------------------------------ */ - /** - * @return The path in the context - */ - public String getPath() - { - return _pathInContext; - } - } + // If we haven't been async dispatched before + if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null) + { + // We are setting these attributes during startAsync, when the spec implies that + // they are only available after a call to AsyncContext.dispatch(...); + + // have we been forwarded before? + String uri=(String)r.getAttribute(RequestDispatcher.FORWARD_REQUEST_URI); + if (uri!=null) + { + r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,uri); + r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getAttribute(RequestDispatcher.FORWARD_CONTEXT_PATH)); + r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getAttribute(RequestDispatcher.FORWARD_SERVLET_PATH)); + r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getAttribute(RequestDispatcher.FORWARD_PATH_INFO)); + r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getAttribute(RequestDispatcher.FORWARD_QUERY_STRING)); + } + else + { + r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,r.getRequestURI()); + r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getContextPath()); + r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getServletPath()); + r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getPathInfo()); + r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getQueryString()); + } + } + } + + public ServletContext getSuspendedContext() + { + return _suspendedContext; + } + + public ServletContext getDispatchContext() + { + return _dispatchContext; + } + + public ServletContext getServletContext() + { + return _dispatchContext==null?_suspendedContext:_dispatchContext; + } + + public void setPath(String path) + { + _pathInContext=path; + } + + /* ------------------------------------------------------------ */ + /** + * @return The path in the context + */ + public String getPath() + { + return _pathInContext; + } + } }
--- a/src/org/eclipse/jetty/server/Connector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/Connector.java Sun Oct 02 20:38:06 2016 -0600 @@ -23,7 +23,6 @@ import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.thread.ThreadPool; /** HTTP Connector. * Implementations of this interface provide connectors for the HTTP protocol. @@ -267,7 +266,7 @@ /* ------------------------------------------------------------ */ /** Check if low on resources. * For most connectors, low resources is measured by calling - * {@link ThreadPool#isLowOnThreads()} on the connector threadpool + * {@link Server#isLowOnThreads()} on the connector threadpool * or the server threadpool if there is no connector threadpool. * <p> * For blocking connectors, low resources is used to trigger
--- a/src/org/eclipse/jetty/server/Server.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/Server.java Sun Oct 02 20:38:06 2016 -0600 @@ -21,6 +21,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Enumeration; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -40,9 +43,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ShutdownThread; -import org.eclipse.jetty.util.thread.ThreadPool; /* ------------------------------------------------------------ */ /** Jetty HTTP Servlet Server. @@ -60,7 +61,7 @@ private static final String __version = "8"; private final AttributesMap _attributes = new AttributesMap(); - private final ThreadPool _threadPool; + private final ThreadPoolExecutor _threadPool; private Connector[] _connectors; private boolean _sendServerVersion = true; //send Server: header private boolean _sendDateHeader = false; //send Date: header @@ -83,7 +84,7 @@ connector.setPort(port); setConnectors(new Connector[]{connector}); - _threadPool = new QueuedThreadPool(); + _threadPool = new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); addBean(_threadPool); } @@ -165,7 +166,7 @@ /** * @return Returns the threadPool. */ - public ThreadPool getThreadPool() + public ThreadPoolExecutor getThreadPool() { return _threadPool; } @@ -358,7 +359,8 @@ /* ------------------------------------------------------------ */ public void join() throws InterruptedException { - getThreadPool().join(); +// getThreadPool().join(); + _threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } /* ------------------------------------------------------------ */ @@ -515,6 +517,16 @@ public void setShutdown(boolean shutdown); } + + public final boolean isLowOnThreads() + { + ThreadPoolExecutor tpe = getThreadPool(); + // getActiveCount() locks the thread pool, so execute it last + return tpe.getPoolSize() == tpe.getMaximumPoolSize() && + tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); + } + + /* ------------------------------------------------------------ */ public static void main(String...args) throws Exception {
--- a/src/org/eclipse/jetty/server/bio/SocketConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/bio/SocketConnector.java Sun Oct 02 20:38:06 2016 -0600 @@ -25,6 +25,8 @@ import java.net.SocketException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.Buffer; @@ -56,270 +58,274 @@ */ public class SocketConnector extends AbstractConnector { - private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); + private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); - protected ServerSocket _serverSocket; - protected final Set<EndPoint> _connections; - protected volatile int _localPort=-1; + protected ServerSocket _serverSocket; + protected final Set<EndPoint> _connections; + protected volatile int _localPort=-1; - /* ------------------------------------------------------------ */ - /** Constructor. - * - */ - public SocketConnector() - { - _connections=new HashSet<EndPoint>(); - } + /* ------------------------------------------------------------ */ + /** Constructor. + * + */ + public SocketConnector() + { + _connections=new HashSet<EndPoint>(); + } - /* ------------------------------------------------------------ */ - public Object getConnection() - { - return _serverSocket; - } + /* ------------------------------------------------------------ */ + public Object getConnection() + { + return _serverSocket; + } - /* ------------------------------------------------------------ */ - public void open() throws IOException - { - // Create a new server socket and set to non blocking mode - if (_serverSocket==null || _serverSocket.isClosed()) - _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); - _serverSocket.setReuseAddress(getReuseAddress()); - _localPort=_serverSocket.getLocalPort(); - if (_localPort<=0) - throw new IllegalStateException("port not allocated for "+this); + /* ------------------------------------------------------------ */ + public void open() throws IOException + { + // Create a new server socket and set to non blocking mode + if (_serverSocket==null || _serverSocket.isClosed()) + _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); + _serverSocket.setReuseAddress(getReuseAddress()); + _localPort=_serverSocket.getLocalPort(); + if (_localPort<=0) + throw new IllegalStateException("port not allocated for "+this); - } + } - /* ------------------------------------------------------------ */ - protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException - { - ServerSocket ss= host==null? - new ServerSocket(port,backlog): - new ServerSocket(port,backlog,InetAddress.getByName(host)); + /* ------------------------------------------------------------ */ + protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException + { + ServerSocket ss= host==null? + new ServerSocket(port,backlog): + new ServerSocket(port,backlog,InetAddress.getByName(host)); - return ss; - } + return ss; + } - /* ------------------------------------------------------------ */ - public void close() throws IOException - { - if (_serverSocket!=null) - _serverSocket.close(); - _serverSocket=null; - _localPort=-2; - } + /* ------------------------------------------------------------ */ + public void close() throws IOException + { + if (_serverSocket!=null) + _serverSocket.close(); + _serverSocket=null; + _localPort=-2; + } - /* ------------------------------------------------------------ */ - @Override - public void accept(int acceptorID) - throws IOException, InterruptedException - { - Socket socket = _serverSocket.accept(); - configure(socket); + /* ------------------------------------------------------------ */ + @Override + public void accept(int acceptorID) + throws IOException, InterruptedException + { + Socket socket = _serverSocket.accept(); + configure(socket); - ConnectorEndPoint connection=new ConnectorEndPoint(socket); - connection.dispatch(); - } + ConnectorEndPoint connection=new ConnectorEndPoint(socket); + connection.dispatch(); + } - /* ------------------------------------------------------------------------------- */ - /** - * Allows subclass to override Conection if required. - */ - protected Connection newConnection(EndPoint endpoint) - { - return new BlockingHttpConnection(this, endpoint, getServer()); - } + /* ------------------------------------------------------------------------------- */ + /** + * Allows subclass to override Conection if required. + */ + protected Connection newConnection(EndPoint endpoint) + { + return new BlockingHttpConnection(this, endpoint, getServer()); + } - /* ------------------------------------------------------------------------------- */ - @Override - public void customize(EndPoint endpoint, Request request) - throws IOException - { - ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; - int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; - connection.setMaxIdleTime(lrmit); + /* ------------------------------------------------------------------------------- */ + @Override + public void customize(EndPoint endpoint, Request request) + throws IOException + { + ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; + int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; + connection.setMaxIdleTime(lrmit); - super.customize(endpoint, request); - } + super.customize(endpoint, request); + } - /* ------------------------------------------------------------------------------- */ - public int getLocalPort() - { - return _localPort; - } + /* ------------------------------------------------------------------------------- */ + public int getLocalPort() + { + return _localPort; + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStart() throws Exception - { - _connections.clear(); - super.doStart(); - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStart() throws Exception + { + _connections.clear(); + super.doStart(); + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - Set<EndPoint> set = new HashSet<EndPoint>(); - synchronized(_connections) - { - set.addAll(_connections); - } - for (EndPoint endPoint : set) - { - ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; - connection.close(); - } - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStop() throws Exception + { + super.doStop(); + Set<EndPoint> set = new HashSet<EndPoint>(); + synchronized(_connections) + { + set.addAll(_connections); + } + for (EndPoint endPoint : set) + { + ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; + connection.close(); + } + } - @Override - public void dump(Appendable out, String indent) throws IOException - { - super.dump(out, indent); - Set<EndPoint> connections = new HashSet<EndPoint>(); - synchronized (_connections) - { - connections.addAll(_connections); - } - AggregateLifeCycle.dump(out, indent, connections); - } + @Override + public void dump(Appendable out, String indent) throws IOException + { + super.dump(out, indent); + Set<EndPoint> connections = new HashSet<EndPoint>(); + synchronized (_connections) + { + connections.addAll(_connections); + } + AggregateLifeCycle.dump(out, indent, connections); + } - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint - { - volatile Connection _connection; - protected final Socket _socket; + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint + { + volatile Connection _connection; + protected final Socket _socket; - public ConnectorEndPoint(Socket socket) throws IOException - { - super(socket,_maxIdleTime); - _connection = newConnection(this); - _socket=socket; - } + public ConnectorEndPoint(Socket socket) throws IOException + { + super(socket,_maxIdleTime); + _connection = newConnection(this); + _socket=socket; + } - public Connection getConnection() - { - return _connection; - } + public Connection getConnection() + { + return _connection; + } - public void setConnection(Connection connection) - { - if (_connection!=connection && _connection!=null) - connectionUpgraded(_connection,connection); - _connection=connection; - } + public void setConnection(Connection connection) + { + if (_connection!=connection && _connection!=null) + connectionUpgraded(_connection,connection); + _connection=connection; + } - public void dispatch() throws IOException - { - if (getThreadPool()==null || !getThreadPool().dispatch(this)) - { - LOG.warn("dispatch failed for {}",_connection); - close(); - } - } + public void dispatch() throws IOException + { + ThreadPoolExecutor tpe = getThreadPool(); + if( tpe != null ) { + try { + tpe.execute(this); + return; + } catch(RejectedExecutionException e) {} + } + LOG.warn("dispatch failed for {}",_connection); + close(); + } - @Override - public int fill(Buffer buffer) throws IOException - { - int l = super.fill(buffer); - if (l<0) - { - if (!isInputShutdown()) - shutdownInput(); - if (isOutputShutdown()) - close(); - } - return l; - } + @Override + public int fill(Buffer buffer) throws IOException + { + int l = super.fill(buffer); + if (l<0) + { + if (!isInputShutdown()) + shutdownInput(); + if (isOutputShutdown()) + close(); + } + return l; + } - @Override - public void close() throws IOException - { - if (_connection instanceof AbstractHttpConnection) - ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); - super.close(); - } - - public void run() - { - try - { - connectionOpened(_connection); - synchronized(_connections) - { - _connections.add(this); - } + @Override + public void close() throws IOException + { + if (_connection instanceof AbstractHttpConnection) + ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); + super.close(); + } - while (isStarted() && !isClosed()) - { - if (_connection.isIdle()) - { - if (isLowResources()) - setMaxIdleTime(getLowResourcesMaxIdleTime()); - } + public void run() + { + try + { + connectionOpened(_connection); + synchronized(_connections) + { + _connections.add(this); + } + + while (isStarted() && !isClosed()) + { + if (_connection.isIdle()) + { + if (isLowResources()) + setMaxIdleTime(getLowResourcesMaxIdleTime()); + } - _connection=_connection.handle(); - } - } - catch (EofException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (SocketException e) - { - LOG.debug("EOF", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (HttpException e) - { - LOG.debug("BAD", e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch(Exception e) - { - LOG.warn("handle failed?",e); - try{close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - connectionClosed(_connection); - synchronized(_connections) - { - _connections.remove(this); - } + _connection=_connection.handle(); + } + } + catch (EofException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (SocketException e) + { + LOG.debug("EOF", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (HttpException e) + { + LOG.debug("BAD", e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch(Exception e) + { + LOG.warn("handle failed?",e); + try{close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + connectionClosed(_connection); + synchronized(_connections) + { + _connections.remove(this); + } - // wait for client to close, but if not, close ourselves. - try - { - if (!_socket.isClosed()) - { - long timestamp=System.currentTimeMillis(); - int max_idle=getMaxIdleTime(); + // wait for client to close, but if not, close ourselves. + try + { + if (!_socket.isClosed()) + { + long timestamp=System.currentTimeMillis(); + int max_idle=getMaxIdleTime(); - _socket.setSoTimeout(getMaxIdleTime()); - int c=0; - do - { - c = _socket.getInputStream().read(); - } - while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); - if (!_socket.isClosed()) - _socket.close(); - } - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } - } + _socket.setSoTimeout(getMaxIdleTime()); + int c=0; + do + { + c = _socket.getInputStream().read(); + } + while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); + if (!_socket.isClosed()) + _socket.close(); + } + } + catch(IOException e) + { + LOG.trace("",e); + } + } + } + } }
--- a/src/org/eclipse/jetty/server/handler/HandlerCollection.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/handler/HandlerCollection.java Sun Oct 02 20:38:06 2016 -0600 @@ -193,7 +193,7 @@ for (int i=0;i<_handlers.length;i++) { final int h=i; - getServer().getThreadPool().dispatch( + getServer().getThreadPool().execute( new Runnable() { public void run()
--- a/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java Sun Oct 02 20:38:06 2016 -0600 @@ -27,6 +27,7 @@ import java.nio.channels.SocketChannel; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.Buffer; @@ -83,7 +84,7 @@ protected void doStart() throws Exception { super.doStart(); - getThreadPool().dispatch(new Runnable() + getThreadPool().execute(new Runnable() { public void run() @@ -224,8 +225,9 @@ /* ------------------------------------------------------------ */ void dispatch() throws IOException { - if (!getThreadPool().dispatch(this)) - { + try { + getThreadPool().execute(this); + } catch(RejectedExecutionException e) { LOG.warn("dispatch failed for {}",_connection); super.close(); } @@ -278,7 +280,7 @@ _idleTimestamp=System.currentTimeMillis(); if (_connection.isIdle()) { - if (getServer().getThreadPool().isLowOnThreads()) + if (getServer().isLowOnThreads()) { int lrmit = getLowResourcesMaxIdleTime(); if (lrmit>=0 && _timeout!= lrmit)
--- a/src/org/eclipse/jetty/server/nio/InheritedChannelConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,75 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.server.nio; - -import java.io.IOException; -import java.nio.channels.Channel; -import java.nio.channels.ServerSocketChannel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the SelectChannelConnector which first tries to - * inherit from a channel provided by the system. If there is no inherited - * channel available, or if the inherited channel provided not usable, then - * it will fall back upon normal ServerSocketChannel creation. - * <p> - * Note that System.inheritedChannel() is only available from Java 1.5 onwards. - * Trying to use this class under Java 1.4 will be the same as using a normal - * SelectChannelConnector. - * <p> - * Use it with xinetd/inetd, to launch an instance of Jetty on demand. The port - * used to access pages on the Jetty instance is the same as the port used to - * launch Jetty. - * - * @author athena - */ -public class InheritedChannelConnector extends SelectChannelConnector -{ - private static final Logger LOG = LoggerFactory.getLogger(InheritedChannelConnector.class); - - /* ------------------------------------------------------------ */ - @Override - public void open() throws IOException - { - synchronized(this) - { - try - { - Channel channel = System.inheritedChannel(); - if ( channel instanceof ServerSocketChannel ) - _acceptChannel = (ServerSocketChannel)channel; - else - LOG.warn("Unable to use System.inheritedChannel() [" +channel+ "]. Trying a new ServerSocketChannel at " + getHost() + ":" + getPort()); - - if ( _acceptChannel != null ) - _acceptChannel.configureBlocking(true); - } - catch(NoSuchMethodError e) - { - LOG.warn("Need at least Java 5 to use socket inherited from xinetd/inetd."); - } - - if (_acceptChannel == null) - super.open(); - } - } - -}
--- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Sun Oct 02 20:38:06 2016 -0600 @@ -36,7 +36,6 @@ import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.server.AsyncHttpConnection; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.util.thread.ThreadPool; /* ------------------------------------------------------------------------------- */ /** @@ -283,12 +282,9 @@ private final class ConnectorSelectorManager extends SelectorManager { @Override - public boolean dispatch(Runnable task) + public void execute(Runnable task) { - ThreadPool pool=getThreadPool(); - if (pool==null) - pool=getServer().getThreadPool(); - return pool.dispatch(task); + getThreadPool().execute(task); } @Override
--- a/src/org/eclipse/jetty/util/BlockingArrayQueue.java Sun Oct 02 16:17:38 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,704 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.AbstractList; -import java.util.Collection; -import java.util.NoSuchElementException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - - -/* ------------------------------------------------------------ */ -/** Queue backed by a circular array. - * - * This queue is uses a variant of the two lock queue algorithm to - * provide an efficient queue or list backed by a growable circular - * array. This queue also has a partial implementation of - * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and - * {@link #poll(long, TimeUnit)} methods. - * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is - * able to grow and provides a blocking put call. - * <p> - * The queue has both a capacity (the size of the array currently allocated) - * and a limit (the maximum size that may be allocated), which defaults to - * {@link Integer#MAX_VALUE}. - * - * @param <E> The element type - */ -public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E> -{ - public final int DEFAULT_CAPACITY=128; - public final int DEFAULT_GROWTH=64; - private final int _limit; - private final AtomicInteger _size=new AtomicInteger(); - private final int _growCapacity; - - private volatile int _capacity; - private Object[] _elements; - - private final ReentrantLock _headLock = new ReentrantLock(); - private final Condition _notEmpty = _headLock.newCondition(); - private int _head; - - // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing - // TODO verify this has benefits - private long _space0; - private long _space1; - private long _space2; - private long _space3; - private long _space4; - private long _space5; - private long _space6; - private long _space7; - - private final ReentrantLock _tailLock = new ReentrantLock(); - private int _tail; - - - /* ------------------------------------------------------------ */ - /** Create a growing partially blocking Queue - * - */ - public BlockingArrayQueue() - { - _elements=new Object[DEFAULT_CAPACITY]; - _growCapacity=DEFAULT_GROWTH; - _capacity=_elements.length; - _limit=Integer.MAX_VALUE; - } - - /* ------------------------------------------------------------ */ - /** Create a fixed size partially blocking Queue - * @param limit The initial capacity and the limit. - */ - public BlockingArrayQueue(int limit) - { - _elements=new Object[limit]; - _capacity=_elements.length; - _growCapacity=-1; - _limit=limit; - } - - /* ------------------------------------------------------------ */ - /** Create a growing partially blocking Queue. - * @param capacity Initial capacity - * @param growBy Incremental capacity. - */ - public BlockingArrayQueue(int capacity,int growBy) - { - _elements=new Object[capacity]; - _capacity=_elements.length; - _growCapacity=growBy; - _limit=Integer.MAX_VALUE; - } - - /* ------------------------------------------------------------ */ - /** Create a growing limited partially blocking Queue. - * @param capacity Initial capacity - * @param growBy Incremental capacity. - * @param limit maximum capacity. - */ - public BlockingArrayQueue(int capacity,int growBy,int limit) - { - if (capacity>limit) - throw new IllegalArgumentException(); - - _elements=new Object[capacity]; - _capacity=_elements.length; - _growCapacity=growBy; - _limit=limit; - } - - /* ------------------------------------------------------------ */ - public int getCapacity() - { - return _capacity; - } - - /* ------------------------------------------------------------ */ - public int getLimit() - { - return _limit; - } - - /* ------------------------------------------------------------ */ - @Override - public boolean add(E e) - { - return offer(e); - } - - /* ------------------------------------------------------------ */ - public E element() - { - E e = peek(); - if (e==null) - throw new NoSuchElementException(); - return e; - } - - /* ------------------------------------------------------------ */ - @SuppressWarnings("unchecked") - public E peek() - { - if (_size.get() == 0) - return null; - - E e = null; - _headLock.lock(); // Size cannot shrink - try - { - if (_size.get() > 0) - e = (E)_elements[_head]; - } - finally - { - _headLock.unlock(); - } - - return e; - } - - /* ------------------------------------------------------------ */ - public boolean offer(E e) - { - if (e == null) - throw new NullPointerException(); - - boolean not_empty=false; - _tailLock.lock(); // size cannot grow... only shrink - try - { - if (_size.get() >= _limit) - return false; - - // should we expand array? - if (_size.get()==_capacity) - { - _headLock.lock(); // Need to grow array - try - { - if (!grow()) - return false; - } - finally - { - _headLock.unlock(); - } - } - - // add the element - _elements[_tail]=e; - _tail=(_tail+1)%_capacity; - - not_empty=0==_size.getAndIncrement(); - - } - finally - { - _tailLock.unlock(); - } - - if (not_empty) - { - _headLock.lock(); - try - { - _notEmpty.signal(); - } - finally - { - _headLock.unlock(); - } - } - - return true; - } - - - /* ------------------------------------------------------------ */ - @SuppressWarnings("unchecked") - public E poll() - { - if (_size.get() == 0) - return null; - - E e = null; - _headLock.lock(); // Size cannot shrink - try - { - if (_size.get() > 0) - { - final int head=_head; - e = (E)_elements[head]; - _elements[head]=null; - _head=(head+1)%_capacity; - - if (_size.decrementAndGet()>0) - _notEmpty.signal(); - } - } - finally - { - _headLock.unlock(); - } - - return e; - } - - /* ------------------------------------------------------------ */ - /** - * Retrieves and removes the head of this queue, waiting - * if no elements are present on this queue. - * @return the head of this queue - * @throws InterruptedException if interrupted while waiting. - */ - @SuppressWarnings("unchecked") - public E take() throws InterruptedException - { - E e = null; - _headLock.lockInterruptibly(); // Size cannot shrink - try - { - try - { - while (_size.get() == 0) - { - _notEmpty.await(); - } - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - - final int head=_head; - e = (E)_elements[head]; - _elements[head]=null; - _head=(head+1)%_capacity; - - if (_size.decrementAndGet()>0) - _notEmpty.signal(); - } - finally - { - _headLock.unlock(); - } - - return e; - } - - /* ------------------------------------------------------------ */ - /** - * Retrieves and removes the head of this queue, waiting - * if necessary up to the specified wait time if no elements are - * present on this queue. - * @param time how long to wait before giving up, in units of - * <tt>unit</tt> - * @param unit a <tt>TimeUnit</tt> determining how to interpret the - * <tt>timeout</tt> parameter - * @return the head of this queue, or <tt>null</tt> if the - * specified waiting time elapses before an element is present. - * @throws InterruptedException if interrupted while waiting. - */ - @SuppressWarnings("unchecked") - public E poll(long time, TimeUnit unit) throws InterruptedException - { - - E e = null; - - long nanos = unit.toNanos(time); - - _headLock.lockInterruptibly(); // Size cannot shrink - try - { - try - { - while (_size.get() == 0) - { - if (nanos<=0) - return null; - nanos = _notEmpty.awaitNanos(nanos); - } - } - catch (InterruptedException ie) - { - _notEmpty.signal(); - throw ie; - } - - e = (E)_elements[_head]; - _elements[_head]=null; - _head=(_head+1)%_capacity; - - if (_size.decrementAndGet()>0) - _notEmpty.signal(); - } - finally - { - _headLock.unlock(); - } - - return e; - } - - /* ------------------------------------------------------------ */ - public E remove() - { - E e=poll(); - if (e==null) - throw new NoSuchElementException(); - return e; - } - - /* ------------------------------------------------------------ */ - @Override - public void clear() - { - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - _head=0; - _tail=0; - _size.set(0); - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public boolean isEmpty() - { - return _size.get()==0; - } - - /* ------------------------------------------------------------ */ - @Override - public int size() - { - return _size.get(); - } - - /* ------------------------------------------------------------ */ - @SuppressWarnings("unchecked") - @Override - public E get(int index) - { - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - return (E)_elements[i]; - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public E remove(int index) - { - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - @SuppressWarnings("unchecked") - E old=(E)_elements[i]; - - if (i<_tail) - { - System.arraycopy(_elements,i+1,_elements,i,_tail-i); - _tail--; - _size.decrementAndGet(); - } - else - { - System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1); - if (_tail>0) - { - _elements[_capacity]=_elements[0]; - System.arraycopy(_elements,1,_elements,0,_tail-1); - _tail--; - } - else - _tail=_capacity-1; - - _size.decrementAndGet(); - } - - return old; - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public E set(int index, E e) - { - if (e == null) - throw new NullPointerException(); - - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - @SuppressWarnings("unchecked") - E old=(E)_elements[i]; - _elements[i]=e; - return old; - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - @Override - public void add(int index, E e) - { - if (e == null) - throw new NullPointerException(); - - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - - if (index<0 || index>_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - - if (index==_size.get()) - { - add(e); - } - else - { - if (_tail==_head) - if (!grow()) - throw new IllegalStateException("full"); - - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - - _size.incrementAndGet(); - _tail=(_tail+1)%_capacity; - - - if (i<_tail) - { - System.arraycopy(_elements,i,_elements,i+1,_tail-i); - _elements[i]=e; - } - else - { - if (_tail>0) - { - System.arraycopy(_elements,0,_elements,1,_tail); - _elements[0]=_elements[_capacity-1]; - } - - System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1); - _elements[i]=e; - } - } - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - /* ------------------------------------------------------------ */ - private boolean grow() - { - if (_growCapacity<=0) - return false; - - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - final int head=_head; - final int tail=_tail; - final int new_tail; - - Object[] elements=new Object[_capacity+_growCapacity]; - - if (head<tail) - { - new_tail=tail-head; - System.arraycopy(_elements,head,elements,0,new_tail); - } - else if (head>tail || _size.get()>0) - { - new_tail=_capacity+tail-head; - int cut=_capacity-head; - System.arraycopy(_elements,head,elements,0,cut); - System.arraycopy(_elements,0,elements,cut,tail); - } - else - { - new_tail=0; - } - - _elements=elements; - _capacity=_elements.length; - _head=0; - _tail=new_tail; - return true; - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - - } - - /* ------------------------------------------------------------ */ - public int drainTo(Collection<? super E> c) - { - throw new UnsupportedOperationException(); - } - - /* ------------------------------------------------------------ */ - public int drainTo(Collection<? super E> c, int maxElements) - { - throw new UnsupportedOperationException(); - } - - /* ------------------------------------------------------------ */ - public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException - { - throw new UnsupportedOperationException(); - } - - /* ------------------------------------------------------------ */ - public void put(E o) throws InterruptedException - { - if (!add(o)) - throw new IllegalStateException("full"); - } - - /* ------------------------------------------------------------ */ - public int remainingCapacity() - { - _tailLock.lock(); - try - { - _headLock.lock(); - try - { - return getCapacity()-size(); - } - finally - { - _headLock.unlock(); - } - } - finally - { - _tailLock.unlock(); - } - } - - - /* ------------------------------------------------------------ */ - long sumOfSpace() - { - // this method exists to stop clever optimisers removing the spacers - return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; - } -}
--- a/src/org/eclipse/jetty/util/thread/ExecutorThreadPool.java Sun Oct 02 16:17:38 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,184 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util.thread; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.util.component.AbstractLifeCycle; -import org.eclipse.jetty.util.component.LifeCycle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* ------------------------------------------------------------ */ -/** - * Jetty ThreadPool using java 5 ThreadPoolExecutor - * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and - * {@link LifeCycle} interfaces so that it may be used by the Jetty <code>org.eclipse.jetty.server.Server</code> - */ -public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle -{ - private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadPool.class); - private final ExecutorService _executor; - - /* ------------------------------------------------------------ */ - public ExecutorThreadPool(ExecutorService executor) - { - _executor = executor; - } - - /* ------------------------------------------------------------ */ - /** - * Wraps an {@link ThreadPoolExecutor}. - * Max pool size is 256, pool thread timeout after 60 seconds and - * an unbounded {@link LinkedBlockingQueue} is used for the job queue; - */ - public ExecutorThreadPool() - { - // Using an unbounded queue makes the maxThreads parameter useless - // Refer to ThreadPoolExecutor javadocs for details - this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())); - } - - /* ------------------------------------------------------------ */ - /** - * Wraps an {@link ThreadPoolExecutor}. - * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0. - * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a - * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. - */ - public ExecutorThreadPool(int queueSize) - { - this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) : - queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) : - new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize))); - } - - /* ------------------------------------------------------------ */ - /** - * Wraps an {@link ThreadPoolExecutor} using - * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue; - * @param corePoolSize must be equal to maximumPoolSize - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle, in milliseconds - */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) - { - this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS); - } - - /* ------------------------------------------------------------ */ - /** - * Wraps an {@link ThreadPoolExecutor} using - * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue. - * @param corePoolSize must be equal to maximumPoolSize - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime - */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) - { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>()); - } - - /* ------------------------------------------------------------ */ - - /** - * Wraps an {@link ThreadPoolExecutor} - * @param corePoolSize the number of threads to keep in the pool, even if they are idle - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime - * @param workQueue the queue to use for holding tasks before they are executed - */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) - { - this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)); - } - - /* ------------------------------------------------------------ */ - public boolean dispatch(Runnable job) - { - try - { - _executor.execute(job); - return true; - } - catch(RejectedExecutionException e) - { - LOG.warn("",e); - return false; - } - } - - /* ------------------------------------------------------------ */ - public int getIdleThreads() - { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - return tpe.getPoolSize() - tpe.getActiveCount(); - } - return -1; - } - - /* ------------------------------------------------------------ */ - public int getThreads() - { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - return tpe.getPoolSize(); - } - return -1; - } - - /* ------------------------------------------------------------ */ - public boolean isLowOnThreads() - { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - // getActiveCount() locks the thread pool, so execute it last - return tpe.getPoolSize() == tpe.getMaximumPoolSize() && - tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); - } - return false; - } - - /* ------------------------------------------------------------ */ - public void join() throws InterruptedException - { - _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } - - /* ------------------------------------------------------------ */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - _executor.shutdownNow(); - } -}
--- a/src/org/eclipse/jetty/util/thread/QueuedThreadPool.java Sun Oct 02 16:17:38 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,677 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - - -package org.eclipse.jetty.util.thread; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.eclipse.jetty.util.BlockingArrayQueue; -import org.eclipse.jetty.util.component.AbstractLifeCycle; -import org.eclipse.jetty.util.component.AggregateLifeCycle; -import org.eclipse.jetty.util.component.Dumpable; -import org.eclipse.jetty.util.component.LifeCycle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable -{ - private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); - - private final AtomicInteger _threadsStarted = new AtomicInteger(); - private final AtomicInteger _threadsIdle = new AtomicInteger(); - private final AtomicLong _lastShrink = new AtomicLong(); - private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); - private final Object _joinLock = new Object(); - private BlockingQueue<Runnable> _jobs; - private String _name; - private int _maxIdleTimeMs=60000; - private int _maxThreads=254; - private int _minThreads=8; - private int _maxQueued=-1; - private int _priority=Thread.NORM_PRIORITY; - private boolean _daemon=false; - private int _maxStopTime=100; - private boolean _detailedDump=false; - - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool() - { - _name="qtp"+super.hashCode(); - } - - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool(int maxThreads) - { - this(); - setMaxThreads(maxThreads); - } - - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool(BlockingQueue<Runnable> jobQ) - { - this(); - _jobs=jobQ; - _jobs.clear(); - } - - - /* ------------------------------------------------------------ */ - @Override - protected void doStart() throws Exception - { - super.doStart(); - _threadsStarted.set(0); - - if (_jobs==null) - { - _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) - :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); - } - - int threads=_threadsStarted.get(); - while (isRunning() && threads<_minThreads) - { - startThread(threads); - threads=_threadsStarted.get(); - } - } - - /* ------------------------------------------------------------ */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - long start=System.currentTimeMillis(); - - // let jobs complete naturally for a while - while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) - Thread.sleep(1); - - // kill queued jobs and flush out idle jobs - _jobs.clear(); - Runnable noop = new Runnable(){public void run(){}}; - for (int i=_threadsIdle.get();i-->0;) - _jobs.offer(noop); - Thread.yield(); - - // interrupt remaining threads - if (_threadsStarted.get()>0) - for (Thread thread : _threads) - thread.interrupt(); - - // wait for remaining threads to die - while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) - { - Thread.sleep(1); - } - Thread.yield(); - int size=_threads.size(); - if (size>0) - { - LOG.warn(size+" threads could not be stopped"); - - if (size==1 || LOG.isDebugEnabled()) - { - for (Thread unstopped : _threads) - { - LOG.info("Couldn't stop "+unstopped); - for (StackTraceElement element : unstopped.getStackTrace()) - { - LOG.info(" at "+element); - } - } - } - } - - synchronized (_joinLock) - { - _joinLock.notifyAll(); - } - } - - /* ------------------------------------------------------------ */ - /** - * Delegated to the named or anonymous Pool. - */ - public void setDaemon(boolean daemon) - { - _daemon=daemon; - } - - /* ------------------------------------------------------------ */ - /** Set the maximum thread idle time. - * Threads that are idle for longer than this period may be - * stopped. - * Delegated to the named or anonymous Pool. - * @see #getMaxIdleTimeMs - * @param maxIdleTimeMs Max idle time in ms. - */ - public void setMaxIdleTimeMs(int maxIdleTimeMs) - { - _maxIdleTimeMs=maxIdleTimeMs; - } - - /* ------------------------------------------------------------ */ - /** - * @param stopTimeMs maximum total time that stop() will wait for threads to die. - */ - public void setMaxStopTimeMs(int stopTimeMs) - { - _maxStopTime = stopTimeMs; - } - - /* ------------------------------------------------------------ */ - /** Set the maximum number of threads. - * Delegated to the named or anonymous Pool. - * @see #getMaxThreads - * @param maxThreads maximum number of threads. - */ - public void setMaxThreads(int maxThreads) - { - _maxThreads=maxThreads; - if (_minThreads>_maxThreads) - _minThreads=_maxThreads; - } - - /* ------------------------------------------------------------ */ - /** Set the minimum number of threads. - * Delegated to the named or anonymous Pool. - * @see #getMinThreads - * @param minThreads minimum number of threads - */ - public void setMinThreads(int minThreads) - { - _minThreads=minThreads; - - if (_minThreads>_maxThreads) - _maxThreads=_minThreads; - - int threads=_threadsStarted.get(); - while (isStarted() && threads<_minThreads) - { - startThread(threads); - threads=_threadsStarted.get(); - } - } - - /* ------------------------------------------------------------ */ - /** - * @param name Name of the BoundedThreadPool to use when naming Threads. - */ - public void setName(String name) - { - if (isRunning()) - throw new IllegalStateException("started"); - _name= name; - } - - /* ------------------------------------------------------------ */ - /** Set the priority of the pool threads. - * @param priority the new thread priority. - */ - public void setThreadsPriority(int priority) - { - _priority=priority; - } - - /* ------------------------------------------------------------ */ - /** - * @return maximum queue size - */ - public int getMaxQueued() - { - return _maxQueued; - } - - /* ------------------------------------------------------------ */ - /** - * @param max job queue size - */ - public void setMaxQueued(int max) - { - if (isRunning()) - throw new IllegalStateException("started"); - _maxQueued=max; - } - - /* ------------------------------------------------------------ */ - /** Get the maximum thread idle time. - * Delegated to the named or anonymous Pool. - * @see #setMaxIdleTimeMs - * @return Max idle time in ms. - */ - public int getMaxIdleTimeMs() - { - return _maxIdleTimeMs; - } - - /* ------------------------------------------------------------ */ - /** - * @return maximum total time that stop() will wait for threads to die. - */ - public int getMaxStopTimeMs() - { - return _maxStopTime; - } - - /* ------------------------------------------------------------ */ - /** Set the maximum number of threads. - * Delegated to the named or anonymous Pool. - * @see #setMaxThreads - * @return maximum number of threads. - */ - public int getMaxThreads() - { - return _maxThreads; - } - - /* ------------------------------------------------------------ */ - /** Get the minimum number of threads. - * Delegated to the named or anonymous Pool. - * @see #setMinThreads - * @return minimum number of threads. - */ - public int getMinThreads() - { - return _minThreads; - } - - /* ------------------------------------------------------------ */ - /** - * @return The name of the BoundedThreadPool. - */ - public String getName() - { - return _name; - } - - /* ------------------------------------------------------------ */ - /** Get the priority of the pool threads. - * @return the priority of the pool threads. - */ - public int getThreadsPriority() - { - return _priority; - } - - /* ------------------------------------------------------------ */ - /** - * Delegated to the named or anonymous Pool. - */ - public boolean isDaemon() - { - return _daemon; - } - - /* ------------------------------------------------------------ */ - public boolean isDetailedDump() - { - return _detailedDump; - } - - /* ------------------------------------------------------------ */ - public void setDetailedDump(boolean detailedDump) - { - _detailedDump = detailedDump; - } - - /* ------------------------------------------------------------ */ - public boolean dispatch(Runnable job) - { - if (isRunning()) - { - final int jobQ = _jobs.size(); - final int idle = getIdleThreads(); - if(_jobs.offer(job)) - { - // If we had no idle threads or the jobQ is greater than the idle threads - if (idle==0 || jobQ>idle) - { - int threads=_threadsStarted.get(); - if (threads<_maxThreads) - startThread(threads); - } - return true; - } - } - LOG.debug("Dispatched {} to stopped {}",job,this); - return false; - } - - /* ------------------------------------------------------------ */ - public void execute(Runnable job) - { - if (!dispatch(job)) - throw new RejectedExecutionException(); - } - - /* ------------------------------------------------------------ */ - /** - * Blocks until the thread pool is {@link LifeCycle#stop stopped}. - */ - public void join() throws InterruptedException - { - synchronized (_joinLock) - { - while (isRunning()) - _joinLock.wait(); - } - - while (isStopping()) - Thread.sleep(1); - } - - /* ------------------------------------------------------------ */ - /** - * @return The total number of threads currently in the pool - */ - public int getThreads() - { - return _threadsStarted.get(); - } - - /* ------------------------------------------------------------ */ - /** - * @return The number of idle threads in the pool - */ - public int getIdleThreads() - { - return _threadsIdle.get(); - } - - /* ------------------------------------------------------------ */ - /** - * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs - */ - public boolean isLowOnThreads() - { - return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); - } - - /* ------------------------------------------------------------ */ - private boolean startThread(int threads) - { - final int next=threads+1; - if (!_threadsStarted.compareAndSet(threads,next)) - return false; - - boolean started=false; - try - { - Thread thread=newThread(_runnable); - thread.setDaemon(_daemon); - thread.setPriority(_priority); - thread.setName(_name+"-"+thread.getId()); - _threads.add(thread); - - thread.start(); - started=true; - } - finally - { - if (!started) - _threadsStarted.decrementAndGet(); - } - return started; - } - - /* ------------------------------------------------------------ */ - protected Thread newThread(Runnable runnable) - { - return new Thread(runnable); - } - - - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } - - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - List<Object> dump = new ArrayList<Object>(getMaxThreads()); - for (final Thread thread: _threads) - { - final StackTraceElement[] trace=thread.getStackTrace(); - boolean inIdleJobPoll=false; - // trace can be null on early java 6 jvms - if (trace != null) - { - for (StackTraceElement t : trace) - { - if ("idleJobPoll".equals(t.getMethodName())) - { - inIdleJobPoll = true; - break; - } - } - } - final boolean idle=inIdleJobPoll; - - if (_detailedDump) - { - dump.add(new Dumpable() - { - public void dump(Appendable out, String indent) throws IOException - { - out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); - if (!idle) - AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); - } - - public String dump() - { - return null; - } - }); - } - else - { - dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); - } - } - - AggregateLifeCycle.dumpObject(out,this); - AggregateLifeCycle.dump(out,indent,dump); - - } - - - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; - } - - /* ------------------------------------------------------------ */ - private Runnable idleJobPoll() throws InterruptedException - { - return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); - } - - /* ------------------------------------------------------------ */ - private Runnable _runnable = new Runnable() - { - public void run() - { - boolean shrink=false; - try - { - Runnable job=_jobs.poll(); - while (isRunning()) - { - // Job loop - while (job!=null && isRunning()) - { - runJob(job); - job=_jobs.poll(); - } - - // Idle loop - try - { - _threadsIdle.incrementAndGet(); - - while (isRunning() && job==null) - { - if (_maxIdleTimeMs<=0) - job=_jobs.take(); - else - { - // maybe we should shrink? - final int size=_threadsStarted.get(); - if (size>_minThreads) - { - long last=_lastShrink.get(); - long now=System.currentTimeMillis(); - if (last==0 || (now-last)>_maxIdleTimeMs) - { - shrink=_lastShrink.compareAndSet(last,now) && - _threadsStarted.compareAndSet(size,size-1); - if (shrink) - return; - } - } - job=idleJobPoll(); - } - } - } - finally - { - _threadsIdle.decrementAndGet(); - } - } - } - catch(InterruptedException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } - finally - { - if (!shrink) - _threadsStarted.decrementAndGet(); - _threads.remove(Thread.currentThread()); - } - } - }; - - /* ------------------------------------------------------------ */ - /** - * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> - * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> - * - * @param job the job to run - */ - protected void runJob(Runnable job) - { - job.run(); - } - - /* ------------------------------------------------------------ */ - /** - * @return the job queue - */ - protected BlockingQueue<Runnable> getQueue() - { - return _jobs; - } - - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to stop. - * @return true if the thread was found and stopped. - * @deprecated Use {@link #interruptThread(long)} in preference - */ - @Deprecated - public boolean stopThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - thread.stop(); - return true; - } - } - return false; - } - - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to interrupt. - * @return true if the thread was found and interrupted. - */ - public boolean interruptThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - thread.interrupt(); - return true; - } - } - return false; - } - - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to interrupt. - * @return true if the thread was found and interrupted. - */ - public String dumpThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - StringBuilder buf = new StringBuilder(); - buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); - for (StackTraceElement element : thread.getStackTrace()) - buf.append(" at ").append(element.toString()).append('\n'); - return buf.toString(); - } - } - return null; - } -}
--- a/src/org/eclipse/jetty/util/thread/ThreadPool.java Sun Oct 02 16:17:38 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,57 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util.thread; - -import org.eclipse.jetty.util.component.LifeCycle; - -/* ------------------------------------------------------------ */ -/** ThreadPool. - * - * - */ -public interface ThreadPool -{ - /* ------------------------------------------------------------ */ - public abstract boolean dispatch(Runnable job); - - /* ------------------------------------------------------------ */ - /** - * Blocks until the thread pool is {@link LifeCycle#stop stopped}. - */ - public void join() throws InterruptedException; - - /* ------------------------------------------------------------ */ - /** - * @return The total number of threads currently in the pool - */ - public int getThreads(); - - /* ------------------------------------------------------------ */ - /** - * @return The number of idle threads in the pool - */ - public int getIdleThreads(); - - /* ------------------------------------------------------------ */ - /** - * @return True if the pool is low on threads - */ - public boolean isLowOnThreads(); - -}