Mercurial Hosting > luan
diff src/org/eclipse/jetty/io/nio/SelectorManager.java @ 865:6b210bb66c63
remove ThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 20:38:06 -0600 |
parents | 8e9db0bbf4f9 |
children | 54308d65265a |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Sun Oct 02 16:17:38 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Sun Oct 02 20:38:06 2016 -0600 @@ -58,977 +58,974 @@ */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { - public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); + public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); - private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); - private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); - private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); - private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); + private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); + private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); + private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); + private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); - private int _maxIdleTime; - private int _lowResourcesMaxIdleTime; - private long _lowResourcesConnections; - private SelectSet[] _selectSet; - private int _selectSets=1; - private volatile int _set=0; - private boolean _deferringInterestedOps0=true; - private int _selectorPriorityDelta=0; + private int _maxIdleTime; + private int _lowResourcesMaxIdleTime; + private long _lowResourcesConnections; + private SelectSet[] _selectSet; + private int _selectSets=1; + private volatile int _set=0; + private boolean _deferringInterestedOps0=true; + private int _selectorPriorityDelta=0; - /* ------------------------------------------------------------ */ - /** - * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. - * @see #setLowResourcesMaxIdleTime(long) - */ - public void setMaxIdleTime(long maxIdleTime) - { - _maxIdleTime=(int)maxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. + * @see #setLowResourcesMaxIdleTime(long) + */ + public void setMaxIdleTime(long maxIdleTime) + { + _maxIdleTime=(int)maxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @param selectSets number of select sets to create - */ - public void setSelectSets(int selectSets) - { - long lrc = _lowResourcesConnections * _selectSets; - _selectSets=selectSets; - _lowResourcesConnections=lrc/_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @param selectSets number of select sets to create + */ + public void setSelectSets(int selectSets) + { + long lrc = _lowResourcesConnections * _selectSets; + _selectSets=selectSets; + _lowResourcesConnections=lrc/_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @return the max idle time - */ - public long getMaxIdleTime() - { - return _maxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @return the max idle time + */ + public long getMaxIdleTime() + { + return _maxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @return the number of select sets in use - */ - public int getSelectSets() - { - return _selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @return the number of select sets in use + */ + public int getSelectSets() + { + return _selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @param i - * @return The select set - */ - public SelectSet getSelectSet(int i) - { - return _selectSet[i]; - } + /* ------------------------------------------------------------ */ + /** + * @param i + * @return The select set + */ + public SelectSet getSelectSet(int i) + { + return _selectSet[i]; + } - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - * @param att Attached Object - */ - public void register(SocketChannel channel, Object att) - { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. + /* ------------------------------------------------------------ */ + /** Register a channel + * @param channel + * @param att Attached Object + */ + public void register(SocketChannel channel, Object att) + { + // The ++ increment here is not atomic, but it does not matter. + // so long as the value changes sometimes, then connections will + // be distributed over the available sets. - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel,att); - set.wakeup(); - } - } + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet[] sets=_selectSet; + if (sets!=null) + { + SelectSet set=sets[s]; + set.addChange(channel,att); + set.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** Register a channel - * @param channel - */ - public void register(SocketChannel channel) - { - // The ++ increment here is not atomic, but it does not matter. - // so long as the value changes sometimes, then connections will - // be distributed over the available sets. + /* ------------------------------------------------------------ */ + /** Register a channel + * @param channel + */ + public void register(SocketChannel channel) + { + // The ++ increment here is not atomic, but it does not matter. + // so long as the value changes sometimes, then connections will + // be distributed over the available sets. - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet[] sets=_selectSet; - if (sets!=null) - { - SelectSet set=sets[s]; - set.addChange(channel); - set.wakeup(); - } - } + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet[] sets=_selectSet; + if (sets!=null) + { + SelectSet set=sets[s]; + set.addChange(channel); + set.wakeup(); + } + } - /* ------------------------------------------------------------ */ - /** Register a {@link ServerSocketChannel} - * @param acceptChannel - */ - public void register(ServerSocketChannel acceptChannel) - { - int s=_set++; - if (s<0) - s=-s; - s=s%_selectSets; - SelectSet set=_selectSet[s]; - set.addChange(acceptChannel); - set.wakeup(); - } + /* ------------------------------------------------------------ */ + /** Register a {@link ServerSocketChannel} + * @param acceptChannel + */ + public void register(ServerSocketChannel acceptChannel) + { + int s=_set++; + if (s<0) + s=-s; + s=s%_selectSets; + SelectSet set=_selectSet[s]; + set.addChange(acceptChannel); + set.wakeup(); + } - /* ------------------------------------------------------------ */ - /** - * @return delta The value to add to the selector thread priority. - */ - public int getSelectorPriorityDelta() - { - return _selectorPriorityDelta; - } + /* ------------------------------------------------------------ */ + /** + * @return delta The value to add to the selector thread priority. + */ + public int getSelectorPriorityDelta() + { + return _selectorPriorityDelta; + } - /* ------------------------------------------------------------ */ - /** Set the selector thread priorty delta. - * @param delta The value to add to the selector thread priority. - */ - public void setSelectorPriorityDelta(int delta) - { - _selectorPriorityDelta=delta; - } + /* ------------------------------------------------------------ */ + /** Set the selector thread priorty delta. + * @param delta The value to add to the selector thread priority. + */ + public void setSelectorPriorityDelta(int delta) + { + _selectorPriorityDelta=delta; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesConnections - */ - public long getLowResourcesConnections() - { - return _lowResourcesConnections*_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesConnections + */ + public long getLowResourcesConnections() + { + return _lowResourcesConnections*_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * Set the number of connections, which if exceeded places this manager in low resources state. - * This is not an exact measure as the connection count is averaged over the select sets. - * @param lowResourcesConnections the number of connections - * @see #setLowResourcesMaxIdleTime(long) - */ - public void setLowResourcesConnections(long lowResourcesConnections) - { - _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; - } + /* ------------------------------------------------------------ */ + /** + * Set the number of connections, which if exceeded places this manager in low resources state. + * This is not an exact measure as the connection count is averaged over the select sets. + * @param lowResourcesConnections the number of connections + * @see #setLowResourcesMaxIdleTime(long) + */ + public void setLowResourcesConnections(long lowResourcesConnections) + { + _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; + } - /* ------------------------------------------------------------ */ - /** - * @return the lowResourcesMaxIdleTime - */ - public long getLowResourcesMaxIdleTime() - { - return _lowResourcesMaxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @return the lowResourcesMaxIdleTime + */ + public long getLowResourcesMaxIdleTime() + { + return _lowResourcesMaxIdleTime; + } - /* ------------------------------------------------------------ */ - /** - * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} - * @see #setMaxIdleTime(long) - */ - public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) - { - _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; - } + /* ------------------------------------------------------------ */ + /** + * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} + * @see #setMaxIdleTime(long) + */ + public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) + { + _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; + } - /* ------------------------------------------------------------------------------- */ - public abstract boolean dispatch(Runnable task); + /* ------------------------------------------------------------------------------- */ + public abstract void execute(Runnable task); - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see org.eclipse.component.AbstractLifeCycle#doStart() - */ - @Override - protected void doStart() throws Exception - { - _selectSet = new SelectSet[_selectSets]; - for (int i=0;i<_selectSet.length;i++) - _selectSet[i]= new SelectSet(i); + /* ------------------------------------------------------------ */ + /* (non-Javadoc) + * @see org.eclipse.component.AbstractLifeCycle#doStart() + */ + @Override + protected void doStart() throws Exception + { + _selectSet = new SelectSet[_selectSets]; + for (int i=0;i<_selectSet.length;i++) + _selectSet[i]= new SelectSet(i); - super.doStart(); + super.doStart(); - // start a thread to Select - for (int i=0;i<getSelectSets();i++) - { - final int id=i; - boolean selecting=dispatch(new Runnable() - { - public void run() - { - String name=Thread.currentThread().getName(); - int priority=Thread.currentThread().getPriority(); - try - { - SelectSet[] sets=_selectSet; - if (sets==null) - return; - SelectSet set=sets[id]; + // start a thread to Select + for (int i=0;i<getSelectSets();i++) + { + final int id=i; + execute(new Runnable() + { + public void run() + { + String name=Thread.currentThread().getName(); + int priority=Thread.currentThread().getPriority(); + try + { + SelectSet[] sets=_selectSet; + if (sets==null) + return; + SelectSet set=sets[id]; - Thread.currentThread().setName(name+" Selector"+id); - if (getSelectorPriorityDelta()!=0) - Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); - LOG.debug("Starting {} on {}",Thread.currentThread(),this); - while (isRunning()) - { - try - { - set.doSelect(); - } - catch(IOException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } - finally - { - LOG.debug("Stopped {} on {}",Thread.currentThread(),this); - Thread.currentThread().setName(name); - if (getSelectorPriorityDelta()!=0) - Thread.currentThread().setPriority(priority); - } - } + Thread.currentThread().setName(name+" Selector"+id); + if (getSelectorPriorityDelta()!=0) + Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); + LOG.debug("Starting {} on {}",Thread.currentThread(),this); + while (isRunning()) + { + try + { + set.doSelect(); + } + catch(IOException e) + { + LOG.trace("",e); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } + finally + { + LOG.debug("Stopped {} on {}",Thread.currentThread(),this); + Thread.currentThread().setName(name); + if (getSelectorPriorityDelta()!=0) + Thread.currentThread().setPriority(priority); + } + } - }); - - if (!selecting) - throw new IllegalStateException("!Selecting"); - } - } + }); + } + } - /* ------------------------------------------------------------------------------- */ - @Override - protected void doStop() throws Exception - { - SelectSet[] sets= _selectSet; - _selectSet=null; - if (sets!=null) - { - for (SelectSet set : sets) - { - if (set!=null) - set.stop(); - } - } - super.doStop(); - } + /* ------------------------------------------------------------------------------- */ + @Override + protected void doStop() throws Exception + { + SelectSet[] sets= _selectSet; + _selectSet=null; + if (sets!=null) + { + for (SelectSet set : sets) + { + if (set!=null) + set.stop(); + } + } + super.doStop(); + } - /* ------------------------------------------------------------ */ - /** - * @param endpoint - */ - protected abstract void endPointClosed(SelectChannelEndPoint endpoint); + /* ------------------------------------------------------------ */ + /** + * @param endpoint + */ + protected abstract void endPointClosed(SelectChannelEndPoint endpoint); - /* ------------------------------------------------------------ */ - /** - * @param endpoint - */ - protected abstract void endPointOpened(SelectChannelEndPoint endpoint); + /* ------------------------------------------------------------ */ + /** + * @param endpoint + */ + protected abstract void endPointOpened(SelectChannelEndPoint endpoint); - /* ------------------------------------------------------------ */ - protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); + /* ------------------------------------------------------------ */ + protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); - /* ------------------------------------------------------------------------------- */ - public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); + /* ------------------------------------------------------------------------------- */ + public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); - /* ------------------------------------------------------------ */ - /** - * Create a new end point - * @param channel - * @param selectSet - * @param sKey the selection key - * @return the new endpoint {@link SelectChannelEndPoint} - * @throws IOException - */ - protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; + /* ------------------------------------------------------------ */ + /** + * Create a new end point + * @param channel + * @param selectSet + * @param sKey the selection key + * @return the new endpoint {@link SelectChannelEndPoint} + * @throws IOException + */ + protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; - /* ------------------------------------------------------------------------------- */ - protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) - { - LOG.warn(ex+","+channel+","+attachment); - LOG.debug("",ex); - } + /* ------------------------------------------------------------------------------- */ + protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) + { + LOG.warn(ex+","+channel+","+attachment); + LOG.debug("",ex); + } - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - AggregateLifeCycle.dumpObject(out,this); - AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); - } + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + AggregateLifeCycle.dumpObject(out,this); + AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); + } - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - public class SelectSet implements Dumpable - { - private final int _setID; - private final Timeout _timeout; + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + public class SelectSet implements Dumpable + { + private final int _setID; + private final Timeout _timeout; - private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); + private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); - private volatile Selector _selector; + private volatile Selector _selector; - private volatile Thread _selecting; - private int _busySelects; - private long _monitorNext; - private boolean _pausing; - private boolean _paused; - private volatile long _idleTick; - private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); + private volatile Thread _selecting; + private int _busySelects; + private long _monitorNext; + private boolean _pausing; + private boolean _paused; + private volatile long _idleTick; + private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); - /* ------------------------------------------------------------ */ - SelectSet(int acceptorID) throws Exception - { - _setID=acceptorID; + /* ------------------------------------------------------------ */ + SelectSet(int acceptorID) throws Exception + { + _setID=acceptorID; - _idleTick = System.currentTimeMillis(); - _timeout = new Timeout(this); - _timeout.setDuration(0L); + _idleTick = System.currentTimeMillis(); + _timeout = new Timeout(this); + _timeout.setDuration(0L); - // create a selector; - _selector = Selector.open(); - _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; - } + // create a selector; + _selector = Selector.open(); + _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; + } - /* ------------------------------------------------------------ */ - public void addChange(Object change) - { - _changes.add(change); - } + /* ------------------------------------------------------------ */ + public void addChange(Object change) + { + _changes.add(change); + } - /* ------------------------------------------------------------ */ - public void addChange(SelectableChannel channel, Object att) - { - if (att==null) - addChange(channel); - else if (att instanceof EndPoint) - addChange(att); - else - addChange(new ChannelAndAttachment(channel,att)); - } + /* ------------------------------------------------------------ */ + public void addChange(SelectableChannel channel, Object att) + { + if (att==null) + addChange(channel); + else if (att instanceof EndPoint) + addChange(att); + else + addChange(new ChannelAndAttachment(channel,att)); + } - /* ------------------------------------------------------------ */ - /** - * Select and dispatch tasks found from changes and the selector. - * - * @throws IOException - */ - public void doSelect() throws IOException - { - try - { - _selecting=Thread.currentThread(); - final Selector selector=_selector; - // Stopped concurrently ? - if (selector == null) - return; + /* ------------------------------------------------------------ */ + /** + * Select and dispatch tasks found from changes and the selector. + * + * @throws IOException + */ + public void doSelect() throws IOException + { + try + { + _selecting=Thread.currentThread(); + final Selector selector=_selector; + // Stopped concurrently ? + if (selector == null) + return; - // Make any key changes required - Object change; - int changes=_changes.size(); - while (changes-->0 && (change=_changes.poll())!=null) - { - Channel ch=null; - SelectionKey key=null; + // Make any key changes required + Object change; + int changes=_changes.size(); + while (changes-->0 && (change=_changes.poll())!=null) + { + Channel ch=null; + SelectionKey key=null; - try - { - if (change instanceof EndPoint) - { - // Update the operations for a key. - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; - ch=endpoint.getChannel(); - endpoint.doUpdateKey(); - } - else if (change instanceof ChannelAndAttachment) - { - // finish accepting/connecting this connection - final ChannelAndAttachment asc = (ChannelAndAttachment)change; - final SelectableChannel channel=asc._channel; - ch=channel; - final Object att = asc._attachment; + try + { + if (change instanceof EndPoint) + { + // Update the operations for a key. + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; + ch=endpoint.getChannel(); + endpoint.doUpdateKey(); + } + else if (change instanceof ChannelAndAttachment) + { + // finish accepting/connecting this connection + final ChannelAndAttachment asc = (ChannelAndAttachment)change; + final SelectableChannel channel=asc._channel; + ch=channel; + final Object att = asc._attachment; - if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) - { - key = channel.register(selector,SelectionKey.OP_READ,att); - SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (channel.isOpen()) - { - key = channel.register(selector,SelectionKey.OP_CONNECT,att); - } - } - else if (change instanceof SocketChannel) - { - // Newly registered channel - final SocketChannel channel=(SocketChannel)change; - ch=channel; - key = channel.register(selector,SelectionKey.OP_READ,null); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else if (change instanceof ChangeTask) - { - ((Runnable)change).run(); - } - else if (change instanceof Runnable) - { - dispatch((Runnable)change); - } - else - throw new IllegalArgumentException(change.toString()); - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - catch (Throwable e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.debug("",e); + if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) + { + key = channel.register(selector,SelectionKey.OP_READ,att); + SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else if (channel.isOpen()) + { + key = channel.register(selector,SelectionKey.OP_CONNECT,att); + } + } + else if (change instanceof SocketChannel) + { + // Newly registered channel + final SocketChannel channel=(SocketChannel)change; + ch=channel; + key = channel.register(selector,SelectionKey.OP_READ,null); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else if (change instanceof ChangeTask) + { + ((Runnable)change).run(); + } + else if (change instanceof Runnable) + { + execute((Runnable)change); + } + else + throw new IllegalArgumentException(change.toString()); + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + catch (Throwable e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.debug("",e); - try - { - if (ch!=null) - ch.close(); - } - catch(IOException e2) - { - LOG.debug("",e2); - } - } - } + try + { + if (ch!=null) + ch.close(); + } + catch(IOException e2) + { + LOG.debug("",e2); + } + } + } - // Do and instant select to see if any connections can be handled. - int selected=selector.selectNow(); + // Do and instant select to see if any connections can be handled. + int selected=selector.selectNow(); - long now=System.currentTimeMillis(); + long now=System.currentTimeMillis(); - // if no immediate things to do - if (selected==0 && selector.selectedKeys().isEmpty()) - { - // If we are in pausing mode - if (_pausing) - { - try - { - Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop - } - catch(InterruptedException e) - { - LOG.trace("",e); - } - now=System.currentTimeMillis(); - } + // if no immediate things to do + if (selected==0 && selector.selectedKeys().isEmpty()) + { + // If we are in pausing mode + if (_pausing) + { + try + { + Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop + } + catch(InterruptedException e) + { + LOG.trace("",e); + } + now=System.currentTimeMillis(); + } - // workout how long to wait in select - _timeout.setNow(now); - long to_next_timeout=_timeout.getTimeToNext(); + // workout how long to wait in select + _timeout.setNow(now); + long to_next_timeout=_timeout.getTimeToNext(); - long wait = _changes.size()==0?__IDLE_TICK:0L; - if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) - wait = to_next_timeout; + long wait = _changes.size()==0?__IDLE_TICK:0L; + if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) + wait = to_next_timeout; - // If we should wait with a select - if (wait>0) - { - long before=now; - selector.select(wait); - now = System.currentTimeMillis(); - _timeout.setNow(now); + // If we should wait with a select + if (wait>0) + { + long before=now; + selector.select(wait); + now = System.currentTimeMillis(); + _timeout.setNow(now); - // If we are monitoring for busy selector - // and this select did not wait more than 1ms - if (__MONITOR_PERIOD>0 && now-before <=1) - { - // count this as a busy select and if there have been too many this monitor cycle - if (++_busySelects>__MAX_SELECTS) - { - // Start injecting pauses - _pausing=true; + // If we are monitoring for busy selector + // and this select did not wait more than 1ms + if (__MONITOR_PERIOD>0 && now-before <=1) + { + // count this as a busy select and if there have been too many this monitor cycle + if (++_busySelects>__MAX_SELECTS) + { + // Start injecting pauses + _pausing=true; - // if this is the first pause - if (!_paused) - { - // Log and dump some status - _paused=true; - LOG.warn("Selector {} is too busy, pausing!",this); - } - } - } - } - } + // if this is the first pause + if (!_paused) + { + // Log and dump some status + _paused=true; + LOG.warn("Selector {} is too busy, pausing!",this); + } + } + } + } + } - // have we been destroyed while sleeping - if (_selector==null || !selector.isOpen()) - return; + // have we been destroyed while sleeping + if (_selector==null || !selector.isOpen()) + return; - // Look for things to do - for (SelectionKey key: selector.selectedKeys()) - { - SocketChannel channel=null; + // Look for things to do + for (SelectionKey key: selector.selectedKeys()) + { + SocketChannel channel=null; - try - { - if (!key.isValid()) - { - key.cancel(); - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); - if (endpoint != null) - endpoint.doUpdateKey(); - continue; - } + try + { + if (!key.isValid()) + { + key.cancel(); + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); + if (endpoint != null) + endpoint.doUpdateKey(); + continue; + } - Object att = key.attachment(); - if (att instanceof SelectChannelEndPoint) - { - if (key.isReadable()||key.isWritable()) - ((SelectChannelEndPoint)att).schedule(); - } - else if (key.isConnectable()) - { - // Complete a connection of a registered channel - channel = (SocketChannel)key.channel(); - boolean connected=false; - try - { - connected=channel.finishConnect(); - } - catch(Exception e) - { - connectionFailed(channel,e,att); - } - finally - { - if (connected) - { - key.interestOps(SelectionKey.OP_READ); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - endpoint.schedule(); - } - else - { - key.cancel(); - channel.close(); - } - } - } - else - { - // Wrap readable registered channel in an endpoint - channel = (SocketChannel)key.channel(); - SelectChannelEndPoint endpoint = createEndPoint(channel,key); - key.attach(endpoint); - if (key.isReadable()) - endpoint.schedule(); - } - key = null; - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - catch (Exception e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.trace("",e); + Object att = key.attachment(); + if (att instanceof SelectChannelEndPoint) + { + if (key.isReadable()||key.isWritable()) + ((SelectChannelEndPoint)att).schedule(); + } + else if (key.isConnectable()) + { + // Complete a connection of a registered channel + channel = (SocketChannel)key.channel(); + boolean connected=false; + try + { + connected=channel.finishConnect(); + } + catch(Exception e) + { + connectionFailed(channel,e,att); + } + finally + { + if (connected) + { + key.interestOps(SelectionKey.OP_READ); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + endpoint.schedule(); + } + else + { + key.cancel(); + channel.close(); + } + } + } + else + { + // Wrap readable registered channel in an endpoint + channel = (SocketChannel)key.channel(); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); + key.attach(endpoint); + if (key.isReadable()) + endpoint.schedule(); + } + key = null; + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + catch (Exception e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.trace("",e); - try - { - if (channel!=null) - channel.close(); - } - catch(IOException e2) - { - LOG.debug("",e2); - } + try + { + if (channel!=null) + channel.close(); + } + catch(IOException e2) + { + LOG.debug("",e2); + } - if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) - key.cancel(); - } - } + if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) + key.cancel(); + } + } - // Everything always handled - selector.selectedKeys().clear(); + // Everything always handled + selector.selectedKeys().clear(); - now=System.currentTimeMillis(); - _timeout.setNow(now); - Task task = _timeout.expired(); - while (task!=null) - { - if (task instanceof Runnable) - dispatch((Runnable)task); - task = _timeout.expired(); - } + now=System.currentTimeMillis(); + _timeout.setNow(now); + Task task = _timeout.expired(); + while (task!=null) + { + if (task instanceof Runnable) + execute((Runnable)task); + task = _timeout.expired(); + } - // Idle tick - if (now-_idleTick>__IDLE_TICK) - { - _idleTick=now; + // Idle tick + if (now-_idleTick>__IDLE_TICK) + { + _idleTick=now; - final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) - ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) - :now; + final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) + ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) + :now; - dispatch(new Runnable() - { - public void run() - { - for (SelectChannelEndPoint endp:_endPoints.keySet()) - { - endp.checkIdleTimestamp(idle_now); - } - } - public String toString() {return "Idle-"+super.toString();} - }); + execute(new Runnable() + { + public void run() + { + for (SelectChannelEndPoint endp:_endPoints.keySet()) + { + endp.checkIdleTimestamp(idle_now); + } + } + public String toString() {return "Idle-"+super.toString();} + }); - } + } - // Reset busy select monitor counts - if (__MONITOR_PERIOD>0 && now>_monitorNext) - { - _busySelects=0; - _pausing=false; - _monitorNext=now+__MONITOR_PERIOD; + // Reset busy select monitor counts + if (__MONITOR_PERIOD>0 && now>_monitorNext) + { + _busySelects=0; + _pausing=false; + _monitorNext=now+__MONITOR_PERIOD; - } - } - catch (ClosedSelectorException e) - { - if (isRunning()) - LOG.warn("",e); - else - LOG.trace("",e); - } - catch (CancelledKeyException e) - { - LOG.trace("",e); - } - finally - { - _selecting=null; - } - } + } + } + catch (ClosedSelectorException e) + { + if (isRunning()) + LOG.warn("",e); + else + LOG.trace("",e); + } + catch (CancelledKeyException e) + { + LOG.trace("",e); + } + finally + { + _selecting=null; + } + } - /* ------------------------------------------------------------ */ - private void renewSelector() - { - try - { - synchronized (this) - { - Selector selector=_selector; - if (selector==null) - return; - final Selector new_selector = Selector.open(); - for (SelectionKey k: selector.keys()) - { - if (!k.isValid() || k.interestOps()==0) - continue; + /* ------------------------------------------------------------ */ + private void renewSelector() + { + try + { + synchronized (this) + { + Selector selector=_selector; + if (selector==null) + return; + final Selector new_selector = Selector.open(); + for (SelectionKey k: selector.keys()) + { + if (!k.isValid() || k.interestOps()==0) + continue; - final SelectableChannel channel = k.channel(); - final Object attachment = k.attachment(); + final SelectableChannel channel = k.channel(); + final Object attachment = k.attachment(); - if (attachment==null) - addChange(channel); - else - addChange(channel,attachment); - } - _selector.close(); - _selector=new_selector; - } - } - catch(IOException e) - { - throw new RuntimeException("recreating selector",e); - } - } + if (attachment==null) + addChange(channel); + else + addChange(channel,attachment); + } + _selector.close(); + _selector=new_selector; + } + } + catch(IOException e) + { + throw new RuntimeException("recreating selector",e); + } + } - /* ------------------------------------------------------------ */ - public SelectorManager getManager() - { - return SelectorManager.this; - } + /* ------------------------------------------------------------ */ + public SelectorManager getManager() + { + return SelectorManager.this; + } - /* ------------------------------------------------------------ */ - public long getNow() - { - return _timeout.getNow(); - } + /* ------------------------------------------------------------ */ + public long getNow() + { + return _timeout.getNow(); + } - /* ------------------------------------------------------------ */ - /** - * @param task The task to timeout. If it implements Runnable, then - * expired will be called from a dispatched thread. - * - * @param timeoutMs - */ - public void scheduleTimeout(Timeout.Task task, long timeoutMs) - { - if (!(task instanceof Runnable)) - throw new IllegalArgumentException("!Runnable"); - _timeout.schedule(task, timeoutMs); - } + /* ------------------------------------------------------------ */ + /** + * @param task The task to timeout. If it implements Runnable, then + * expired will be called from a dispatched thread. + * + * @param timeoutMs + */ + public void scheduleTimeout(Timeout.Task task, long timeoutMs) + { + if (!(task instanceof Runnable)) + throw new IllegalArgumentException("!Runnable"); + _timeout.schedule(task, timeoutMs); + } - /* ------------------------------------------------------------ */ - public void cancelTimeout(Timeout.Task task) - { - task.cancel(); - } + /* ------------------------------------------------------------ */ + public void cancelTimeout(Timeout.Task task) + { + task.cancel(); + } - /* ------------------------------------------------------------ */ - public void wakeup() - { - try - { - Selector selector = _selector; - if (selector!=null) - selector.wakeup(); - } - catch(Exception e) - { - addChange(new ChangeTask() - { - public void run() - { - renewSelector(); - } - }); + /* ------------------------------------------------------------ */ + public void wakeup() + { + try + { + Selector selector = _selector; + if (selector!=null) + selector.wakeup(); + } + catch(Exception e) + { + addChange(new ChangeTask() + { + public void run() + { + renewSelector(); + } + }); - renewSelector(); - } - } + renewSelector(); + } + } - /* ------------------------------------------------------------ */ - private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException - { - SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); - LOG.debug("created {}",endp); - endPointOpened(endp); - _endPoints.put(endp,this); - return endp; - } + /* ------------------------------------------------------------ */ + private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException + { + SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); + LOG.debug("created {}",endp); + endPointOpened(endp); + _endPoints.put(endp,this); + return endp; + } - /* ------------------------------------------------------------ */ - public void destroyEndPoint(SelectChannelEndPoint endp) - { - LOG.debug("destroyEndPoint {}",endp); - _endPoints.remove(endp); - endPointClosed(endp); - } + /* ------------------------------------------------------------ */ + public void destroyEndPoint(SelectChannelEndPoint endp) + { + LOG.debug("destroyEndPoint {}",endp); + _endPoints.remove(endp); + endPointClosed(endp); + } - /* ------------------------------------------------------------ */ - Selector getSelector() - { - return _selector; - } + /* ------------------------------------------------------------ */ + Selector getSelector() + { + return _selector; + } - /* ------------------------------------------------------------ */ - void stop() throws Exception - { - // Spin for a while waiting for selector to complete - // to avoid unneccessary closed channel exceptions - try - { - for (int i=0;i<100 && _selecting!=null;i++) - { - wakeup(); - Thread.sleep(10); - } - } - catch(Exception e) - { - LOG.trace("",e); - } + /* ------------------------------------------------------------ */ + void stop() throws Exception + { + // Spin for a while waiting for selector to complete + // to avoid unneccessary closed channel exceptions + try + { + for (int i=0;i<100 && _selecting!=null;i++) + { + wakeup(); + Thread.sleep(10); + } + } + catch(Exception e) + { + LOG.trace("",e); + } - // close endpoints and selector - synchronized (this) - { - Selector selector=_selector; - for (SelectionKey key:selector.keys()) - { - if (key==null) - continue; - Object att=key.attachment(); - if (att instanceof EndPoint) - { - EndPoint endpoint = (EndPoint)att; - try - { - endpoint.close(); - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } + // close endpoints and selector + synchronized (this) + { + Selector selector=_selector; + for (SelectionKey key:selector.keys()) + { + if (key==null) + continue; + Object att=key.attachment(); + if (att instanceof EndPoint) + { + EndPoint endpoint = (EndPoint)att; + try + { + endpoint.close(); + } + catch(IOException e) + { + LOG.trace("",e); + } + } + } - _timeout.cancelAll(); - try - { - selector=_selector; - if (selector != null) - selector.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - _selector=null; - } - } + _timeout.cancelAll(); + try + { + selector=_selector; + if (selector != null) + selector.close(); + } + catch (IOException e) + { + LOG.trace("",e); + } + _selector=null; + } + } - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); - Thread selecting = _selecting; + Thread selecting = _selecting; - Object where = "not selecting"; - StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); - if (trace!=null) - { - for (StackTraceElement t:trace) - if (t.getClassName().startsWith("org.eclipse.jetty.")) - { - where=t; - break; - } - } + Object where = "not selecting"; + StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); + if (trace!=null) + { + for (StackTraceElement t:trace) + if (t.getClassName().startsWith("org.eclipse.jetty.")) + { + where=t; + break; + } + } - Selector selector=_selector; - if (selector!=null) - { - final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); - dump.add(where); + Selector selector=_selector; + if (selector!=null) + { + final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); + dump.add(where); - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); - addChange(new ChangeTask() - { - public void run() - { - dumpKeyState(dump); - latch.countDown(); - } - }); + addChange(new ChangeTask() + { + public void run() + { + dumpKeyState(dump); + latch.countDown(); + } + }); - try - { - latch.await(5,TimeUnit.SECONDS); - } - catch(InterruptedException e) - { - LOG.trace("",e); - } + try + { + latch.await(5,TimeUnit.SECONDS); + } + catch(InterruptedException e) + { + LOG.trace("",e); + } - AggregateLifeCycle.dump(out,indent,dump); - } - } + AggregateLifeCycle.dump(out,indent,dump); + } + } - /* ------------------------------------------------------------ */ - public void dumpKeyState(List<Object> dumpto) - { - Selector selector=_selector; - Set<SelectionKey> keys = selector.keys(); - dumpto.add(selector + " keys=" + keys.size()); - for (SelectionKey key: keys) - { - if (key.isValid()) - dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); - else - dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); - } - } + /* ------------------------------------------------------------ */ + public void dumpKeyState(List<Object> dumpto) + { + Selector selector=_selector; + Set<SelectionKey> keys = selector.keys(); + dumpto.add(selector + " keys=" + keys.size()); + for (SelectionKey key: keys) + { + if (key.isValid()) + dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); + else + dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); + } + } - /* ------------------------------------------------------------ */ - public String toString() - { - Selector selector=_selector; - return String.format("%s keys=%d selected=%d", - super.toString(), - selector != null && selector.isOpen() ? selector.keys().size() : -1, - selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); - } - } + /* ------------------------------------------------------------ */ + public String toString() + { + Selector selector=_selector; + return String.format("%s keys=%d selected=%d", + super.toString(), + selector != null && selector.isOpen() ? selector.keys().size() : -1, + selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); + } + } - /* ------------------------------------------------------------ */ - private static class ChannelAndAttachment - { - final SelectableChannel _channel; - final Object _attachment; + /* ------------------------------------------------------------ */ + private static class ChannelAndAttachment + { + final SelectableChannel _channel; + final Object _attachment; - public ChannelAndAttachment(SelectableChannel channel, Object attachment) - { - super(); - _channel = channel; - _attachment = attachment; - } - } + public ChannelAndAttachment(SelectableChannel channel, Object attachment) + { + super(); + _channel = channel; + _attachment = attachment; + } + } - /* ------------------------------------------------------------ */ - public boolean isDeferringInterestedOps0() - { - return _deferringInterestedOps0; - } + /* ------------------------------------------------------------ */ + public boolean isDeferringInterestedOps0() + { + return _deferringInterestedOps0; + } - /* ------------------------------------------------------------ */ - public void setDeferringInterestedOps0(boolean deferringInterestedOps0) - { - _deferringInterestedOps0 = deferringInterestedOps0; - } + /* ------------------------------------------------------------ */ + public void setDeferringInterestedOps0(boolean deferringInterestedOps0) + { + _deferringInterestedOps0 = deferringInterestedOps0; + } - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private interface ChangeTask extends Runnable - {} + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private interface ChangeTask extends Runnable + {} }