view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 949:e9088af3787f

remove SelectSet._changes
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 11 Oct 2016 23:18:13 -0600
parents f5aefdc4a81a
children a778413aefc0
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/**
 * An Endpoint that can be scheduled by {@link SelectorManager}.
 */
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
	/** Logger shared by this endpoint; registered under the package name rather than the class name. */
	public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	/** The select set passed to the constructor; used for wakeups, the current time, the selector and endpoint destruction. */
	private final SelectorManager.SelectSet _selectSet;
	/** The manager obtained from the select set; used to execute the handler and the idle-expiry task. */
	private final SelectorManager _manager;
	/**
	 * The current selection key; may become null or be replaced by a new registration.
	 * Read and written under this endpoint's monitor in schedule(), updateKey() and
	 * doUpdateKey(); toString() reads it without the lock.
	 */
	private  SelectionKey _key;
	/** The task handed to the manager by {@link #dispatch()}; it simply runs {@link #handle()}. */
	private final Runnable _handler = new Runnable()
		{
			public void run() { handle(); }
		};

	/** The desired value for {@link SelectionKey#interestOps()} */
	private int _interestOps;

	/**
	 * The connection instance is the handler for any IO activity on the endpoint.
	 * There is a different type of connection for HTTP, AJP, WebSocket and
	 * ProxyConnect.   The connection may change for an SCEP as it is upgraded
	 * from HTTP to proxy connect or websocket.
	 */
	private volatile AsyncConnection _connection;

	/** A dispatch was requested but not started: an idle callback was running or the executor rejected the handler. */
	private static final int STATE_NEEDS_DISPATCH=-1;
	/** Not dispatched; the initial state and the state set by {@link #undispatch()}. */
	private static final int STATE_UNDISPATCHED=0;
	/** The handler has been handed to the manager for execution. */
	private static final int STATE_DISPATCHED=1;
	/** One of the STATE_* constants above; modified under this endpoint's monitor. */
	private int _state;
	
	/** True while {@link #onIdleExpired(long)} is calling the connection's idle callback; dispatch() defers while it is set. */
	private boolean _onIdle;

	/**
	 * Cleared when a flush wrote nothing although content was offered, or when
	 * {@link #scheduleWrite()} is called; set again when a flush writes bytes or
	 * when schedule() sees the key ready for write.
	 */
	private volatile boolean _writable = true;


	/** True if a thread is blocked in {@link #blockReadable(long)} */
	private boolean _readBlocked;

	/** True if a thread is blocked in {@link #blockWritable(long)} */
	private boolean _writeBlocked;

	/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
	private boolean _open;

	/** System.currentTimeMillis() value recorded by notIdle() and by setCheckForIdle(true). */
	private volatile long _idleTimestamp;
	/** Whether {@link #checkIdleTimestamp(long)} should test this endpoint for idleness. */
	private volatile boolean _checkIdle;
	
	/** Set by handle() just before it calls the connection's onInputShutdown(), so that handle() does not call it again. */
	private boolean _ishut;

	/* ------------------------------------------------------------ */
	/**
	 * Creates an endpoint for the given channel that belongs to the given select set.
	 * The endpoint starts undispatched and open, with idle checking enabled.
	 *
	 * @param channel the socket channel wrapped by this endpoint
	 * @param selectSet the select set this endpoint belongs to; its manager is also retained
	 * @param key the selection key to start with
	 * @param maxIdleTime the maximum idle time, passed on to the superclass
	 * @throws IOException if the superclass constructor fails
	 */
	public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
		throws IOException
	{
		super(channel, maxIdleTime);

		// collaborators
		this._manager = selectSet.getManager();
		this._selectSet = selectSet;
		this._key = key;

		// initial state
		this._state = STATE_UNDISPATCHED;
		this._onIdle = false;
		this._open = true;

		setCheckForIdle(true);
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return the selection key currently held by this endpoint (possibly null),
	 * read while holding this endpoint's monitor
	 */
	public SelectionKey getSelectionKey()
	{
		final SelectionKey current;
		synchronized (this)
		{
			current = _key;
		}
		return current;
	}

	/**
	 * @return the manager obtained from the select set at construction time
	 */
	public SelectorManager getSelectManager()
	{
		return this._manager;
	}

	/**
	 * @return the connection currently handling IO for this endpoint
	 */
	public Connection getConnection()
	{
		return this._connection;
	}

	/**
	 * Replaces the connection that handles IO for this endpoint.
	 *
	 * @param connection the new connection; it is cast to AsyncConnection, so any
	 * other non-null type fails with a ClassCastException
	 */
	public void setConnection(Connection connection)
	{
		final AsyncConnection async = (AsyncConnection)connection;
		this._connection = async;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Called by selectSet to schedule handling.
	 * <p>
	 * Everything happens under this endpoint's monitor. If the key is missing or
	 * invalid, both blocked flags are cleared, waiters are notified and nothing
	 * else is done. If a thread is blocked in {@link #blockReadable(long)} or
	 * {@link #blockWritable(long)}, the matching flag is cleared when the key is
	 * ready for that operation and the waiters are notified instead of
	 * dispatching. Otherwise write interest is dropped once the key is
	 * write-ready, and the endpoint is dispatched unless it already is.
	 */
	public void schedule()
	{
		synchronized (this)
		{
			// If there is no usable key, release any blocked threads and do nothing else
			if (_key == null || !_key.isValid())
			{
				_readBlocked=false;
				_writeBlocked=false;
				this.notifyAll();
				return;
			}

			// If there are threads blocked reading or writing
			if (_readBlocked || _writeBlocked)
			{
				// assert _dispatched;
				// only clear the flag whose operation the key is actually ready for
				if (_readBlocked && _key.isReadable())
					_readBlocked=false;
				if (_writeBlocked && _key.isWritable())
					_writeBlocked=false;

				// waking them up is as good as a dispatch.
				this.notifyAll();

				// we are not interested in further selecting
				_key.interestOps(0);
				// when no handler is dispatched, recompute the interest ops right away
				if (_state<STATE_DISPATCHED)
					updateKey();
				return;
			}

			// The key is ready for write and write interest is currently set
			if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
			{
				// Remove writeable op
				_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
				_key.interestOps(_interestOps);
				_writable = true; // Once writable is in ops, only removed with dispatch.
			}

			// If dispatched, then deregister interest
			if (_state>=STATE_DISPATCHED)
				_key.interestOps(0);
			else
			{
				// otherwise do the dispatch
				dispatch();
			}
		}
	}

	/**
	 * Hands the handler to the manager unless the endpoint is already dispatched.
	 * <p>
	 * While an idle callback is in progress the dispatch is only recorded as
	 * {@code STATE_NEEDS_DISPATCH}. The same state is recorded, and the key
	 * refreshed, when the manager rejects the handler.
	 */
	public synchronized void dispatch()
	{
		// Already dispatched: nothing to do.
		if (_state>STATE_UNDISPATCHED)
			return;

		// Defer until onIdleExpired has finished.
		if (_onIdle)
		{
			_state = STATE_NEEDS_DISPATCH;
			return;
		}

		_state = STATE_DISPATCHED;
		try
		{
			_manager.execute(_handler);
		}
		catch (RejectedExecutionException rejected)
		{
			_state = STATE_NEEDS_DISPATCH;
			LOG.warn("Dispatched Failed! "+this+" to "+_manager);
			updateKey();
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Called when a dispatched thread is no longer handling the endpoint.
	 * Marks the endpoint as undispatched and then updates the selection key
	 * operations, all under this endpoint's monitor.
	 */
	protected synchronized void undispatch()
	{
		this._state = STATE_UNDISPATCHED;
		this.updateKey();
	}

	/**
	 * Turns idle checking on or off.
	 *
	 * @param check true to enable idle checking, which also restarts the idle
	 * period from the current time; false to disable it
	 */
	public void setCheckForIdle(boolean check)
	{
		// Restart the idle period before the check becomes visible.
		if (check)
			_idleTimestamp=System.currentTimeMillis();
		_checkIdle=check;
	}

	/**
	 * @return true if idle checking is currently enabled for this endpoint
	 */
	public boolean isCheckForIdle()
	{
		return this._checkIdle;
	}

	/**
	 * Records activity by restarting the idle period from the current time.
	 */
	protected void notIdle()
	{
		final long activityTime = System.currentTimeMillis();
		_idleTimestamp = activityTime;
	}

	/**
	 * Checks whether this endpoint has been idle for longer than the maximum
	 * idle time and, if so, runs {@link #onIdleExpired(long)} on the manager.
	 * <p>
	 * Idle checking is switched off before the task is submitted and switched
	 * back on when the task completes, so the endpoint does not idle out again
	 * while the callback is still running. If the manager rejects the task,
	 * idle checking is switched back on here, because the task's own
	 * {@code finally} block will never run, and the rejection is rethrown.
	 *
	 * @param now the current time in milliseconds, on the same clock as
	 * {@link System#currentTimeMillis()}
	 * @throws RejectedExecutionException if the manager cannot accept the idle task
	 */
	public void checkIdleTimestamp(long now)
	{
		if (!isCheckForIdle() || _maxIdleTime<=0)
			return;

		final long idleForMs=now-_idleTimestamp;
		if (idleForMs<=_maxIdleTime)
			return;

		// Don't idle out again until onIdleExpired task completes.
		setCheckForIdle(false);
		try
		{
			_manager.execute(new Runnable()
			{
				public void run()
				{
					try
					{
						onIdleExpired(idleForMs);
					}
					finally
					{
						setCheckForIdle(true);
					}
				}
			});
		}
		catch (RejectedExecutionException e)
		{
			// The task was never queued, so nothing else would re-enable the
			// check; without this the endpoint could never idle out again.
			setCheckForIdle(true);
			throw e;
		}
	}

	/**
	 * Delivers the idle expiry to the current connection.
	 * <p>
	 * Dispatching is deferred while the callback runs. Once it has finished,
	 * normally or not, a dispatch that was requested in the meantime is
	 * carried out.
	 *
	 * @param idleForMs how long the endpoint has been idle, in milliseconds
	 */
	public void onIdleExpired(long idleForMs)
	{
		try
		{
			synchronized (this)
			{
				this._onIdle = true;
			}

			final AsyncConnection target = _connection;
			target.onIdleExpired(idleForMs);
		}
		finally
		{
			synchronized (this)
			{
				// Both the flag and the state are only touched under this monitor.
				final boolean dispatchPending = (_state==STATE_NEEDS_DISPATCH);
				this._onIdle = false;
				if (dispatchPending)
				{
					dispatch();
				}
			}
		}
	}

	/**
	 * Fills the buffer through the superclass and records activity when any
	 * bytes were read.
	 *
	 * @param buffer the buffer to fill
	 * @return the value returned by the superclass fill
	 * @throws IOException if the superclass fill fails
	 */
	@Override
	public int fill(Buffer buffer) throws IOException
	{
		final int filled = super.fill(buffer);
		if (filled > 0)
		{
			notIdle();
		}
		return filled;
	}

	/**
	 * Flushes the three buffers through the superclass and tracks writability.
	 * <p>
	 * When bytes were written the endpoint is marked writable and activity is
	 * recorded. When nothing was written although at least one buffer had
	 * content, the endpoint is marked not writable and, if it is not
	 * dispatched, the selection key is updated.
	 *
	 * @param header the header buffer, may be null
	 * @param buffer the content buffer, may be null
	 * @param trailer the trailer buffer, may be null
	 * @return the value returned by the superclass flush
	 * @throws IOException if the superclass flush fails
	 */
	@Override
	public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
	{
		final int written = super.flush(header, buffer, trailer);

		if (written>0)
		{
			_writable=true;
			notIdle();
		}
		// If there was something to write and it wasn't written, then we are not writable.
		else if (written==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
		{
			synchronized (this)
			{
				_writable=false;
				if (_state<STATE_DISPATCHED)
				{
					updateKey();
				}
			}
		}
		return written;
	}

	/**
	 * Flushes the buffer through the superclass and tracks writability.
	 * <p>
	 * When bytes were written the endpoint is marked writable and activity is
	 * recorded. When nothing was written although the buffer had content, the
	 * endpoint is marked not writable and, if it is not dispatched, the
	 * selection key is updated.
	 *
	 * @param buffer the buffer to flush, may be null
	 * @return the value returned by the superclass flush
	 * @throws IOException if the superclass flush fails
	 */
	@Override
	public int flush(Buffer buffer) throws IOException
	{
		final int written = super.flush(buffer);

		if (written>0)
		{
			_writable=true;
			notIdle();
		}
		// If there was something to write and it wasn't written, then we are not writable.
		else if (written==0 && buffer!=null && buffer.hasContent())
		{
			synchronized (this)
			{
				_writable=false;
				if (_state<STATE_DISPATCHED)
				{
					updateKey();
				}
			}
		}

		return written;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Allows a thread to block waiting for further read events.
	 * <p>
	 * The caller waits on this endpoint's monitor until the read-blocked flag
	 * is cleared (see {@link #schedule()}), the input is shut down, or the
	 * deadline passes. Idle checking is forced on for the duration of the wait
	 * and restored to its previous setting afterwards. An interrupt is logged
	 * and the wait is resumed.
	 *
	 * @param timeoutMs maximum time to wait in milliseconds; when not positive
	 * there is no deadline and the wait is repeated in 10 second slices
	 * @return false if the deadline passed while still read-blocked, true otherwise
	 * @throws EofException if the input is already shut down on entry
	 */
	@Override
	public boolean blockReadable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isInputShutdown())
				throw new EofException();

			// the deadline is measured on the select set's clock
			long now=_selectSet.getNow();
			long end=now+timeoutMs;
			// remember the idle-check setting so it can be restored on exit
			boolean check=isCheckForIdle();
			setCheckForIdle(true);
			try
			{
				_readBlocked=true;
				while (!isInputShutdown() && _readBlocked)
				{
					try
					{
						// make sure read interest is registered before waiting
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
					}
					finally
					{
						now=_selectSet.getNow();
					}

					// still blocked and out of time: report the timeout
					if (_readBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_readBlocked=false;
				setCheckForIdle(check);
			}
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Allows a thread to block waiting for further write events.
	 * <p>
	 * The caller waits on this endpoint's monitor until the write-blocked flag
	 * is cleared (see {@link #schedule()}), the output is shut down, or the
	 * deadline passes. Idle checking is forced on for the duration of the wait
	 * and restored to its previous setting afterwards. An interrupt is logged
	 * and the wait is resumed.
	 *
	 * @param timeoutMs maximum time to wait in milliseconds; when not positive
	 * there is no deadline and the wait is repeated in 10 second slices
	 * @return false if the deadline passed while still write-blocked, true otherwise
	 * @throws EofException if the output is already shut down on entry
	 */
	@Override
	public boolean blockWritable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isOutputShutdown())
				throw new EofException();

			// the deadline is measured on the select set's clock
			long now=_selectSet.getNow();
			long end=now+timeoutMs;
			// remember the idle-check setting so it can be restored on exit
			boolean check=isCheckForIdle();
			setCheckForIdle(true);
			try
			{
				_writeBlocked=true;
				while (_writeBlocked && !isOutputShutdown())
				{
					try
					{
						// make sure write interest is registered before waiting
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
					}
					finally
					{
						now=_selectSet.getNow();
					}
					// still blocked and out of time: report the timeout
					if (_writeBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_writeBlocked=false;
				setCheckForIdle(check);
			}
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Marks the endpoint as not writable and updates the selection key.
	 * A debug message is logged when the endpoint was still considered writable.
	 *
	 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
	 */
	public void scheduleWrite()
	{
		final boolean wasWritable = _writable;
		if (wasWritable)
		{
			LOG.debug("Required scheduleWrite {}",this);
		}

		_writable=false;
		updateKey();
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return the current value of the writable flag
	 */
	public boolean isWritable()
	{
		return this._writable;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return always false; this endpoint does not report progress
	 */
	public boolean hasProgressed()
	{
		final boolean progressed = false;
		return progressed;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Recomputes the desired interest operations and, if they differ from the
	 * key's current ones, applies them at once through {@link #doUpdateKey()}
	 * and wakes up the select set.
	 * <p>
	 * Read interest is wanted while a thread is blocked reading, or while the
	 * endpoint is not dispatched and the connection is not suspended. Write
	 * interest is wanted while a thread is blocked writing, or while the
	 * endpoint is not dispatched and not writable. Interest in a direction is
	 * dropped once that side of the socket is shut down.
	 */
	private void updateKey()
	{
		final boolean changed;
		synchronized (this)
		{
			// -1 stands for "no usable key", so it never equals a computed interest set
			int current_ops=-1;
			if (getChannel().isOpen())
			{
				boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
				boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);

				_interestOps =
					((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
				|   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
				try
				{
					current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
				}
				catch(Exception e)
				{
					// the key could not be queried: forget it
					_key=null;
					LOG.trace("",e);
				}
			}
			// when the channel is closed _interestOps is left untouched and compared with -1
			changed=_interestOps!=current_ops;
		}

		// applied outside the synchronized block above; doUpdateKey() takes the monitor itself
		if(changed)
		{
			doUpdateKey();
			_selectSet.wakeup();
		}
	}


	/* ------------------------------------------------------------ */
	/**
	 * Synchronize the interestOps with the actual key. Called directly by
	 * {@link #updateKey()} when the desired operations differ from the key's.
	 * <p>
	 * With an open channel the key's interest set is updated, or the channel is
	 * registered with the select set's selector when there is no valid key.
	 * With a closed channel the key is cancelled and the endpoint is destroyed
	 * through the select set if that has not happened yet.
	 */
	synchronized void doUpdateKey()
	{
		if (getChannel().isOpen())
		{
			if (_interestOps>0)
			{
				if (_key==null || !_key.isValid())
				{
					SelectableChannel sc = (SelectableChannel)getChannel();
					if (sc.isRegistered())
					{
						// NOTE(review): updateKey() calls back into this method synchronously
						// when the ops differ, so this branch looks like it can recurse for as
						// long as the channel stays registered with an invalid key - confirm.
						updateKey();
					}
					else
					{
						try
						{
							// register afresh, attaching this endpoint to the new key
							_key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
						}
						catch (Exception e)
						{
							// registration failed: give up on the key and destroy the endpoint once
							LOG.trace("",e);
							if (_key!=null && _key.isValid())
							{
								_key.cancel();
							}

							if (_open)
							{
								_selectSet.destroyEndPoint(this);
							}
							_open=false;
							_key = null;
						}
					}
				}
				else
				{
					// valid key: just apply the desired interest set
					_key.interestOps(_interestOps);
				}
			}
			else
			{
				// nothing is wanted: clear interest on a valid key, otherwise drop the key
				if (_key!=null && _key.isValid())
					_key.interestOps(0);
				else
					_key=null;
			}
		}
		else
		{
			// channel closed: cancel the key and destroy the endpoint once
			if (_key!=null && _key.isValid())
				_key.cancel();

			if (_open)
			{
				_open=false;
				_selectSet.destroyEndPoint(this);
			}
			_key = null;
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Runs the connection for this endpoint; invoked through the handler task
	 * that {@link #dispatch()} gives to the manager.
	 * <p>
	 * The connection's handle() is called repeatedly for as long as it returns
	 * a different connection, which then replaces the current one. A closed
	 * channel is only traced; EOF, other IO errors and any other throwable are
	 * logged and the endpoint is closed. Afterwards the connection is told
	 * about an input shutdown if it has not been told before, and the endpoint
	 * is undispatched.
	 */
	protected void handle()
	{
		// stays true until undispatch() has completed in the inner finally block
		boolean dispatched = true;
		try
		{
			try
			{
				while(true)
				{
					final AsyncConnection next = (AsyncConnection)_connection.handle();
					if (next!=_connection)
					{
						// the connection was replaced: handle again with the new one
						LOG.debug("{} replaced {}",next,_connection);
						_connection=next;
						continue;
					}
					break;
				}
			}
			catch (ClosedChannelException e)
			{
				LOG.trace("",e);
			}
			catch (EofException e)
			{
				LOG.debug("EOF", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (IOException e)
			{
				LOG.warn(e.toString());
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			catch (Throwable e)
			{
				LOG.warn("handle failed", e);
				try{close();}
				catch(IOException e2){LOG.trace("",e2);}
			}
			finally
			{
				// notify the connection of an input shutdown, guarded by _ishut
				if (!_ishut && isInputShutdown() && isOpen())
				{
					_ishut = true;
					try
					{
						_connection.onInputShutdown();
					}
					catch(Throwable x)
					{
						LOG.warn("onInputShutdown failed", x);
						try{close();}
						catch(IOException e2){LOG.trace("",e2);}
					}
					finally
					{
						updateKey();
					}
				}
				undispatch();
				dispatched = false;
			}
		}
		finally
		{
			// only reached with dispatched still true if the inner finally block
			// failed before completing undispatch()
			if (dispatched)
			{
				undispatch();
			}
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Delegates to the superclass close and then updates the selection key,
	 * whether or not the close succeeded. An IOException from the superclass
	 * is logged at trace level and not propagated.
	 *
	 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#close()
	 */
	@Override
	public void close() throws IOException
	{
		try
		{
			super.close();
		}
		catch (IOException closeFailure)
		{
			LOG.trace("",closeFailure);
		}
		finally
		{
			this.updateKey();
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Returns a best-effort diagnostic description of this endpoint: local and
	 * remote addresses, dispatch state, open/shutdown flags, blocked flags,
	 * writability, desired interest ops, a key summary and the connection.
	 * <p>
	 * The key summary is "r" and/or "w" for the ready operations of a valid
	 * key, "!" for an invalid or cancelled key and "-" when there is no key.
	 *
	 * @return the description; never null
	 */
	@Override
	public String toString()
	{
		// Do NOT use synchronized (this)
		// because it's very easy to deadlock when debugging is enabled.
		// We do a best effort to print the right toString() and that's it.
		final SelectionKey key = _key;
		String keyString;
		if (key == null)
		{
			keyString = "-";
		}
		else
		{
			try
			{
				if (key.isValid())
				{
					keyString = "";
					if (key.isReadable())
						keyString += "r";
					if (key.isWritable())
						keyString += "w";
				}
				else
				{
					keyString = "!";
				}
			}
			catch (java.nio.channels.CancelledKeyException e)
			{
				// No lock is held, so the key may be cancelled between isValid()
				// and the ready-op queries; report it like an invalid key.
				keyString = "!";
			}
		}
		// The format labels the first address "l" (local) and the second "r" (remote).
		return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
				hashCode(),
				_socket.getLocalSocketAddress(),
				_socket.getRemoteSocketAddress(),
				_state,
				isOpen(),
				isInputShutdown(),
				isOutputShutdown(),
				_readBlocked,
				_writeBlocked,
				_writable,
				_interestOps,
				keyString,
				_connection);
	}

	/* ------------------------------------------------------------ */
	/**
	 * Records the maximum idle time without calling the superclass
	 * implementation, so the SoTimeout is not set.
	 *
	 * @param timeMs the maximum idle time in milliseconds
	 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
	 */
	@Override
	public void setMaxIdleTime(int timeMs) throws IOException
	{
		this._maxIdleTime = timeMs;
	}

}