view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 913:17f4fe8271de

simplify connectors
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 08 Oct 2016 23:57:11 -0600
parents 6b210bb66c63
children 54308d65265a
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.thread.Timeout.Task;

/* ------------------------------------------------------------ */
/**
 * An Endpoint that can be scheduled by {@link SelectorManager}.
 */
public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
{
	public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");

	private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
	private final SelectorManager.SelectSet _selectSet;
	private final SelectorManager _manager;
	private  SelectionKey _key;
	private final Runnable _handler = new Runnable()
		{
			public void run() { handle(); }
		};

	/** The desired value for {@link SelectionKey#interestOps()} */
	private int _interestOps;

	/**
	 * The connection instance is the handler for any IO activity on the endpoint.
	 * There is a different type of connection for HTTP, AJP, WebSocket and
	 * ProxyConnect.   The connection may change for an SCEP as it is upgraded
	 * from HTTP to proxy connect or websocket.
	 */
	private volatile AsyncConnection _connection;

	private static final int STATE_NEEDS_DISPATCH=-1;
	private static final int STATE_UNDISPATCHED=0;
	private static final int STATE_DISPATCHED=1;
	private static final int STATE_ASYNC=2;
	private int _state;
	
	private boolean _onIdle;

	/** true if the last write operation succeed and wrote all offered bytes */
	private volatile boolean _writable = true;


	/** True if a thread has is blocked in {@link #blockReadable(long)} */
	private boolean _readBlocked;

	/** True if a thread has is blocked in {@link #blockWritable(long)} */
	private boolean _writeBlocked;

	/** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
	private boolean _open;

	private volatile long _idleTimestamp;
	private volatile boolean _checkIdle;
	
	private boolean _interruptable;

	private boolean _ishut;

	/* ------------------------------------------------------------ */
	public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
		throws IOException
	{
		super(channel, maxIdleTime);

		_manager = selectSet.getManager();
		_selectSet = selectSet;
		_state=STATE_UNDISPATCHED;
		_onIdle=false;
		_open=true;
		_key = key;

		setCheckForIdle(true);
	}

	/* ------------------------------------------------------------ */
	public SelectionKey getSelectionKey()
	{
		synchronized (this)
		{
			return _key;
		}
	}

	/* ------------------------------------------------------------ */
	public SelectorManager getSelectManager()
	{
		return _manager;
	}

	/* ------------------------------------------------------------ */
	public Connection getConnection()
	{
		return _connection;
	}

	/* ------------------------------------------------------------ */
	public void setConnection(Connection connection)
	{
		Connection old=_connection;
		_connection=(AsyncConnection)connection;
		if (old!=null && old!=_connection)
			_manager.endPointUpgraded(this,old);
	}

	/* ------------------------------------------------------------ */
	public long getIdleTimestamp()
	{
		return _idleTimestamp;
	}

	/* ------------------------------------------------------------ */
	/** Called by selectSet to schedule handling
	 *
	 */
	public void schedule()
	{
		synchronized (this)
		{
			// If there is no key, then do nothing
			if (_key == null || !_key.isValid())
			{
				_readBlocked=false;
				_writeBlocked=false;
				this.notifyAll();
				return;
			}

			// If there are threads dispatched reading and writing
			if (_readBlocked || _writeBlocked)
			{
				// assert _dispatched;
				if (_readBlocked && _key.isReadable())
					_readBlocked=false;
				if (_writeBlocked && _key.isWritable())
					_writeBlocked=false;

				// wake them up is as good as a dispatched.
				this.notifyAll();

				// we are not interested in further selecting
				_key.interestOps(0);
				if (_state<STATE_DISPATCHED)
					updateKey();
				return;
			}

			// Remove writeable op
			if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
			{
				// Remove writeable op
				_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
				_key.interestOps(_interestOps);
				_writable = true; // Once writable is in ops, only removed with dispatch.
			}

			// If dispatched, then deregister interest
			if (_state>=STATE_DISPATCHED)
				_key.interestOps(0);
			else
			{
				// other wise do the dispatch
				dispatch();
				if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
				{
					_key.interestOps(0);
				}
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void asyncDispatch()
	{
		synchronized(this)
		{
			switch(_state)
			{
				case STATE_NEEDS_DISPATCH:
				case STATE_UNDISPATCHED:
					dispatch();
					break;
					
				case STATE_DISPATCHED:
				case STATE_ASYNC:
					_state=STATE_ASYNC;
					break;
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void dispatch()
	{
		synchronized(this)
		{
			if (_state<=STATE_UNDISPATCHED)
			{
				if (_onIdle)
					_state = STATE_NEEDS_DISPATCH;
				else
				{
					_state = STATE_DISPATCHED;
					try {
						_manager.execute(_handler);
					} catch(RejectedExecutionException e) {
						_state = STATE_NEEDS_DISPATCH;
						LOG.warn("Dispatched Failed! "+this+" to "+_manager);
						updateKey();
					}
				}
			}
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Called when a dispatched thread is no longer handling the endpoint.
	 * The selection key operations are updated.
	 * @return If false is returned, the endpoint has been redispatched and
	 * thread must keep handling the endpoint.
	 */
	protected boolean undispatch()
	{
		synchronized (this)
		{
			switch(_state)
			{
				case STATE_ASYNC:
					_state=STATE_DISPATCHED;
					return false;

				default:
					_state=STATE_UNDISPATCHED;
					updateKey();
					return true;
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void cancelTimeout(Task task)
	{
		getSelectSet().cancelTimeout(task);
	}

	/* ------------------------------------------------------------ */
	public void scheduleTimeout(Task task, long timeoutMs)
	{
		getSelectSet().scheduleTimeout(task,timeoutMs);
	}

	/* ------------------------------------------------------------ */
	public void setCheckForIdle(boolean check)
	{
		if (check)
		{
			_idleTimestamp=System.currentTimeMillis();
			_checkIdle=true;
		}
		else
			_checkIdle=false;
	}

	/* ------------------------------------------------------------ */
	public boolean isCheckForIdle()
	{
		return _checkIdle;
	}

	/* ------------------------------------------------------------ */
	protected void notIdle()
	{
		_idleTimestamp=System.currentTimeMillis();
	}

	/* ------------------------------------------------------------ */
	public void checkIdleTimestamp(long now)
	{
		if (isCheckForIdle() && _maxIdleTime>0)
		{
			final long idleForMs=now-_idleTimestamp;

			if (idleForMs>_maxIdleTime)
			{
				// Don't idle out again until onIdleExpired task completes.
				setCheckForIdle(false);
				_manager.execute(new Runnable()
				{
					public void run()
					{
						try
						{
							onIdleExpired(idleForMs);
						}
						finally
						{
							setCheckForIdle(true);
						}
					}
				});
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void onIdleExpired(long idleForMs)
	{
		try
		{
			synchronized (this)
			{
				_onIdle=true;
			}

			_connection.onIdleExpired(idleForMs);
		}
		finally
		{
			synchronized (this)
			{
				_onIdle=false;
				if (_state==STATE_NEEDS_DISPATCH)
					dispatch();
			}
		}
	}

	/* ------------------------------------------------------------ */
	@Override
	public int fill(Buffer buffer) throws IOException
	{
		int fill=super.fill(buffer);
		if (fill>0)
			notIdle();
		return fill;
	}

	/* ------------------------------------------------------------ */
	@Override
	public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
	{
		int l = super.flush(header, buffer, trailer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
		{
			synchronized (this)
			{   
				_writable=false;
				if (_state<STATE_DISPATCHED)
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable=true;
			notIdle();
		}
		return l;
	}

	/* ------------------------------------------------------------ */
	/*
	 */
	@Override
	public int flush(Buffer buffer) throws IOException
	{
		int l = super.flush(buffer);

		// If there was something to write and it wasn't written, then we are not writable.
		if (l==0 && buffer!=null && buffer.hasContent())
		{
			synchronized (this)
			{   
				_writable=false;
				if (_state<STATE_DISPATCHED)
					updateKey();
			}
		}
		else if (l>0)
		{
			_writable=true;
			notIdle();
		}

		return l;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public boolean blockReadable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isInputShutdown())
				throw new EofException();

			long now=_selectSet.getNow();
			long end=now+timeoutMs;
			boolean check=isCheckForIdle();
			setCheckForIdle(true);
			try
			{
				_readBlocked=true;
				while (!isInputShutdown() && _readBlocked)
				{
					try
					{
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
						if (_interruptable)
							throw new InterruptedIOException(){{this.initCause(e);}};
					}
					finally
					{
						now=_selectSet.getNow();
					}

					if (_readBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_readBlocked=false;
				setCheckForIdle(check);
			}
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/*
	 * Allows thread to block waiting for further events.
	 */
	@Override
	public boolean blockWritable(long timeoutMs) throws IOException
	{
		synchronized (this)
		{
			if (isOutputShutdown())
				throw new EofException();

			long now=_selectSet.getNow();
			long end=now+timeoutMs;
			boolean check=isCheckForIdle();
			setCheckForIdle(true);
			try
			{
				_writeBlocked=true;
				while (_writeBlocked && !isOutputShutdown())
				{
					try
					{
						updateKey();
						this.wait(timeoutMs>0?(end-now):10000);
					}
					catch (final InterruptedException e)
					{
						LOG.warn("",e);
						if (_interruptable)
							throw new InterruptedIOException(){{this.initCause(e);}};
					}
					finally
					{
						now=_selectSet.getNow();
					}
					if (_writeBlocked && timeoutMs>0 && now>=end)
						return false;
				}
			}
			finally
			{
				_writeBlocked=false;
				setCheckForIdle(check);
			}
		}
		return true;
	}

	/* ------------------------------------------------------------ */
	/** Set the interruptable mode of the endpoint.
	 * If set to false (default), then interrupts are assumed to be spurious 
	 * and blocking operations continue unless the endpoint has been closed.
	 * If true, then interrupts of blocking operations result in InterruptedIOExceptions
	 * being thrown.
	 * @param interupable
	 */
	public void setInterruptable(boolean interupable)
	{
		synchronized (this)
		{
			_interruptable=interupable;
		}
	}

	/* ------------------------------------------------------------ */
	public boolean isInterruptable()
	{
		return _interruptable;
	}
	
	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
	 */
	public void scheduleWrite()
	{
		if (_writable)
			LOG.debug("Required scheduleWrite {}",this);

		_writable=false;
		updateKey();
	}

	/* ------------------------------------------------------------ */
	public boolean isWritable()
	{
		return _writable;
	}

	/* ------------------------------------------------------------ */
	public boolean hasProgressed()
	{
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Updates selection key. Adds operations types to the selection key as needed. No operations
	 * are removed as this is only done during dispatch. This method records the new key and
	 * schedules a call to doUpdateKey to do the keyChange
	 */
	private void updateKey()
	{
		final boolean changed;
		synchronized (this)
		{
			int current_ops=-1;
			if (getChannel().isOpen())
			{
				boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
				boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);

				_interestOps =
					((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
				|   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
				try
				{
					current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
				}
				catch(Exception e)
				{
					_key=null;
					LOG.trace("",e);
				}
			}
			changed=_interestOps!=current_ops;
		}

		if(changed)
		{
			_selectSet.addChange(this);
			_selectSet.wakeup();
		}
	}


	/* ------------------------------------------------------------ */
	/**
	 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
	 */
	void doUpdateKey()
	{
		synchronized (this)
		{
			if (getChannel().isOpen())
			{
				if (_interestOps>0)
				{
					if (_key==null || !_key.isValid())
					{
						SelectableChannel sc = (SelectableChannel)getChannel();
						if (sc.isRegistered())
						{
							updateKey();
						}
						else
						{
							try
							{
								_key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
							}
							catch (Exception e)
							{
								LOG.trace("",e);
								if (_key!=null && _key.isValid())
								{
									_key.cancel();
								}

								if (_open)
								{
									_selectSet.destroyEndPoint(this);
								}
								_open=false;
								_key = null;
							}
						}
					}
					else
					{
						_key.interestOps(_interestOps);
					}
				}
				else
				{
					if (_key!=null && _key.isValid())
						_key.interestOps(0);
					else
						_key=null;
				}
			}
			else
			{
				if (_key!=null && _key.isValid())
					_key.cancel();

				if (_open)
				{
					_open=false;
					_selectSet.destroyEndPoint(this);
				}
				_key = null;
			}
		}
	}

	/* ------------------------------------------------------------ */
	/*
	 */
	protected void handle()
	{
		boolean dispatched=true;
		try
		{
			while(dispatched)
			{
				try
				{
					while(true)
					{
						final AsyncConnection next = (AsyncConnection)_connection.handle();
						if (next!=_connection)
						{
							LOG.debug("{} replaced {}",next,_connection);
							Connection old=_connection;
							_connection=next;
							_manager.endPointUpgraded(this,old);
							continue;
						}
						break;
					}
				}
				catch (ClosedChannelException e)
				{
					LOG.trace("",e);
				}
				catch (EofException e)
				{
					LOG.debug("EOF", e);
					try{close();}
					catch(IOException e2){LOG.trace("",e2);}
				}
				catch (IOException e)
				{
					LOG.warn(e.toString());
					try{close();}
					catch(IOException e2){LOG.trace("",e2);}
				}
				catch (Throwable e)
				{
					LOG.warn("handle failed", e);
					try{close();}
					catch(IOException e2){LOG.trace("",e2);}
				}
				finally
				{
					if (!_ishut && isInputShutdown() && isOpen())
					{
						_ishut=true;
						try
						{
							_connection.onInputShutdown();
						}
						catch(Throwable x)
						{
							LOG.warn("onInputShutdown failed", x);
							try{close();}
							catch(IOException e2){LOG.trace("",e2);}
						}
						finally
						{
							updateKey();
						}
					}
					dispatched=!undispatch();
				}
			}
		}
		finally
		{
			if (dispatched)
			{
				dispatched=!undispatch();
				while (dispatched)
				{
					LOG.warn("SCEP.run() finally DISPATCHED");
					dispatched=!undispatch();
				}
			}
		}
	}

	/* ------------------------------------------------------------ */
	/*
	 * @see org.eclipse.io.nio.ChannelEndPoint#close()
	 */
	@Override
	public void close() throws IOException
	{
		// On unix systems there is a JVM issue that if you cancel before closing, it can 
		// cause the selector to block waiting for a channel to close and that channel can 
		// block waiting for the remote end.  But on windows, if you don't cancel before a 
		// close, then the selector can block anyway!
		// https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
		if (WORK_AROUND_JVM_BUG_6346658)
		{
			try
			{
				SelectionKey key = _key;
				if (key!=null)
					key.cancel();
			}
			catch (Throwable e)
			{
				LOG.trace("",e);
			}
		}

		try
		{
			super.close();
		}
		catch (IOException e)
		{
			LOG.trace("",e);
		}
		finally
		{
			updateKey();
		}
	}

	/* ------------------------------------------------------------ */
	@Override
	public String toString()
	{
		// Do NOT use synchronized (this)
		// because it's very easy to deadlock when debugging is enabled.
		// We do a best effort to print the right toString() and that's it.
		SelectionKey key = _key;
		String keyString = "";
		if (key != null)
		{
			if (key.isValid())
			{
				if (key.isReadable())
					keyString += "r";
				if (key.isWritable())
					keyString += "w";
			}
			else
			{
				keyString += "!";
			}
		}
		else
		{
			keyString += "-";
		}
		return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
				hashCode(),
				_socket.getRemoteSocketAddress(),
				_socket.getLocalSocketAddress(),
				_state,
				isOpen(),
				isInputShutdown(),
				isOutputShutdown(),
				_readBlocked,
				_writeBlocked,
				_writable,
				_interestOps,
				keyString,
				_connection);
	}

	/* ------------------------------------------------------------ */
	public SelectSet getSelectSet()
	{
		return _selectSet;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Don't set the SoTimeout
	 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
	 */
	@Override
	public void setMaxIdleTime(int timeMs) throws IOException
	{
		_maxIdleTime=timeMs;
	}

}