Mercurial Hosting > luan
view src/org/eclipse/jetty/io/nio/SelectorManager.java @ 947:64f3d8dae31d
simplify SelectChannelEndPoint
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 11 Oct 2016 21:33:40 -0600 |
parents | 1d24b6e422fa |
children | f5aefdc4a81a |
line wrap: on
line source
// // ======================================================================== // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.io.nio; import java.io.IOException; import java.nio.channels.CancelledKeyException; import java.nio.channels.Channel; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* ------------------------------------------------------------ */ /** * The Selector Manager manages and number of SelectSets to allow * NIO scheduling to scale to large numbers of connections. * <p> */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); private int _maxIdleTime; private long _lowResourcesConnections; private SelectSet[] _selectSet; private int _selectSets=1; private volatile int _set=0; /* ------------------------------------------------------------ */ /** * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. * @see #setLowResourcesMaxIdleTime(long) */ public void setMaxIdleTime(long maxIdleTime) { _maxIdleTime=(int)maxIdleTime; } /* ------------------------------------------------------------ */ /** * @param selectSets number of select sets to create */ public void setSelectSets(int selectSets) { long lrc = _lowResourcesConnections * _selectSets; _selectSets=selectSets; _lowResourcesConnections=lrc/_selectSets; } /* ------------------------------------------------------------ */ /** * @return the max idle time */ public long getMaxIdleTime() { return _maxIdleTime; } /* ------------------------------------------------------------ */ /** * @return the number of select sets in use */ public int getSelectSets() { return _selectSets; } /* ------------------------------------------------------------ */ /** * @param i * @return The select set */ public SelectSet getSelectSet(int i) { return _selectSet[i]; } /* ------------------------------------------------------------ */ /** Register a channel * @param channel */ public void register(SocketChannel channel) { // The ++ increment here is not atomic, but it does not matter. // so long as the value changes sometimes, then connections will // be distributed over the available sets. int s=_set++; if (s<0) s=-s; s=s%_selectSets; SelectSet[] sets=_selectSet; if (sets!=null) { SelectSet set=sets[s]; set.addChange(channel); set.wakeup(); } } /* ------------------------------------------------------------ */ /** Register a {@link ServerSocketChannel} * @param acceptChannel */ public void register(ServerSocketChannel acceptChannel) { int s=_set++; if (s<0) s=-s; s=s%_selectSets; SelectSet set=_selectSet[s]; set.addChange(acceptChannel); set.wakeup(); } /* ------------------------------------------------------------ */ /** * @return the lowResourcesConnections */ public long getLowResourcesConnections() { return _lowResourcesConnections*_selectSets; } /* ------------------------------------------------------------ */ /** * Set the number of connections, which if exceeded places this manager in low resources state. * This is not an exact measure as the connection count is averaged over the select sets. * @param lowResourcesConnections the number of connections * @see #setLowResourcesMaxIdleTime(long) */ public void setLowResourcesConnections(long lowResourcesConnections) { _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; } public abstract void execute(Runnable task); /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see org.eclipse.component.AbstractLifeCycle#doStart() */ @Override protected void doStart() throws Exception { _selectSet = new SelectSet[_selectSets]; for (int i=0;i<_selectSet.length;i++) _selectSet[i]= new SelectSet(i); super.doStart(); // start a thread to Select for (int i=0;i<getSelectSets();i++) { final int id=i; execute(new Runnable() { public void run() { String name=Thread.currentThread().getName(); try { SelectSet[] sets=_selectSet; if (sets==null) return; SelectSet set=sets[id]; Thread.currentThread().setName(name+" Selector"+id); LOG.debug("Starting {} on {}",Thread.currentThread(),this); while (isRunning()) { try { set.doSelect(); } catch(IOException e) { LOG.trace("",e); } catch(Exception e) { LOG.warn("",e); } } } finally { LOG.debug("Stopped {} on {}",Thread.currentThread(),this); Thread.currentThread().setName(name); } } }); } } /* ------------------------------------------------------------------------------- */ @Override protected void doStop() throws Exception { SelectSet[] sets= _selectSet; _selectSet=null; if (sets!=null) { for (SelectSet set : sets) { if (set!=null) set.stop(); } } super.doStop(); } /* ------------------------------------------------------------ */ /** * @param endpoint */ protected abstract void endPointClosed(SelectChannelEndPoint endpoint); /* ------------------------------------------------------------------------------- */ public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); /* ------------------------------------------------------------ */ /** * Create a new end point * @param channel * @param selectSet * @param sKey the selection key * @return the new endpoint {@link SelectChannelEndPoint} * @throws IOException */ protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; /* ------------------------------------------------------------------------------- */ protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) { LOG.warn(ex+","+channel+","+attachment); LOG.debug("",ex); } /* ------------------------------------------------------------ */ public String dump() { return AggregateLifeCycle.dump(this); } /* ------------------------------------------------------------ */ public void dump(Appendable out, String indent) throws IOException { AggregateLifeCycle.dumpObject(out,this); AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); } /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ public class SelectSet implements Dumpable { private final int _setID; private volatile long _now = System.currentTimeMillis(); private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); private volatile Selector _selector; private volatile Thread _selecting; private int _busySelects; private long _monitorNext; private boolean _pausing; private boolean _paused; private volatile long _idleTick; private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); SelectSet(int acceptorID) throws Exception { _setID=acceptorID; _idleTick = System.currentTimeMillis(); // create a selector; _selector = Selector.open(); _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; } public void addChange(Object change) { _changes.add(change); } /* ------------------------------------------------------------ */ /** * Select and dispatch tasks found from changes and the selector. * * @throws IOException */ public void doSelect() throws IOException { try { _selecting=Thread.currentThread(); final Selector selector=_selector; // Stopped concurrently ? if (selector == null) return; // Make any key changes required Object change; int changes=_changes.size(); while (changes-->0 && (change=_changes.poll())!=null) { Channel ch=null; SelectionKey key=null; try { if (change instanceof EndPoint) { // Update the operations for a key. SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; ch=endpoint.getChannel(); endpoint.doUpdateKey(); } else if (change instanceof SocketChannel) { // Newly registered channel final SocketChannel channel=(SocketChannel)change; ch=channel; key = channel.register(selector,SelectionKey.OP_READ,null); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); endpoint.schedule(); } else if (change instanceof Runnable) { execute((Runnable)change); } else throw new IllegalArgumentException(change.toString()); } catch (CancelledKeyException e) { LOG.trace("",e); } catch (Throwable e) { if (isRunning()) LOG.warn("",e); else LOG.debug("",e); try { if (ch!=null) ch.close(); } catch(IOException e2) { LOG.debug("",e2); } } } // Do and instant select to see if any connections can be handled. int selected=selector.selectNow(); _now = System.currentTimeMillis(); // if no immediate things to do if (selected==0 && selector.selectedKeys().isEmpty()) { // If we are in pausing mode if (_pausing) { try { Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop } catch(InterruptedException e) { LOG.trace("",e); } _now = System.currentTimeMillis(); } // workout how long to wait in select long wait = _changes.size()==0?__IDLE_TICK:0L; // If we should wait with a select if (wait>0) { long before = _now; selector.select(wait); _now = System.currentTimeMillis(); // If we are monitoring for busy selector // and this select did not wait more than 1ms if (__MONITOR_PERIOD>0 && _now-before <=1) { // count this as a busy select and if there have been too many this monitor cycle if (++_busySelects>__MAX_SELECTS) { // Start injecting pauses _pausing=true; // if this is the first pause if (!_paused) { // Log and dump some status _paused=true; LOG.warn("Selector {} is too busy, pausing!",this); } } } } } // have we been destroyed while sleeping if (_selector==null || !selector.isOpen()) return; // Look for things to do for (SelectionKey key: selector.selectedKeys()) { SocketChannel channel=null; try { if (!key.isValid()) { key.cancel(); SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); if (endpoint != null) endpoint.doUpdateKey(); continue; } Object att = key.attachment(); if (att instanceof SelectChannelEndPoint) { if (key.isReadable()||key.isWritable()) ((SelectChannelEndPoint)att).schedule(); } else if (key.isConnectable()) { // Complete a connection of a registered channel channel = (SocketChannel)key.channel(); boolean connected=false; try { connected=channel.finishConnect(); } catch(Exception e) { connectionFailed(channel,e,att); } finally { if (connected) { key.interestOps(SelectionKey.OP_READ); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); endpoint.schedule(); } else { key.cancel(); channel.close(); } } } else { // Wrap readable registered channel in an endpoint channel = (SocketChannel)key.channel(); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); if (key.isReadable()) endpoint.schedule(); } key = null; } catch (CancelledKeyException e) { LOG.trace("",e); } catch (Exception e) { if (isRunning()) LOG.warn("",e); else LOG.trace("",e); try { if (channel!=null) channel.close(); } catch(IOException e2) { LOG.debug("",e2); } if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) key.cancel(); } } // Everything always handled selector.selectedKeys().clear(); _now = System.currentTimeMillis(); // Idle tick if (_now-_idleTick>__IDLE_TICK) { _idleTick = _now; final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) ?(_now+_maxIdleTime) :_now; execute(new Runnable() { public void run() { for (SelectChannelEndPoint endp:_endPoints.keySet()) { endp.checkIdleTimestamp(idle_now); } } public String toString() {return "Idle-"+super.toString();} }); } // Reset busy select monitor counts if (__MONITOR_PERIOD>0 && _now>_monitorNext) { _busySelects=0; _pausing=false; _monitorNext=_now+__MONITOR_PERIOD; } } catch (ClosedSelectorException e) { if (isRunning()) LOG.warn("",e); else LOG.trace("",e); } catch (CancelledKeyException e) { LOG.trace("",e); } finally { _selecting=null; } } public SelectorManager getManager() { return SelectorManager.this; } public long getNow() { return _now; } public void wakeup() { Selector selector = _selector; if (selector!=null) selector.wakeup(); } private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException { SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); LOG.debug("created {}",endp); _endPoints.put(endp,this); return endp; } public void destroyEndPoint(SelectChannelEndPoint endp) { LOG.debug("destroyEndPoint {}",endp); _endPoints.remove(endp); endPointClosed(endp); } Selector getSelector() { return _selector; } void stop() throws Exception { // Spin for a while waiting for selector to complete // to avoid unneccessary closed channel exceptions try { for (int i=0;i<100 && _selecting!=null;i++) { wakeup(); Thread.sleep(10); } } catch(Exception e) { LOG.warn("",e); } // close endpoints and selector synchronized (this) { Selector selector=_selector; for (SelectionKey key:selector.keys()) { if (key==null) continue; Object att=key.attachment(); if (att instanceof EndPoint) { EndPoint endpoint = (EndPoint)att; try { endpoint.close(); } catch(IOException e) { LOG.trace("",e); } } } try { selector=_selector; if (selector != null) selector.close(); } catch (IOException e) { LOG.trace("",e); } _selector = null; } } public String dump() { return AggregateLifeCycle.dump(this); } public void dump(Appendable out, String indent) throws IOException { out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); AggregateLifeCycle.dump(out,indent,Collections.emptyList()); } public String toString() { Selector selector=_selector; return String.format("%s keys=%d selected=%d", super.toString(), selector != null && selector.isOpen() ? selector.keys().size() : -1, selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); } } }