view src/org/eclipse/jetty/io/nio/SelectorManager.java @ 970:d82eb99e8df6

remove ConnectorSelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 14 Oct 2016 05:24:18 -0600
parents 0650077fcd6c
children f997df37cec1
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/**
 * Runs the NIO select loop for a {@link SelectChannelConnector}.
 * <p>
 * A single {@link SaneSelector} is created in {@link #doStart()} and serviced by one
 * task submitted to the connector's server thread pool. Channels are handed over
 * through {@link #register(SocketChannel)}; each one gets a
 * {@link SelectChannelEndPoint} attached to its selection key, and the select loop
 * schedules that endpoint whenever its key is readable or writable.
 */
public final class SelectorManager extends AbstractLifeCycle implements Dumpable
{
	public static final Logger LOG = LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	private final SelectChannelConnector scc;

	/** Wall-clock time in milliseconds, refreshed at the end of every select pass. */
	private volatile long _now = System.currentTimeMillis();

	/**
	 * The active selector. It is assigned in {@link #doStart()} and reset to null by the
	 * selector task when that task exits, so code running on any other thread must read
	 * it once into a local variable instead of re-reading the field.
	 */
	private volatile SaneSelector _selector;

	/* ------------------------------------------------------------ */
	/**
	 * @param scc the connector this manager works for; it supplies the thread pool used by
	 *            {@link #execute(Runnable)}, the max idle time given to new endpoints and
	 *            the connection factory behind {@link #newConnection}.
	 */
	public SelectorManager(SelectChannelConnector scc)
	{
		this.scc = scc;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Register a channel with the selector, wrap it in a {@link SelectChannelEndPoint}
	 * and schedule that endpoint once.
	 * <p>
	 * If there is no active selector, or registration fails with an {@link IOException},
	 * the problem is logged and the channel is closed.
	 *
	 * @param channel the socket channel to manage
	 */
	public final void register(SocketChannel channel)
	{
		SaneSelector selector = _selector;
		if (selector == null)
		{
			// Not started, or the selector task has already exited: nothing would ever
			// service this channel, so release it rather than fail with a NullPointerException.
			LOG.warn("Cannot register {}: selector is not running",channel);
			closeChannel(channel);
			return;
		}

		try
		{
			// The key is created with no interest ops and no attachment; the endpoint
			// needs the key for its own construction and is attached right afterwards.
			SelectionKey key = selector.register(channel,0,null);
			SelectChannelEndPoint endpoint = new SelectChannelEndPoint(channel,this,key, scc.getMaxIdleTime());
			key.attach(endpoint);
			selector.update();
			endpoint.schedule();
		}
		catch(IOException e)
		{
			LOG.warn("",e);
			closeChannel(channel);
		}
	}

	/** Close a channel, logging (not propagating) any failure to do so. */
	private void closeChannel(SocketChannel channel)
	{
		try
		{
			channel.close();
		}
		catch(IOException e)
		{
			LOG.warn("",e);
		}
	}


	/**
	 * Run a task on the thread pool of the connector's server.
	 *
	 * @param task the task to run
	 */
	public void execute(Runnable task) {
		scc.server.threadPool.execute(task);
	}


	/**
	 * Create the selector and submit the select loop to the thread pool. The loop keeps
	 * calling {@link #doSelect()} for as long as {@link #isRunning()} is true.
	 */
	@Override
	protected void doStart() throws Exception
	{
		_selector = new SaneSelector();

		super.doStart();

		// start a thread to Select
		execute(new Runnable()
		{
			@Override
			public void run()
			{
				Thread thread = Thread.currentThread();
				String name = thread.getName();
				try
				{
					if (_selector==null)
						return;

					thread.setName(name+" Selector");
					// SelectorManager.this, not this: inside the anonymous Runnable a bare
					// "this" would log the Runnable instead of the manager.
					LOG.debug("Starting {} on {}",thread,SelectorManager.this);
					while (isRunning())
					{
						try
						{
							doSelect();
						}
						catch(IOException e)
						{
							LOG.trace("",e);
						}
						catch(Exception e)
						{
							LOG.warn("",e);
						}
					}
				}
				finally
				{
					_selector = null;
					LOG.debug("Stopped {} on {}",thread,SelectorManager.this);
					// give the pooled thread its original name back
					thread.setName(name);
				}
			}

		});
	}


	/**
	 * Close every endpoint attached to a selection key, then close the selector itself.
	 * I/O failures while closing are logged at trace level and otherwise ignored.
	 */
	@Override
	protected synchronized void doStop() throws Exception
	{
		// Snapshot: the selector task may reset the field to null at any moment.
		SaneSelector selector = _selector;
		if (selector!=null)
		{
			try
			{
				for (SelectionKey key : selector.keys())
				{
					EndPoint endpoint = (EndPoint)key.attachment();
					// register() creates the key before it attaches the endpoint, so a
					// key may briefly have no attachment.
					if (endpoint==null)
						continue;
					try
					{
						endpoint.close();
					}
					catch(IOException e)
					{
						LOG.trace("",e);
					}
				}
			}
			finally
			{
				// Always close the selector, even if closing an endpoint failed
				// unexpectedly; otherwise the selector would be leaked.
				try
				{
					selector.close();
				}
				catch (IOException e)
				{
					LOG.trace("",e);
				}
			}
		}
		super.doStop();
	}

	/**
	 * Ask the connector to create the connection for a newly created endpoint.
	 *
	 * @param channel  the channel the endpoint wraps
	 * @param endpoint the endpoint needing a connection
	 * @return whatever {@code SelectChannelConnector.newConnection} returns for the pair
	 */
	public AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) {
		return scc.newConnection(channel,endpoint);
	}

