view src/org/eclipse/jetty/io/nio/SelectorManager.java @ 960:3cd4c706a61f

simplify ChannelEndPoint
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 13 Oct 2016 21:29:19 -0600
parents 7b94f5b33c64
children 94498d6daf5b
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/**
 * The Selector Manager manages a {@link SelectSet} and the select loop
 * that drives NIO scheduling for the connections registered with it.
 */
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
{
	/** Logger registered under the {@code org.eclipse.jetty.io.nio} name. */
	public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	/** Idle timeout passed to every SelectChannelEndPoint created by SelectSet.addChange(). */
	private int _maxIdleTime;
	/** Low-resources connection threshold; in this class it is only stored and returned by its accessors. */
	private long _lowResourcesConnections;
	/** The current select set: assigned in doStart() and reset to null in doStop(). */
	private SelectSet _selectSet;

	/* ------------------------------------------------------------ */
	/**
	 * Sets the idle timeout that is handed to endpoints created after this call.
	 *
	 * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed.
	 */
	public void setMaxIdleTime(int maxIdleTime)
	{
		_maxIdleTime = maxIdleTime;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Registers a channel with the current select set.
	 * <p>
	 * When there is no select set (the manager has not been started, or has
	 * been stopped) the call does nothing: the channel is neither registered
	 * nor closed.
	 *
	 * @param channel the socket channel to hand to the select set
	 */
	public void register(SocketChannel channel)
	{
		// Read the field once; doStop() may null it at any time.
		final SelectSet active = _selectSet;
		if (active == null)
			return;

		active.addChange(channel);
	}

	/* ------------------------------------------------------------ */
	/**
	 * Returns the value last passed to {@link #setLowResourcesConnections(long)}.
	 *
	 * @return the lowResourcesConnections
	 */
	public long getLowResourcesConnections()
	{
		return _lowResourcesConnections;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Set the number of connections, which if exceeded places this manager in low resources state.
	 * <p>
	 * NOTE(review): within this class the value is only stored and returned by
	 * {@link #getLowResourcesConnections()}; confirm whether any low-resources
	 * handling still consumes it elsewhere.
	 *
	 * @param lowResourcesConnections the number of connections
	 */
	public void setLowResourcesConnections(long lowResourcesConnections)
	{
		_lowResourcesConnections = lowResourcesConnections;
	}


	/**
	 * Runs the given task. {@link #doStart()} uses this to launch the select
	 * loop, which keeps looping for as long as the manager is running, so
	 * implementations presumably hand the task to another thread (confirm
	 * against the concrete subclass).
	 *
	 * @param task the task to run
	 */
	public abstract void execute(Runnable task);

	/* ------------------------------------------------------------ */
	/**
	 * Creates the {@link SelectSet}, starts the superclass lifecycle and then
	 * submits the select loop through {@link #execute(Runnable)}.
	 * <p>
	 * The loop renames its thread with a {@code " Selector"} suffix, calls
	 * {@code doSelect()} repeatedly while {@code isRunning()} is true, and
	 * restores the original thread name when it exits. An {@link IOException}
	 * from a select pass is logged at trace level, any other exception at warn
	 * level; in both cases the loop continues.
	 *
	 * @throws Exception if the select set cannot be created or the superclass fails to start
	 * @see org.eclipse.jetty.util.component.AbstractLifeCycle#doStart()
	 */
	@Override
	protected void doStart() throws Exception
	{
		_selectSet = new SelectSet();

		super.doStart();

		// start a thread to Select
		execute(new Runnable()
		{
			public void run()
			{
				String name=Thread.currentThread().getName();
				try
				{
					// The set may already have been cleared by doStop() before this task runs.
					SelectSet set = _selectSet;
					if (set==null)
						return;

					Thread.currentThread().setName(name+" Selector");
					// A bare "this" here would be the anonymous Runnable; qualify it so
					// the log line identifies the manager.
					LOG.debug("Starting {} on {}",Thread.currentThread(),SelectorManager.this);
					while (isRunning())
					{
						try
						{
							set.doSelect();
						}
						catch(IOException e)
						{
							LOG.trace("",e);
						}
						catch(Exception e)
						{
							LOG.warn("",e);
						}
					}
				}
				finally
				{
					LOG.debug("Stopped {} on {}",Thread.currentThread(),SelectorManager.this);
					Thread.currentThread().setName(name);
				}
			}

		});
	}


	/**
	 * Detaches the current select set from the manager, stops it if there was
	 * one, and then stops the superclass lifecycle.
	 * <p>
	 * The field is cleared before the set is stopped, so {@link #register}
	 * calls arriving during shutdown find no select set and do nothing.
	 *
	 * @throws Exception if stopping the select set or the superclass fails
	 */
	@Override
	protected void doStop() throws Exception
	{
		final SelectSet stopping = _selectSet;
		_selectSet = null;

		if (stopping != null)
			stopping.stop();

		super.doStop();
	}

	/**
	 * Creates the connection for a newly registered channel. Called from
	 * SelectSet.addChange() once the endpoint exists; the returned connection
	 * is installed on the endpoint with {@code setConnection}.
	 *
	 * @param channel the channel being registered
	 * @param endpoint the endpoint created for that channel
	 * @return the connection to attach to the endpoint
	 */
	public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);

	/**
	 * Returns the dump of this manager as a string by delegating to
	 * {@code AggregateLifeCycle.dump(Dumpable)}.
	 */
	@Override
	public String dump()
	{
		return AggregateLifeCycle.dump(this);
	}

	/**
	 * Writes this manager followed by its select set as the only child entry.
	 * <p>
	 * The child list always has exactly one element; that element is
	 * {@code null} when the manager has no select set (before start or after
	 * stop).
	 *
	 * @param out where the dump is written
	 * @param indent indentation prefix passed through to the child dump
	 * @throws IOException if writing to {@code out} fails
	 */
	@Override
	public void dump(Appendable out, String indent) throws IOException
	{
		AggregateLifeCycle.dumpObject(out,this);

		final List<SelectSet> children = Collections.singletonList(_selectSet);
		AggregateLifeCycle.dump(out,indent,children);
	}


	public final class SelectSet implements Dumpable
	{
		/** Wall-clock time in milliseconds: set at construction and refreshed at the end of each completed doSelect() pass. */
		private volatile long _now = System.currentTimeMillis();

		/** The selector this set registers channels with and selects on. */
		private final SaneSelector _selector;

		/**
		 * Creates the select set and its selector.
		 *
		 * @throws IOException if the selector cannot be created
		 */
		private SelectSet() throws IOException
		{
			_selector = new SaneSelector();
		}

		/**
		 * Registers a channel with the selector and wires up its endpoint and connection.
		 * <p>
		 * The key is first registered with an empty interest set and no
		 * attachment; read interest is enabled only after the endpoint (with its
		 * connection) has been attached, presumably so the select loop never sees
		 * a ready key without an endpoint. The endpoint is then scheduled once.
		 * <p>
		 * If any step fails the channel is closed so that it does not stay open
		 * with a half-initialised key. An {@link IOException} is logged and
		 * swallowed; an unchecked exception (for example from
		 * {@code newConnection}) is rethrown after the channel has been closed.
		 *
		 * @param channel the channel to register
		 */
		private void addChange(SocketChannel channel)
		{
			try {
				SelectionKey key = _selector.register(channel,0,null);

				SelectChannelEndPoint endpoint = new SelectChannelEndPoint(channel,this,key, _maxIdleTime);
				endpoint.setConnection(newConnection(channel,endpoint));

				key.attach(endpoint);
				key.interestOps(SelectionKey.OP_READ);
				_selector.update();
				endpoint.schedule();
			} catch(IOException e) {
				LOG.warn("",e);
				closeAfterFailure(channel);
			} catch(RuntimeException e) {
				// Same cleanup as for IOException, but keep propagating the
				// failure exactly as before.
				closeAfterFailure(channel);
				throw e;
			}
		}

		/** Closes a channel whose registration failed, logging (not throwing) any close error. */
		private void closeAfterFailure(SocketChannel channel)
		{
			try {
				channel.close();
			} catch(IOException e2) {
				LOG.warn("",e2);
			}
		}

		/**
		 * Performs one select pass: waits on the selector, handles every
		 * selected key, clears the selected-key set and refreshes the cached
		 * time returned by {@link #getNow()}.
		 * <p>
		 * A {@link ClosedSelectorException} is logged at warn level while the
		 * manager is running and at trace level otherwise; a
		 * {@link CancelledKeyException} is logged at trace level. In either case
		 * the selected keys are not cleared and the cached time is not updated
		 * for this pass.
		 *
		 * @throws IOException if the selector reports an I/O failure
		 */
		private void doSelect() throws IOException
		{
			try
			{
				_selector.select();

				for (SelectionKey selected : _selector.selectedKeys())
					dispatchKey(selected);

				// every selected key has been looked at, so start the next pass empty
				_selector.selectedKeys().clear();

				_now = System.currentTimeMillis();
			}
			catch (ClosedSelectorException e)
			{
				if (isRunning())
					LOG.warn("",e);
				else
					LOG.trace("",e);
			}
			catch (CancelledKeyException e)
			{
				LOG.trace("",e);
			}
		}

		/**
		 * Handles a single selected key. A valid key that is readable or
		 * writable gets its endpoint scheduled; an invalid key is cancelled and
		 * its endpoint, if any, is asked to update its key. Exceptions are
		 * logged here and never propagate to the caller.
		 */
		private void dispatchKey(SelectionKey key)
		{
			try
			{
				if (key.isValid())
				{
					if (key.isReadable()||key.isWritable())
					{
						SelectChannelEndPoint ready = (SelectChannelEndPoint)key.attachment();
						ready.schedule();
					}
					return;
				}

				key.cancel();
				SelectChannelEndPoint stale = (SelectChannelEndPoint)key.attachment();
				if (stale != null)
					stale.doUpdateKey();
			}
			catch (CancelledKeyException e)
			{
				LOG.trace("",e);
			}
			catch (Exception e)
			{
				if (isRunning())
					LOG.warn("",e);
				else
					LOG.trace("",e);
			}
		}

		/**
		 * @return the enclosing SelectorManager instance that owns this select set
		 */
		public SelectorManager getManager()
		{
			return SelectorManager.this;
		}

		/**
		 * @return the cached {@code System.currentTimeMillis()} value, taken at
		 *         construction and again at the end of each completed select pass
		 */
		public long getNow()
		{
			return _now;
		}

		/**
		 * @return the selector used by this select set (package-private access)
		 */
		SaneSelector getSelector()
		{
			return _selector;
		}

		/**
		 * Closes every endpoint registered with the selector and then the selector itself.
		 * <p>
		 * A key may have no attachment, because {@code addChange} registers the
		 * key before the endpoint is attached. For such a key the channel is
		 * closed directly instead of dereferencing a null endpoint. The selector
		 * is closed in a {@code finally} block so that an unexpected failure
		 * while closing endpoints cannot leave it open. {@link IOException}s
		 * from individual closes are logged at trace level and do not stop the
		 * shutdown.
		 *
		 * @throws Exception if closing an endpoint fails with something other than an IOException
		 */
		private synchronized void stop() throws Exception
		{
			try
			{
				// close endpoints
				for (SelectionKey key : _selector.keys())
				{
					EndPoint endpoint = (EndPoint)key.attachment();
					try
					{
						if (endpoint != null)
							endpoint.close();
						else
							key.channel().close(); // registered but no endpoint attached yet
					}
					catch(IOException e)
					{
						LOG.trace("",e);
					}
				}
			}
			finally
			{
				// close the selector even if the loop above failed
				try
				{
					_selector.close();
				}
				catch (IOException e)
				{
					LOG.trace("",e);
				}
			}
		}

		/**
		 * Returns the dump of this select set as a string by delegating to
		 * {@code AggregateLifeCycle.dump(Dumpable)}.
		 */
		@Override
		public String dump()
		{
			return AggregateLifeCycle.dump(this);
		}

		/**
		 * Writes this select set's {@link #toString()} followed by a newline,
		 * then dumps an empty child list.
		 *
		 * @param out where the dump is written
		 * @param indent indentation prefix passed to the child dump
		 * @throws IOException if writing to {@code out} fails
		 */
		@Override
		public void dump(Appendable out, String indent) throws IOException
		{
			out.append(String.valueOf(this)).append("\n");
			AggregateLifeCycle.dump(out,indent,Collections.emptyList());
		}

		/**
		 * Describes this select set as the default object string followed by
		 * the number of registered keys and of currently selected keys. Each
		 * count is reported as {@code -1} when the selector is not open at the
		 * moment that count is read.
		 */
		@Override
		public String toString()
		{
			final SaneSelector sel = _selector;
			final String identity = super.toString();

			// Openness is re-checked for each count, matching the two independent reads.
			int keyCount = -1;
			if (sel != null && sel.isOpen())
				keyCount = sel.keys().size();

			int selectedCount = -1;
			if (sel != null && sel.isOpen())
				selectedCount = sel.selectedKeys().size();

			return String.format("%s keys=%d selected=%d", identity, keyCount, selectedCount);
		}
	}

}