view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 958:fc521d2f098e

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 13 Oct 2016 18:48:20 -0600
parents 1094975d013b
children 7b94f5b33c64
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/**
 * An Endpoint that can be scheduled by {@link SelectorManager}.
 */
public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
	public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	private final SelectorManager.SelectSet _selectSet;
	private final SelectorManager _manager;
	private  SelectionKey _key;
	private final Runnable _handler = new Runnable()
		{
			public void run() { handle(); }
		};

	/** The desired value for {@link SelectionKey#interestOps()} */
	private int _interestOps;

	/**
	 * The connection instance is the handler for any IO activity on the endpoint.
	 * There is a different type of connection for HTTP, AJP, WebSocket and
	 * ProxyConnect.   The connection may change for an SCEP as it is upgraded
	 * from HTTP to proxy connect or websocket.
	 */
	private volatile AsyncConnection _connection;

	private static final int STATE_NEEDS_DISPATCH = -1;
	private static final int STATE_UNDISPATCHED = 0;
	private static final int STATE_DISPATCHED = 1;
	private int _state;
	
	/** true if the last write operation succeed and wrote all offered bytes */
	private volatile boolean _writable = true;

	/** True if a thread has is blocked in {@link #blockReadable(long)} */
	private boolean _readBlocked;

	/** True if a thread has is blocked in {@link #blockWritable(long)} */
	private boolean _writeBlocked;

	/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
	private boolean _open;
	
	private boolean _ishut;

	public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
		throws IOException
	{
		super(channel, maxIdleTime);

		_manager = selectSet.getManager();
		_selectSet = selectSet;
		_state = STATE_UNDISPATCHED;
		_open = true;
		_key = key;
	}

	@Override
	public Connection getConnection()
	{
		return _connection;
	}

	@Override
	public void setConnection(Connection connection)
	{
		_connection = (AsyncConnection)connection;
	}

	/* ------------------------------------------------------------ */
	/** Called by selectSet to schedule handling
	 *
	 */
	public synchronized void schedule()
	{
		// If there is no key, then do nothing
		if (_key == null || !_key.isValid())
		{
			_readBlocked = false;
			_writeBlocked = false;
			this.notifyAll();
			return;
		}

		// If there are threads dispatched reading and writing
		if (_readBlocked || _writeBlocked)
		{
			// assert _dispatched;
			if (_readBlocked && _key.isReadable())
				_readBlocked=false;
			if (_writeBlocked && _key.isWritable())
				_writeBlocked = false;

			// wake them up is as good as a dispatched.
			this.notifyAll();

			// we are not interested in further selecting
			_key.interestOps(0);
			if (_state<STATE_DISPATCHED)
				updateKey();
			return;
		}

		// Remove writeable op
		if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
		{
			// Remove writeable op
			_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
			_key.interestOps(_interestOps);
			_writable = true; // Once writable is in ops, only removed with dispatch.
		}

		// If dispatched, then deregister interest
		if (_state>=STATE_DISPATCHED)
			_key.interestOps(0);
		else
		{
			// other wise do the dispatch
			dispatch();
		}
	}

	@Override
	public synchronized void dispatch()
	{
		if (_state<=STATE_UNDISPATCHED)
		{
			_state = STATE_DISPATCHED;
			try {
				_manager.execute(_handler);
			} catch(RejectedExecutionException e) {
				_state = STATE_NEEDS_DISPATCH;
				LOG.warn("Dispatched Failed! "+this+" to "+_manager);
				updateKey();
			}
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Called when a dispatched thread is no longer handling the endpoint.
	 * The selection key operations are updated.
	 * @return If false is returned, the endpoint has been redispatched and
	 * thread must keep handling the endpoint.
	 */
	private synchronized void undispatch()
	{
		_state = STATE_UNDISPATCHED;
		updateKey();
	}

	@Override
	public int fill(Buffer buffer) throws IOException
	{
		int fill=super.fill(buffer);
		return fill;
	}

	@Override
	public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
	{
		int l = super.flush(header, buffer, trailer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
		{
			synchronized (this)
			{   
				_writable = false;
				if (_state<STATE_DISPATCHED)
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable = true;
		}
		return l;
	}

	@Override
	public int flush(Buffer buffer) throws IOException
	{
		int l = super.flush(buffer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && buffer!=null && buffer.hasContent())
		{
			synchronized (this)
			{   
				_writable = false;
				if (_state<STATE_DISPATCHED)
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable = true;
		}

		return l;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public boolean blockReadable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isInputShutdown())
				throw new EofException();

			long now = _selectSet.getNow();
			long end = now+timeoutMs;
			try
			{
				_readBlocked = true;
				while (!isInputShutdown() && _readBlocked)
				{
					try
					{
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
					}
					finally
					{
						now=_selectSet.getNow();
					}

					if (_readBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_readBlocked = false;
			}
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public boolean blockWritable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isOutputShutdown())
				throw new EofException();

			long now=_selectSet.getNow();
			long end=now+timeoutMs;
			try
			{
				_writeBlocked = true;
				while (_writeBlocked && !isOutputShutdown())
				{
					try
					{
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
					}
					finally
					{
						now=_selectSet.getNow();
					}
					if (_writeBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_writeBlocked = false;
			}
		}
		return true;
	}

	@Override
	public boolean hasProgressed()
	{
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Updates selection key. Adds operations types to the selection key as needed. No operations
	 * are removed as this is only done during dispatch. This method records the new key and
	 * schedules a call to doUpdateKey to do the keyChange
	 */
	private void updateKey()
	{
		final boolean changed;
		synchronized (this)
		{
			int current_ops=-1;
			if (getChannel().isOpen())
			{
				boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
				boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);

				_interestOps =
					((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
				|   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
				try
				{
					current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
				}
				catch(Exception e)
				{
					_key = null;
					LOG.trace("",e);
				}
			}
			changed = _interestOps!=current_ops;
		}

		if(changed)
		{
			doUpdateKey();
			_selectSet.getSelector().update();
		}
	}


	/* ------------------------------------------------------------ */
	/**
	 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
	 */
	synchronized void doUpdateKey()
	{
		if (getChannel().isOpen())
		{
			if (_interestOps>0)
			{
				if (_key==null || !_key.isValid())
				{
					SelectableChannel sc = (SelectableChannel)getChannel();
					if (sc.isRegistered())
					{
						updateKey();
					}
					else
					{
						try
						{
							_key = _selectSet.getSelector().register(sc,_interestOps,this);
						}
						catch (Exception e)
						{
							LOG.trace("",e);
							if (_key!=null && _key.isValid())
							{
								_key.cancel();
							}

							if (_open)
							{
								_selectSet.destroyEndPoint(this);
							}
							_open = false;
							_key = null;
						}
					}
				}
				else
				{
					_key.interestOps(_interestOps);
				}
			}
			else
			{
				if (_key!=null && _key.isValid())
					_key.interestOps(0);
				else
					_key = null;
			}
		}
		else
		{
			if (_key!=null && _key.isValid())
				_key.cancel();

			if (_open)
			{
				_open = false;
				_selectSet.destroyEndPoint(this);
			}
			_key = null;
		}
	}

	private void handle()
	{
		boolean dispatched = true;
		try
		{
			try
			{
				while(true)
				{
					final AsyncConnection next = (AsyncConnection)_connection.handle();
					if (next!=_connection)
					{
						LOG.debug("{} replaced {}",next,_connection);
						_connection=next;
						continue;
					}
					break;
				}
			}
			catch (ClosedChannelException e)
			{
				LOG.trace("",e);
			}
			catch (EofException e)
			{
				LOG.debug("EOF", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (IOException e)
			{
				LOG.warn(e.toString());
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (Throwable e)
			{
				LOG.warn("handle failed", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			finally
			{
				if (!_ishut && isInputShutdown() && isOpen())
				{
					_ishut = true;
					try
					{
						_connection.onInputShutdown();
					}
					catch(Throwable x)
					{
						LOG.warn("onInputShutdown failed", x);
						try{close();}
						catch(IOException e2){LOG.trace("",e2);}
					}
					finally
					{
						updateKey();
					}
				}
				undispatch();
				dispatched = false;
			}
		}
		finally
		{
			if (dispatched)
			{
				undispatch();
			}
		}
	}

	/* ------------------------------------------------------------ */
	/*
	 * @see org.eclipse.io.nio.ChannelEndPoint#close()
	 */
	@Override
	public void close() throws IOException
	{
		try
		{
			super.close();
		}
		catch (IOException e)
		{
			LOG.trace("",e);
		}
		finally
		{
			updateKey();
		}
	}

	@Override
	public String toString()
	{
		// Do NOT use synchronized (this)
		// because it's very easy to deadlock when debugging is enabled.
		// We do a best effort to print the right toString() and that's it.
		SelectionKey key = _key;
		String keyString = "";
		if (key != null)
		{
			if (key.isValid())
			{
				if (key.isReadable())
					keyString += "r";
				if (key.isWritable())
					keyString += "w";
			}
			else
			{
				keyString += "!";
			}
		}
		else
		{
			keyString += "-";
		}
		return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
				hashCode(),
				_socket.getRemoteSocketAddress(),
				_socket.getLocalSocketAddress(),
				_state,
				isOpen(),
				isInputShutdown(),
				isOutputShutdown(),
				_readBlocked,
				_writeBlocked,
				_writable,
				_interestOps,
				keyString,
				_connection);
	}

	/* ------------------------------------------------------------ */
	/**
	 * Don't set the SoTimeout
	 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
	 */
	@Override
	public void setMaxIdleTime(int timeMs) throws IOException
	{
		_maxIdleTime = timeMs;
	}

}