view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 972:5ee36654b383

simplify AbstractHttpConnection
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 15 Oct 2016 22:42:05 -0600
parents 0650077fcd6c
children 35d04ac3fd0b
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EofException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/**
 * An Endpoint that can be scheduled by {@link SelectorManager}.
 */
public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint
{
	public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	private final SelectorManager _manager;
	private final SelectionKey _key;
	private final Runnable _handler = new Runnable()
		{
			public void run() { handle(); }
		};

	/**
	 * The connection instance is the handler for any IO activity on the endpoint.
	 * There is a different type of connection for HTTP, AJP, WebSocket and
	 * ProxyConnect.   The connection may change for an SCEP as it is upgraded
	 * from HTTP to proxy connect or websocket.
	 */
	private final AsyncConnection _connection;
/*
	private static final int STATE_NEEDS_DISPATCH = -1;
	private static final int STATE_UNDISPATCHED = 0;
	private static final int STATE_DISPATCHED = 1;
	private int _state;
*/
	private boolean isDispatched = false;

	/** true if the last write operation succeed and wrote all offered bytes */
	private volatile boolean _writable = true;

	/** True if a thread has is blocked in {@link #blockReadable(long)} */
	private boolean _readBlocked;

	/** True if a thread has is blocked in {@link #blockWritable(long)} */
	private boolean _writeBlocked;

	private boolean _ishut = false;

	public SelectChannelEndPoint(SocketChannel channel, SelectorManager manager, SelectionKey key, int maxIdleTime)
		throws IOException
	{
		super(channel, maxIdleTime);

		_manager = manager;
		_key = key;
		_connection = manager.newConnection(channel,this);
	}

	/* ------------------------------------------------------------ */
	/** Called by selectSet to schedule handling
	 *
	 */
	public synchronized void schedule()
	{
		if (!_key.isValid())
		{
/*
			_readBlocked = false;
			_writeBlocked = false;
			this.notifyAll();
*/
			_key.cancel();
			return;
		}

		// If there are threads dispatched reading and writing
		if (_readBlocked || _writeBlocked)
		{
			// assert _dispatched;
			if (_readBlocked && _key.isReadable())
				_readBlocked = false;
			if (_writeBlocked && _key.isWritable())
				_writeBlocked = false;

			// wake them up is as good as a dispatched.
			this.notifyAll();

			// we are not interested in further selecting
			_key.interestOps(0);
			if( !isDispatched )
				updateKey();
			return;
		}

		// Remove writeable op
		if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
		{
			// Remove writeable op
			int interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
			_key.interestOps(interestOps);
			_writable = true; // Once writable is in ops, only removed with dispatch.
		}

		// If dispatched, then deregister interest
		if (isDispatched)
			_key.interestOps(0);
		else
		{
			// other wise do the dispatch
			dispatch();
		}
	}

	@Override
	public synchronized void dispatch()
	{
		if( !isDispatched )
		{
			isDispatched = true;
			try {
				_manager.execute(_handler);
			} catch(RejectedExecutionException e) {
				isDispatched = false;
				LOG.warn("Dispatched Failed! "+this+" to "+_manager);
//				updateKey();
			}
		}
	}

	@Override
	public int fill(Buffer buffer) throws IOException
	{
		int fill=super.fill(buffer);
		return fill;
	}

	@Override
	public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
	{
		int l = super.flush(header, buffer, trailer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
		{
			synchronized (this)
			{   
				_writable = false;
				if( !isDispatched )
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable = true;
		}
		return l;
	}

	@Override
	public int flush(Buffer buffer) throws IOException
	{
		int l = super.flush(buffer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && buffer!=null && buffer.hasContent())
		{
			synchronized (this)
			{   
				_writable = false;
				if( !isDispatched )
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable = true;
		}

		return l;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public synchronized boolean blockReadable(long timeoutMs) throws IOException
	{
		if (isInputShutdown())
			throw new EofException();

		long now = _manager.getNow();
		long end = now+timeoutMs;
		try
		{
			_readBlocked = true;
			while (!isInputShutdown() && _readBlocked)
			{
				try
				{
					updateKey();
					this.wait(timeoutMs>0?(end-now):10000);
				}
				catch (final InterruptedException e)
				{
					LOG.warn("",e);
				}
				finally
				{
					now = _manager.getNow();
				}

				if (_readBlocked && timeoutMs>0 && now>=end)
					return false;
			}
		}
		finally
		{
			_readBlocked = false;
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public synchronized boolean blockWritable(long timeoutMs) throws IOException
	{
		if (isOutputShutdown())
			throw new EofException();

		long now=_manager.getNow();
		long end=now+timeoutMs;
		try
		{
			_writeBlocked = true;
			while (_writeBlocked && !isOutputShutdown())
			{
				try
				{
					updateKey();
					this.wait(timeoutMs>0?(end-now):10000);
				}
				catch (final InterruptedException e)
				{
					LOG.warn("",e);
				}
				finally
				{
					now = _manager.getNow();
				}
				if (_writeBlocked && timeoutMs>0 && now>=end)
					return false;
			}
		}
		finally
		{
			_writeBlocked = false;
		}
		return true;
	}

	@Override
	public boolean hasProgressed()
	{
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Updates selection key. Adds operations types to the selection key as needed. No operations
	 * are removed as this is only done during dispatch. This method records the new key and
	 * schedules a call to doUpdateKey to do the keyChange
	 */
	private synchronized void updateKey()
	{
		if( getChannel().isOpen() && _key.isValid())
		{
			boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended());
			boolean write_interest = _writeBlocked || (!isDispatched && !_writable);
//			boolean write_interest = _writeBlocked || !isDispatched;
//			boolean write_interest = true;

			int interestOps =
				((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
			|   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
			if( _key.interestOps() != interestOps ) {
				_key.interestOps(interestOps);
				_manager.getSelector().update();
			}
		} else {
			_key.cancel();
			// update needed?
		}
	}


	private void handle()
	{
		try
		{
			try
			{
				_connection.handle();
			}
			catch (ClosedChannelException e)
			{
				LOG.trace("",e);
			}
			catch (EofException e)
			{
				LOG.debug("EOF", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (IOException e)
			{
				LOG.warn(e.toString());
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (Throwable e)
			{
				LOG.warn("handle failed", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			finally
			{
				if (!_ishut && isInputShutdown() && isOpen())
				{
					_ishut = true;
					try
					{
						_connection.onInputShutdown();
					}
					catch(Throwable x)
					{
						LOG.warn("onInputShutdown failed", x);
						try{close();}
						catch(IOException e2){LOG.trace("",e2);}
					}
				}
			}
		}
		finally
		{
			isDispatched = false;
			updateKey();
		}
	}

	@Override
	public void close() throws IOException
	{
		try
		{
			super.close();
		}
		catch (IOException e)
		{
			LOG.trace("",e);
		}
		finally
		{
			updateKey();
		}
	}

	@Override
	public String toString()
	{
		// Do NOT use synchronized (this)
		// because it's very easy to deadlock when debugging is enabled.
		// We do a best effort to print the right toString() and that's it.
		SelectionKey key = _key;
		String keyString = "";
		if (key.isValid())
		{
			if (key.isReadable())
				keyString += "r";
			if (key.isWritable())
				keyString += "w";
		}
		else
		{
			keyString += "!";
		}
		return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
				hashCode(),
				_socket.getRemoteSocketAddress(),
				_socket.getLocalSocketAddress(),
				isDispatched,
				isOpen(),
				isInputShutdown(),
				isOutputShutdown(),
				_readBlocked,
				_writeBlocked,
				_writable,
				keyString,
				_connection);
	}

	/* ------------------------------------------------------------ */
	/**
	 * Don't set the SoTimeout
	 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
	 */
	@Override
	public void setMaxIdleTime(int timeMs) throws IOException
	{
		_maxIdleTime = timeMs;
	}

}