diff src/org/eclipse/jetty/io/nio/SelectorManager.java @ 802:3428c60d7cfc

replace jetty jars with source
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 07 Sep 2016 21:15:48 -0600
parents
children 8e9db0bbf4f9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java	Wed Sep 07 21:15:48 2016 -0600
@@ -0,0 +1,1034 @@
+//
+//  ========================================================================
+//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
+//  ------------------------------------------------------------------------
+//  All rights reserved. This program and the accompanying materials
+//  are made available under the terms of the Eclipse Public License v1.0
+//  and Apache License v2.0 which accompanies this distribution.
+//
+//      The Eclipse Public License is available at
+//      http://www.eclipse.org/legal/epl-v10.html
+//
+//      The Apache License v2.0 is available at
+//      http://www.opensource.org/licenses/apache2.0.php
+//
+//  You may elect to redistribute this code under either of these licenses.
+//  ========================================================================
+//
+
+package org.eclipse.jetty.io.nio;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.io.AsyncEndPoint;
+import org.eclipse.jetty.io.ConnectedEndPoint;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.AggregateLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Timeout;
+import org.eclipse.jetty.util.thread.Timeout.Task;
+
+
+/* ------------------------------------------------------------ */
+/**
+ * The Selector Manager manages and number of SelectSets to allow
+ * NIO scheduling to scale to large numbers of connections.
+ * <p>
+ */
+public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
+{
+    public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
+
+    private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
+    private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
+    private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
+    private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
+
+    private int _maxIdleTime;
+    private int _lowResourcesMaxIdleTime;
+    private long _lowResourcesConnections;
+    private SelectSet[] _selectSet;
+    private int _selectSets=1;
+    private volatile int _set=0;
+    private boolean _deferringInterestedOps0=true;
+    private int _selectorPriorityDelta=0;
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
+     * @see #setLowResourcesMaxIdleTime(long)
+     */
+    public void setMaxIdleTime(long maxIdleTime)
+    {
+        _maxIdleTime=(int)maxIdleTime;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param selectSets number of select sets to create
+     */
+    public void setSelectSets(int selectSets)
+    {
+        long lrc = _lowResourcesConnections * _selectSets;
+        _selectSets=selectSets;
+        _lowResourcesConnections=lrc/_selectSets;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @return the max idle time
+     */
+    public long getMaxIdleTime()
+    {
+        return _maxIdleTime;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @return the number of select sets in use
+     */
+    public int getSelectSets()
+    {
+        return _selectSets;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param i
+     * @return The select set
+     */
+    public SelectSet getSelectSet(int i)
+    {
+        return _selectSet[i];
+    }
+
+    /* ------------------------------------------------------------ */
+    /** Register a channel
+     * @param channel
+     * @param att Attached Object
+     */
+    public void register(SocketChannel channel, Object att)
+    {
+        // The ++ increment here is not atomic, but it does not matter.
+        // so long as the value changes sometimes, then connections will
+        // be distributed over the available sets.
+
+        int s=_set++;
+        if (s<0)
+            s=-s;
+        s=s%_selectSets;
+        SelectSet[] sets=_selectSet;
+        if (sets!=null)
+        {
+            SelectSet set=sets[s];
+            set.addChange(channel,att);
+            set.wakeup();
+        }
+    }
+
+
+    /* ------------------------------------------------------------ */
+    /** Register a channel
+     * @param channel
+     */
+    public void register(SocketChannel channel)
+    {
+        // The ++ increment here is not atomic, but it does not matter.
+        // so long as the value changes sometimes, then connections will
+        // be distributed over the available sets.
+
+        int s=_set++;
+        if (s<0)
+            s=-s;
+        s=s%_selectSets;
+        SelectSet[] sets=_selectSet;
+        if (sets!=null)
+        {
+            SelectSet set=sets[s];
+            set.addChange(channel);
+            set.wakeup();
+        }
+    }
+
+    /* ------------------------------------------------------------ */
+    /** Register a {@link ServerSocketChannel}
+     * @param acceptChannel
+     */
+    public void register(ServerSocketChannel acceptChannel)
+    {
+        int s=_set++;
+        if (s<0)
+            s=-s;
+        s=s%_selectSets;
+        SelectSet set=_selectSet[s];
+        set.addChange(acceptChannel);
+        set.wakeup();
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @return delta The value to add to the selector thread priority.
+     */
+    public int getSelectorPriorityDelta()
+    {
+        return _selectorPriorityDelta;
+    }
+
+    /* ------------------------------------------------------------ */
+    /** Set the selector thread priorty delta.
+     * @param delta The value to add to the selector thread priority.
+     */
+    public void setSelectorPriorityDelta(int delta)
+    {
+        _selectorPriorityDelta=delta;
+    }
+
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @return the lowResourcesConnections
+     */
+    public long getLowResourcesConnections()
+    {
+        return _lowResourcesConnections*_selectSets;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * Set the number of connections, which if exceeded places this manager in low resources state.
+     * This is not an exact measure as the connection count is averaged over the select sets.
+     * @param lowResourcesConnections the number of connections
+     * @see #setLowResourcesMaxIdleTime(long)
+     */
+    public void setLowResourcesConnections(long lowResourcesConnections)
+    {
+        _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @return the lowResourcesMaxIdleTime
+     */
+    public long getLowResourcesMaxIdleTime()
+    {
+        return _lowResourcesMaxIdleTime;
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
+     * @see #setMaxIdleTime(long)
+     */
+    public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
+    {
+        _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
+    }
+
+
+    /* ------------------------------------------------------------------------------- */
+    public abstract boolean dispatch(Runnable task);
+
+    /* ------------------------------------------------------------ */
+    /* (non-Javadoc)
+     * @see org.eclipse.component.AbstractLifeCycle#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception
+    {
+        _selectSet = new SelectSet[_selectSets];
+        for (int i=0;i<_selectSet.length;i++)
+            _selectSet[i]= new SelectSet(i);
+
+        super.doStart();
+
+        // start a thread to Select
+        for (int i=0;i<getSelectSets();i++)
+        {
+            final int id=i;
+            boolean selecting=dispatch(new Runnable()
+            {
+                public void run()
+                {
+                    String name=Thread.currentThread().getName();
+                    int priority=Thread.currentThread().getPriority();
+                    try
+                    {
+                        SelectSet[] sets=_selectSet;
+                        if (sets==null)
+                            return;
+                        SelectSet set=sets[id];
+
+                        Thread.currentThread().setName(name+" Selector"+id);
+                        if (getSelectorPriorityDelta()!=0)
+                            Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
+                        LOG.debug("Starting {} on {}",Thread.currentThread(),this);
+                        while (isRunning())
+                        {
+                            try
+                            {
+                                set.doSelect();
+                            }
+                            catch(IOException e)
+                            {
+                                LOG.ignore(e);
+                            }
+                            catch(Exception e)
+                            {
+                                LOG.warn(e);
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
+                        Thread.currentThread().setName(name);
+                        if (getSelectorPriorityDelta()!=0)
+                            Thread.currentThread().setPriority(priority);
+                    }
+                }
+
+            });
+
+            if (!selecting)
+                throw new IllegalStateException("!Selecting");
+        }
+    }
+
+
+    /* ------------------------------------------------------------------------------- */
+    @Override
+    protected void doStop() throws Exception
+    {
+        SelectSet[] sets= _selectSet;
+        _selectSet=null;
+        if (sets!=null)
+        {
+            for (SelectSet set : sets)
+            {
+                if (set!=null)
+                    set.stop();
+            }
+        }
+        super.doStop();
+    }
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param endpoint
+     */
+    protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
+
+    /* ------------------------------------------------------------ */
+    /**
+     * @param endpoint
+     */
+    protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
+
+    /* ------------------------------------------------------------ */
+    protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
+
+    /* ------------------------------------------------------------------------------- */
+    public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
+
+    /* ------------------------------------------------------------ */
+    /**
+     * Create a new end point
+     * @param channel
+     * @param selectSet
+     * @param sKey the selection key
+     * @return the new endpoint {@link SelectChannelEndPoint}
+     * @throws IOException
+     */
+    protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
+
+    /* ------------------------------------------------------------------------------- */
+    protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
+    {
+        LOG.warn(ex+","+channel+","+attachment);
+        LOG.debug(ex);
+    }
+
+    /* ------------------------------------------------------------ */
+    public String dump()
+    {
+        return AggregateLifeCycle.dump(this);
+    }
+
+    /* ------------------------------------------------------------ */
+    public void dump(Appendable out, String indent) throws IOException
+    {
+        AggregateLifeCycle.dumpObject(out,this);
+        AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
+    }
+
+
+    /* ------------------------------------------------------------------------------- */
+    /* ------------------------------------------------------------------------------- */
+    /* ------------------------------------------------------------------------------- */
+    public class SelectSet implements Dumpable
+    {
+        private final int _setID;
+        private final Timeout _timeout;
+
+        private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
+
+        private volatile Selector _selector;
+
+        private volatile Thread _selecting;
+        private int _busySelects;
+        private long _monitorNext;
+        private boolean _pausing;
+        private boolean _paused;
+        private volatile long _idleTick;
+        private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
+
+        /* ------------------------------------------------------------ */
+        SelectSet(int acceptorID) throws Exception
+        {
+            _setID=acceptorID;
+
+            _idleTick = System.currentTimeMillis();
+            _timeout = new Timeout(this);
+            _timeout.setDuration(0L);
+
+            // create a selector;
+            _selector = Selector.open();
+            _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
+        }
+
+        /* ------------------------------------------------------------ */
+        public void addChange(Object change)
+        {
+            _changes.add(change);
+        }
+
+        /* ------------------------------------------------------------ */
+        public void addChange(SelectableChannel channel, Object att)
+        {
+            if (att==null)
+                addChange(channel);
+            else if (att instanceof EndPoint)
+                addChange(att);
+            else
+                addChange(new ChannelAndAttachment(channel,att));
+        }
+
+        /* ------------------------------------------------------------ */
+        /**
+         * Select and dispatch tasks found from changes and the selector.
+         *
+         * @throws IOException
+         */
+        public void doSelect() throws IOException
+        {
+            try
+            {
+                _selecting=Thread.currentThread();
+                final Selector selector=_selector;
+                // Stopped concurrently ?
+                if (selector == null)
+                    return;
+
+                // Make any key changes required
+                Object change;
+                int changes=_changes.size();
+                while (changes-->0 && (change=_changes.poll())!=null)
+                {
+                    Channel ch=null;
+                    SelectionKey key=null;
+
+                    try
+                    {
+                        if (change instanceof EndPoint)
+                        {
+                            // Update the operations for a key.
+                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
+                            ch=endpoint.getChannel();
+                            endpoint.doUpdateKey();
+                        }
+                        else if (change instanceof ChannelAndAttachment)
+                        {
+                            // finish accepting/connecting this connection
+                            final ChannelAndAttachment asc = (ChannelAndAttachment)change;
+                            final SelectableChannel channel=asc._channel;
+                            ch=channel;
+                            final Object att = asc._attachment;
+
+                            if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
+                            {
+                                key = channel.register(selector,SelectionKey.OP_READ,att);
+                                SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
+                                key.attach(endpoint);
+                                endpoint.schedule();
+                            }
+                            else if (channel.isOpen())
+                            {
+                                key = channel.register(selector,SelectionKey.OP_CONNECT,att);
+                            }
+                        }
+                        else if (change instanceof SocketChannel)
+                        {
+                            // Newly registered channel
+                            final SocketChannel channel=(SocketChannel)change;
+                            ch=channel;
+                            key = channel.register(selector,SelectionKey.OP_READ,null);
+                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
+                            key.attach(endpoint);
+                            endpoint.schedule();
+                        }
+                        else if (change instanceof ChangeTask)
+                        {
+                            ((Runnable)change).run();
+                        }
+                        else if (change instanceof Runnable)
+                        {
+                            dispatch((Runnable)change);
+                        }
+                        else
+                            throw new IllegalArgumentException(change.toString());
+                    }
+                    catch (CancelledKeyException e)
+                    {
+                        LOG.ignore(e);
+                    }
+                    catch (Throwable e)
+                    {
+                        if (isRunning())
+                            LOG.warn(e);
+                        else
+                            LOG.debug(e);
+
+                        try
+                        {
+                            if (ch!=null)
+                                ch.close();
+                        }
+                        catch(IOException e2)
+                        {
+                            LOG.debug(e2);
+                        }
+                    }
+                }
+
+
+                // Do and instant select to see if any connections can be handled.
+                int selected=selector.selectNow();
+
+                long now=System.currentTimeMillis();
+
+                // if no immediate things to do
+                if (selected==0 && selector.selectedKeys().isEmpty())
+                {
+                    // If we are in pausing mode
+                    if (_pausing)
+                    {
+                        try
+                        {
+                            Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of  busy loop
+                        }
+                        catch(InterruptedException e)
+                        {
+                            LOG.ignore(e);
+                        }
+                        now=System.currentTimeMillis();
+                    }
+
+                    // workout how long to wait in select
+                    _timeout.setNow(now);
+                    long to_next_timeout=_timeout.getTimeToNext();
+
+                    long wait = _changes.size()==0?__IDLE_TICK:0L;
+                    if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
+                        wait = to_next_timeout;
+
+                    // If we should wait with a select
+                    if (wait>0)
+                    {
+                        long before=now;
+                        selector.select(wait);
+                        now = System.currentTimeMillis();
+                        _timeout.setNow(now);
+
+                        // If we are monitoring for busy selector
+                        // and this select did not wait more than 1ms
+                        if (__MONITOR_PERIOD>0 && now-before <=1)
+                        {
+                            // count this as a busy select and if there have been too many this monitor cycle
+                            if (++_busySelects>__MAX_SELECTS)
+                            {
+                                // Start injecting pauses
+                                _pausing=true;
+
+                                // if this is the first pause
+                                if (!_paused)
+                                {
+                                    // Log and dump some status
+                                    _paused=true;
+                                    LOG.warn("Selector {} is too busy, pausing!",this);
+                                }
+                            }
+                        }
+                    }
+                }
+
+                // have we been destroyed while sleeping
+                if (_selector==null || !selector.isOpen())
+                    return;
+
+                // Look for things to do
+                for (SelectionKey key: selector.selectedKeys())
+                {
+                    SocketChannel channel=null;
+
+                    try
+                    {
+                        if (!key.isValid())
+                        {
+                            key.cancel();
+                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
+                            if (endpoint != null)
+                                endpoint.doUpdateKey();
+                            continue;
+                        }
+
+                        Object att = key.attachment();
+                        if (att instanceof SelectChannelEndPoint)
+                        {
+                            if (key.isReadable()||key.isWritable())
+                                ((SelectChannelEndPoint)att).schedule();
+                        }
+                        else if (key.isConnectable())
+                        {
+                            // Complete a connection of a registered channel
+                            channel = (SocketChannel)key.channel();
+                            boolean connected=false;
+                            try
+                            {
+                                connected=channel.finishConnect();
+                            }
+                            catch(Exception e)
+                            {
+                                connectionFailed(channel,e,att);
+                            }
+                            finally
+                            {
+                                if (connected)
+                                {
+                                    key.interestOps(SelectionKey.OP_READ);
+                                    SelectChannelEndPoint endpoint = createEndPoint(channel,key);
+                                    key.attach(endpoint);
+                                    endpoint.schedule();
+                                }
+                                else
+                                {
+                                    key.cancel();
+                                    channel.close();
+                                }
+                            }
+                        }
+                        else
+                        {
+                            // Wrap readable registered channel in an endpoint
+                            channel = (SocketChannel)key.channel();
+                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
+                            key.attach(endpoint);
+                            if (key.isReadable())
+                                endpoint.schedule();
+                        }
+                        key = null;
+                    }
+                    catch (CancelledKeyException e)
+                    {
+                        LOG.ignore(e);
+                    }
+                    catch (Exception e)
+                    {
+                        if (isRunning())
+                            LOG.warn(e);
+                        else
+                            LOG.ignore(e);
+
+                        try
+                        {
+                            if (channel!=null)
+                                channel.close();
+                        }
+                        catch(IOException e2)
+                        {
+                            LOG.debug(e2);
+                        }
+
+                        if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
+                            key.cancel();
+                    }
+                }
+
+                // Everything always handled
+                selector.selectedKeys().clear();
+
+                now=System.currentTimeMillis();
+                _timeout.setNow(now);
+                Task task = _timeout.expired();
+                while (task!=null)
+                {
+                    if (task instanceof Runnable)
+                        dispatch((Runnable)task);
+                    task = _timeout.expired();
+                }
+
+                // Idle tick
+                if (now-_idleTick>__IDLE_TICK)
+                {
+                    _idleTick=now;
+
+                    final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
+                        ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
+                        :now;
+
+                    dispatch(new Runnable()
+                    {
+                        public void run()
+                        {
+                            for (SelectChannelEndPoint endp:_endPoints.keySet())
+                            {
+                                endp.checkIdleTimestamp(idle_now);
+                            }
+                        }
+                        public String toString() {return "Idle-"+super.toString();}
+                    });
+
+                }
+
+                // Reset busy select monitor counts
+                if (__MONITOR_PERIOD>0 && now>_monitorNext)
+                {
+                    _busySelects=0;
+                    _pausing=false;
+                    _monitorNext=now+__MONITOR_PERIOD;
+
+                }
+            }
+            catch (ClosedSelectorException e)
+            {
+                if (isRunning())
+                    LOG.warn(e);
+                else
+                    LOG.ignore(e);
+            }
+            catch (CancelledKeyException e)
+            {
+                LOG.ignore(e);
+            }
+            finally
+            {
+                _selecting=null;
+            }
+        }
+
+
+        /* ------------------------------------------------------------ */
+        private void renewSelector()
+        {
+            try
+            {
+                synchronized (this)
+                {
+                    Selector selector=_selector;
+                    if (selector==null)
+                        return;
+                    final Selector new_selector = Selector.open();
+                    for (SelectionKey k: selector.keys())
+                    {
+                        if (!k.isValid() || k.interestOps()==0)
+                            continue;
+
+                        final SelectableChannel channel = k.channel();
+                        final Object attachment = k.attachment();
+
+                        if (attachment==null)
+                            addChange(channel);
+                        else
+                            addChange(channel,attachment);
+                    }
+                    _selector.close();
+                    _selector=new_selector;
+                }
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException("recreating selector",e);
+            }
+        }
+
+        /* ------------------------------------------------------------ */
+        public SelectorManager getManager()
+        {
+            return SelectorManager.this;
+        }
+
+        /* ------------------------------------------------------------ */
+        public long getNow()
+        {
+            return _timeout.getNow();
+        }
+
+        /* ------------------------------------------------------------ */
+        /**
+         * @param task The task to timeout. If it implements Runnable, then
+         * expired will be called from a dispatched thread.
+         *
+         * @param timeoutMs
+         */
+        public void scheduleTimeout(Timeout.Task task, long timeoutMs)
+        {
+            if (!(task instanceof Runnable))
+                throw new IllegalArgumentException("!Runnable");
+            _timeout.schedule(task, timeoutMs);
+        }
+
+        /* ------------------------------------------------------------ */
+        public void cancelTimeout(Timeout.Task task)
+        {
+            task.cancel();
+        }
+
+        /* ------------------------------------------------------------ */
+        public void wakeup()
+        {
+            try
+            {
+                Selector selector = _selector;
+                if (selector!=null)
+                    selector.wakeup();
+            }
+            catch(Exception e)
+            {
+                addChange(new ChangeTask()
+                {
+                    public void run()
+                    {
+                        renewSelector();
+                    }
+                });
+
+                renewSelector();
+            }
+        }
+
+        /* ------------------------------------------------------------ */
+        private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
+        {
+            SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
+            LOG.debug("created {}",endp);
+            endPointOpened(endp);
+            _endPoints.put(endp,this);
+            return endp;
+        }
+
+        /* ------------------------------------------------------------ */
+        public void destroyEndPoint(SelectChannelEndPoint endp)
+        {
+            LOG.debug("destroyEndPoint {}",endp);
+            _endPoints.remove(endp);
+            endPointClosed(endp);
+        }
+
+        /* ------------------------------------------------------------ */
+        Selector getSelector()
+        {
+            return _selector;
+        }
+
+        /* ------------------------------------------------------------ */
+        void stop() throws Exception
+        {
+            // Spin for a while waiting for selector to complete
+            // to avoid unneccessary closed channel exceptions
+            try
+            {
+                for (int i=0;i<100 && _selecting!=null;i++)
+                {
+                    wakeup();
+                    Thread.sleep(10);
+                }
+            }
+            catch(Exception e)
+            {
+                LOG.ignore(e);
+            }
+
+            // close endpoints and selector
+            synchronized (this)
+            {
+                Selector selector=_selector;
+                for (SelectionKey key:selector.keys())
+                {
+                    if (key==null)
+                        continue;
+                    Object att=key.attachment();
+                    if (att instanceof EndPoint)
+                    {
+                        EndPoint endpoint = (EndPoint)att;
+                        try
+                        {
+                            endpoint.close();
+                        }
+                        catch(IOException e)
+                        {
+                            LOG.ignore(e);
+                        }
+                    }
+                }
+
+
+                _timeout.cancelAll();
+                try
+                {
+                    selector=_selector;
+                    if (selector != null)
+                        selector.close();
+                }
+                catch (IOException e)
+                {
+                    LOG.ignore(e);
+                }
+                _selector=null;
+            }
+        }
+
+        /* ------------------------------------------------------------ */
+        public String dump()
+        {
+            return AggregateLifeCycle.dump(this);
+        }
+
+        /* ------------------------------------------------------------ */
+        public void dump(Appendable out, String indent) throws IOException
+        {
+            out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
+
+            Thread selecting = _selecting;
+
+            Object where = "not selecting";
+            StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
+            if (trace!=null)
+            {
+                for (StackTraceElement t:trace)
+                    if (t.getClassName().startsWith("org.eclipse.jetty."))
+                    {
+                        where=t;
+                        break;
+                    }
+            }
+
+            Selector selector=_selector;
+            if (selector!=null)
+            {
+                final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
+                dump.add(where);
+
+                final CountDownLatch latch = new CountDownLatch(1);
+
+                addChange(new ChangeTask()
+                {
+                    public void run()
+                    {
+                        dumpKeyState(dump);
+                        latch.countDown();
+                    }
+                });
+
+                try
+                {
+                    latch.await(5,TimeUnit.SECONDS);
+                }
+                catch(InterruptedException e)
+                {
+                    LOG.ignore(e);
+                }
+
+                AggregateLifeCycle.dump(out,indent,dump);
+            }
+        }
+
+        /* ------------------------------------------------------------ */
+        public void dumpKeyState(List<Object> dumpto)
+        {
+            Selector selector=_selector;
+            Set<SelectionKey> keys = selector.keys();
+            dumpto.add(selector + " keys=" + keys.size());
+            for (SelectionKey key: keys)
+            {
+                if (key.isValid())
+                    dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
+                else
+                    dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
+            }
+        }
+
+        /* ------------------------------------------------------------ */
+        public String toString()
+        {
+            Selector selector=_selector;
+            return String.format("%s keys=%d selected=%d",
+                    super.toString(),
+                    selector != null && selector.isOpen() ? selector.keys().size() : -1,
+                    selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
+        }
+    }
+
+    /* ------------------------------------------------------------ */
+    private static class ChannelAndAttachment
+    {
+        final SelectableChannel _channel;
+        final Object _attachment;
+
+        public ChannelAndAttachment(SelectableChannel channel, Object attachment)
+        {
+            super();
+            _channel = channel;
+            _attachment = attachment;
+        }
+    }
+
+    /* ------------------------------------------------------------ */
+    public boolean isDeferringInterestedOps0()
+    {
+        return _deferringInterestedOps0;
+    }
+
+    /* ------------------------------------------------------------ */
+    public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
+    {
+        _deferringInterestedOps0 = deferringInterestedOps0;
+    }
+
+
+    /* ------------------------------------------------------------ */
+    /* ------------------------------------------------------------ */
+    /* ------------------------------------------------------------ */
+    private interface ChangeTask extends Runnable
+    {}
+
+}