	@Override
	public String dump()
	{
		return AggregateLifeCycle.dump(this);
	}

	/**
	 * Dump this object to {@code out}. The {@code indent} argument is currently unused
	 * because there are no child components to dump.
	 */
	@Override
	public void dump(Appendable out, String indent) throws IOException
	{
		AggregateLifeCycle.dumpObject(out,this);
	}


	// from SelectSet

	/**
	 * Perform one select pass: wait in {@code select()}, schedule the endpoint of every
	 * valid key that is readable or writable, clear the selected-key set and refresh the
	 * time returned by {@link #getNow()}.
	 *
	 * @throws IOException if the underlying select operation fails
	 */
	private void doSelect() throws IOException
	{
		SaneSelector selector = _selector;
		try
		{
			selector.select();

			// Look for things to do
			for (SelectionKey key: selector.selectedKeys())
			{
				try
				{
					if (!key.isValid())
					{
						key.cancel();
						continue;
					}

					if (key.isReadable()||key.isWritable()) {
						SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
						endpoint.schedule();
					}
				}
				catch (Exception e)
				{
					// Includes CancelledKeyException: a failure on one key must not stop
					// the remaining selected keys from being processed.
					LOG.warn("",e);
				}
			}

			// Everything always handled
			selector.selectedKeys().clear();

			_now = System.currentTimeMillis();
		}
		catch (ClosedSelectorException e)
		{
			// Expected while stopping (doStop() closes the selector); only noteworthy
			// if the manager still considers itself running.
			if (isRunning())
				LOG.warn("",e);
			else
				LOG.trace("",e);
		}
		catch (CancelledKeyException e)
		{
			LOG.trace("",e);
		}
	}

	/**
	 * @return the time, in milliseconds since the epoch, sampled at the end of the most
	 *         recent select pass (or at construction if no pass has completed yet)
	 */
	public final long getNow()
	{
		return _now;
	}

	/**
	 * @return the active selector, or null if the manager has not been started or the
	 *         selector task has exited
	 */
	final SaneSelector getSelector()
	{
		return _selector;
	}

	/**
	 * @return the life-cycle description followed by the number of registered and
	 *         selected keys, each reported as -1 when there is no open selector
	 */
	@Override
	public final String toString()
	{
		SaneSelector selector=_selector;
		return String.format("%s keys=%d selected=%d",
				super.toString(),
				selector != null && selector.isOpen() ? selector.keys().size() : -1,
				selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
	}

}