Mercurial Hosting > luan
view src/org/eclipse/jetty/io/nio/SelectorManager.java @ 961:790c01734386
make SelectChannelEndPoint._key final
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 13 Oct 2016 22:03:24 -0600 |
parents | 3cd4c706a61f |
children | 94498d6daf5b |
line wrap: on
line source
// // ======================================================================== // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.io.nio; import java.io.IOException; import java.nio.channels.CancelledKeyException; import java.nio.channels.Channel; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* ------------------------------------------------------------ */ /** * The Selector Manager manages and number of SelectSets to allow * NIO scheduling to scale to large numbers of connections. * <p> */ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable { public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); private int _maxIdleTime; private long _lowResourcesConnections; private SelectSet _selectSet; /* ------------------------------------------------------------ */ /** * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. * @see #setLowResourcesMaxIdleTime(long) */ public void setMaxIdleTime(int maxIdleTime) { _maxIdleTime = maxIdleTime; } /* ------------------------------------------------------------ */ /** Register a channel * @param channel */ public void register(SocketChannel channel) { SelectSet set = _selectSet; if (set!=null) { set.addChange(channel); } } /* ------------------------------------------------------------ */ /** * @return the lowResourcesConnections */ public long getLowResourcesConnections() { return _lowResourcesConnections; } /* ------------------------------------------------------------ */ /** * Set the number of connections, which if exceeded places this manager in low resources state. * This is not an exact measure as the connection count is averaged over the select sets. * @param lowResourcesConnections the number of connections * @see #setLowResourcesMaxIdleTime(long) */ public void setLowResourcesConnections(long lowResourcesConnections) { _lowResourcesConnections = lowResourcesConnections; } public abstract void execute(Runnable task); /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see org.eclipse.component.AbstractLifeCycle#doStart() */ @Override protected void doStart() throws Exception { _selectSet = new SelectSet(); super.doStart(); // start a thread to Select execute(new Runnable() { public void run() { String name=Thread.currentThread().getName(); try { SelectSet set = _selectSet; if (set==null) return; Thread.currentThread().setName(name+" Selector"); LOG.debug("Starting {} on {}",Thread.currentThread(),this); while (isRunning()) { try { set.doSelect(); } catch(IOException e) { LOG.trace("",e); } catch(Exception e) { LOG.warn("",e); } } } finally { LOG.debug("Stopped {} on {}",Thread.currentThread(),this); Thread.currentThread().setName(name); } } }); } @Override protected void doStop() throws Exception { SelectSet set = _selectSet; _selectSet = null; if (set!=null) { set.stop(); } super.doStop(); } public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint); @Override public String dump() { return AggregateLifeCycle.dump(this); } @Override public void dump(Appendable out, String indent) throws IOException { AggregateLifeCycle.dumpObject(out,this); AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet)); } public final class SelectSet implements Dumpable { private volatile long _now = System.currentTimeMillis(); private final SaneSelector _selector; private SelectSet() throws IOException { _selector = new SaneSelector(); } private void addChange(SocketChannel channel) { try { //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq a"); // SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null); SelectionKey key = _selector.register(channel,0,null); SelectChannelEndPoint endpoint = new SelectChannelEndPoint(channel,this,key, _maxIdleTime); endpoint.setConnection(newConnection(channel,endpoint)); key.attach(endpoint); key.interestOps(SelectionKey.OP_READ); _selector.update(); //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b"); endpoint.schedule(); } catch(IOException e) { LOG.warn("",e); try { channel.close(); } catch(IOException e2) { LOG.warn("",e2); } } } private void doSelect() throws IOException { try { _selector.select(); // Look for things to do for (SelectionKey key: _selector.selectedKeys()) { try { if (!key.isValid()) { key.cancel(); SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); if (endpoint != null) endpoint.doUpdateKey(); continue; } if (key.isReadable()||key.isWritable()) { SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); endpoint.schedule(); } } catch (CancelledKeyException e) { LOG.trace("",e); } catch (Exception e) { if (isRunning()) LOG.warn("",e); else LOG.trace("",e); } } // Everything always handled _selector.selectedKeys().clear(); _now = System.currentTimeMillis(); } catch (ClosedSelectorException e) { if (isRunning()) LOG.warn("",e); else LOG.trace("",e); } catch (CancelledKeyException e) { LOG.trace("",e); } } public SelectorManager getManager() { return SelectorManager.this; } public long getNow() { return _now; } SaneSelector getSelector() { return _selector; } private synchronized void stop() throws Exception { // close endpoints and selector for (SelectionKey key : _selector.keys()) { EndPoint endpoint = (EndPoint)key.attachment(); try { endpoint.close(); } catch(IOException e) { LOG.trace("",e); } } try { _selector.close(); } catch (IOException e) { LOG.trace("",e); } } @Override public String dump() { return AggregateLifeCycle.dump(this); } @Override public void dump(Appendable out, String indent) throws IOException { out.append(String.valueOf(this)).append("\n"); AggregateLifeCycle.dump(out,indent,Collections.emptyList()); } @Override public String toString() { SaneSelector selector=_selector; return String.format("%s keys=%d selected=%d", super.toString(), selector != null && selector.isOpen() ? selector.keys().size() : -1, selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); } } }