view src/org/eclipse/jetty/io/nio/SslConnection.java @ 1053:7e4b41247544

fix JBuffer.array()
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 08 Nov 2016 00:32:02 -0700
parents 4afdf0f0c5bc
children 4a50422596b6
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;

import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.JBuffer;
import org.eclipse.jetty.io.BufferUtil;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------ */
/** SSL Connection.
 * An AysyncConnection that acts as an interceptor between and EndPoint and another
 * Connection, that implements TLS encryption using an {@link SSLEngine}.
 * <p>
 * The connector uses an {@link AsyncEndPoint} (like {@link SelectChannelEndPoint}) as
 * it's source/sink of encrypted data.   It then provides {@link #getSslEndPoint()} to
 * expose a source/sink of unencrypted data to another connection (eg HttpConnection).
 */
public final class SslConnection extends AbstractConnection implements AsyncConnection
{
	private final Logger _logger = LoggerFactory.getLogger("org.eclipse.jetty.io.nio.ssl");

	private static final JBuffer __ZERO_BUFFER = BufferUtil.EMPTY_BUFFER;

	private static final ThreadLocal<SslBuffers> __buffers = new ThreadLocal<SslBuffers>();
	private final SSLEngine _engine;
	private final SSLSession _session;
	private AsyncHttpConnection _connection;
	private final SslEndPoint _sslEndPoint;
	private int _allocations;
	private SslBuffers _buffers;
	private JBuffer _inbound;
	private JBuffer _unwrapBuf;
	private JBuffer _outbound;
	private final AsyncEndPoint _aEndp;
	private boolean _allowRenegotiate = true;
	private boolean _handshook;
	private boolean _ishut;
	private boolean _oshut;
	private final AtomicBoolean _progressed = new AtomicBoolean();

	/* ------------------------------------------------------------ */
	/* this is a half baked buffer pool
	 */
	private static class SslBuffers
	{
		final JBuffer _in;
		final JBuffer _out;
		final JBuffer _unwrap;

		SslBuffers(int packetSize, int appSize)
		{
			_in = BufferUtil.newBuffer(packetSize);
			_out = BufferUtil.newBuffer(packetSize);
			_unwrap = BufferUtil.newBuffer(appSize);
		}
	}

	public SslConnection(SSLEngine engine,AsyncEndPoint endp)
	{
		super(endp);
		_engine = engine;
		_session = _engine.getSession();
		_aEndp = endp;
		_sslEndPoint = new SslEndPoint();
	}

	/* ------------------------------------------------------------ */
	/**
	 * Set if SSL re-negotiation is allowed. CVE-2009-3555 discovered
	 * a vulnerability in SSL/TLS with re-negotiation.  If your JVM
	 * does not have CVE-2009-3555 fixed, then re-negotiation should
	 * not be allowed.  CVE-2009-3555 was fixed in Sun java 1.6 with a ban
	 * of renegotiates in u19 and with RFC5746 in u22.
	 *
	 * @param allowRenegotiate
	 *            true if re-negotiation is allowed (default false)
	 */
	public void setAllowRenegotiate(boolean allowRenegotiate)
	{
		_allowRenegotiate = allowRenegotiate;
	}

	private void allocateBuffers()
	{
		synchronized (this)
		{
			if (_allocations++==0)
			{
				if (_buffers==null)
				{
					_buffers=__buffers.get();
					if (_buffers==null)
						_buffers=new SslBuffers(_session.getPacketBufferSize()*2,_session.getApplicationBufferSize()*2);
					_inbound=_buffers._in;
					_outbound=_buffers._out;
					_unwrapBuf=_buffers._unwrap;
					__buffers.set(null);
				}
			}
		}
	}

	private void releaseBuffers()
	{
		synchronized (this)
		{
			if (--_allocations==0)
			{
				if (_buffers!=null &&
					_inbound.remaining()==0 &&
					_outbound.remaining()==0 &&
					_unwrapBuf.remaining()==0)
				{
					_inbound=null;
					_outbound=null;
					_unwrapBuf=null;
					__buffers.set(_buffers);
					_buffers=null;
				}
			}
		}
	}

	@Override
	public void handle() throws IOException
	{
		try
		{
			allocateBuffers();

			boolean progress=true;

			while (progress)
			{
				progress=false;

				// If we are handshook let the delegate connection
				if (_engine.getHandshakeStatus()!=HandshakeStatus.NOT_HANDSHAKING)
					progress=process(null,null);

				// handle the delegate connection
				_connection.handle();

				_logger.debug("{} handle {} progress={}", _session, this, progress);
			}
		}
		finally
		{
			releaseBuffers();

			if (!_ishut && _sslEndPoint.isInputShutdown() && _sslEndPoint.isOpen())
			{
				_ishut=true;
				try
				{
					_connection.onInputShutdown();
				}
				catch(Throwable x)
				{
					_logger.warn("onInputShutdown failed", x);
					try{_sslEndPoint.close();}
					catch(IOException e2){
						_logger.trace("",e2);}
				}
			}
		}
	}

	@Override
	public boolean isSuspended()
	{
		return false;
	}

	@Override
	public void onInputShutdown() throws IOException
	{
	}

	private synchronized boolean process(JBuffer toFill, JBuffer toFlush) throws IOException
	{
		boolean some_progress=false;
		try
		{
			// We need buffers to progress
			allocateBuffers();

			// if we don't have a buffer to put received data into
			if (toFill==null)
			{
				// use the unwrapbuffer to hold received data.
				_unwrapBuf.compact();
				toFill=_unwrapBuf;
			}
			// Else if the fill buffer is too small for the SSL session
			else if (toFill.capacity()<_session.getApplicationBufferSize())
			{
				// fill to the temporary unwrapBuffer
				boolean progress=process(null,toFlush);

				// if we received any data,
				if (_unwrapBuf!=null && _unwrapBuf.hasRemaining())
				{
					// transfer from temp buffer to fill buffer
					_unwrapBuf.skip(toFill.put(_unwrapBuf));
					return true;
				}
				else
					// return progress from recursive call
					return progress;
			}
			// Else if there is some temporary data
			else if (_unwrapBuf!=null && _unwrapBuf.hasRemaining())
			{
				// transfer from temp buffer to fill buffer
				_unwrapBuf.skip(toFill.put(_unwrapBuf));
				return true;
			}

			// If we are here, we have a buffer ready into which we can put some read data.

			// If we have no data to flush, flush the empty buffer
			if (toFlush==null)
				toFlush = __ZERO_BUFFER;

			// While we are making progress processing SSL engine
			boolean progress=true;
			while (progress)
			{
				progress=false;

				// Do any real IO
				int filled=0,flushed=0;
				try
				{
					// Read any available data
					if (_inbound.space()>0 && (filled=_endp.fill(_inbound))>0)
						progress = true;

					// flush any output data
					if (_outbound.hasRemaining() && (flushed=_endp.flush(_outbound))>0)
						progress = true;
				}
				catch (IOException e)
				{
					_endp.close();
					throw e;
				}
				finally
				{
					_logger.debug("{} {} {} filled={}/{} flushed={}/{}",_session,this,_engine.getHandshakeStatus(),filled,_inbound.remaining(),flushed,_outbound.remaining());
				}

				// handle the current hand share status
				switch(_engine.getHandshakeStatus())
				{
					case FINISHED:
						throw new IllegalStateException();

					case NOT_HANDSHAKING:
					{
						// Try unwrapping some application data
						if (toFill.space()>0 && _inbound.hasRemaining() && unwrap(toFill))
							progress=true;

						// Try wrapping some application data
						if (toFlush.hasRemaining() && _outbound.space()>0 && wrap(toFlush))
							progress=true;
					}
					break;

					case NEED_TASK:
					{
						// A task needs to be run, so run it!
						Runnable task;
						while ((task=_engine.getDelegatedTask())!=null)
						{
							progress=true;
							task.run();
						}

					}
					break;

					case NEED_WRAP:
					{
						// The SSL needs to send some handshake data to the other side
						if (_handshook && !_allowRenegotiate)
							_endp.close();
						else if (wrap(toFlush))
							progress=true;
					}
					break;

					case NEED_UNWRAP:
					{
						// The SSL needs to receive some handshake data from the other side
						if (_handshook && !_allowRenegotiate)
							_endp.close();
						else if (!_inbound.hasRemaining()&&filled==-1)
						{
							// No more input coming
							_endp.shutdownInput();
						}
						else if (unwrap(toFill))
							progress=true;
					}
					break;
				}

				// pass on ishut/oshut state
				if (_endp.isOpen() && _endp.isInputShutdown() && !_inbound.hasRemaining())
					closeInbound();

				if (_endp.isOpen() && _engine.isOutboundDone() && !_outbound.hasRemaining())
					_endp.shutdownOutput();

				// remember if any progress has been made
				some_progress|=progress;
			}

			// If we are reading into the temp buffer and it has some content, then we should be dispatched.
			if (toFill==_unwrapBuf && _unwrapBuf.hasRemaining() && !_connection.isSuspended())
				_aEndp.dispatch();
		}
		finally
		{
			releaseBuffers();
			if (some_progress)
				_progressed.set(true);
		}
		return some_progress;
	}

	private void closeInbound()
	{
		try
		{
			_engine.closeInbound();
		}
		catch (SSLException x)
		{
			_logger.debug("",x);
		}
	}

	private synchronized boolean wrap(final JBuffer buffer) throws IOException
	{
		ByteBuffer bbuf=extractByteBuffer(buffer);
		final SSLEngineResult result;

		synchronized(bbuf)
		{
			_outbound.compact();
			ByteBuffer out_buffer=_outbound.getByteBuffer();
			synchronized(out_buffer)
			{
				try
				{
					bbuf.position(buffer.getIndex());
					bbuf.limit(buffer.putIndex());
					out_buffer.position(_outbound.putIndex());
					out_buffer.limit(out_buffer.capacity());
					result=_engine.wrap(bbuf,out_buffer);
					if (_logger.isDebugEnabled())
						_logger.debug("{} wrap {} {} consumed={} produced={}",
							_session,
							result.getStatus(),
							result.getHandshakeStatus(),
							result.bytesConsumed(),
							result.bytesProduced());


					buffer.skip(result.bytesConsumed());
					_outbound.setPutIndex(_outbound.putIndex()+result.bytesProduced());
				}
				catch(SSLException e)
				{
					_logger.debug(String.valueOf(_endp), e);
					_endp.close();
					throw e;
				}
				finally
				{
					out_buffer.position(0);
					out_buffer.limit(out_buffer.capacity());
					bbuf.position(0);
					bbuf.limit(bbuf.capacity());
				}
			}
		}

		switch(result.getStatus())
		{
			case BUFFER_UNDERFLOW:
				throw new IllegalStateException();

			case BUFFER_OVERFLOW:
				break;

			case OK:
				if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
					_handshook=true;
				break;

			case CLOSED:
				_logger.debug("wrap CLOSE {} {}",this,result);
				if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
					_endp.close();
				break;

			default:
				_logger.debug("{} wrap default {}",_session,result);
			throw new IOException(result.toString());
		}

		return result.bytesConsumed()>0 || result.bytesProduced()>0;
	}

	private synchronized boolean unwrap(final JBuffer buffer) throws IOException
	{
		if (!_inbound.hasRemaining())
			return false;

		ByteBuffer bbuf=extractByteBuffer(buffer);
		final SSLEngineResult result;

		synchronized(bbuf)
		{
			ByteBuffer in_buffer=_inbound.getByteBuffer();
			synchronized(in_buffer)
			{
				try
				{
					bbuf.position(buffer.putIndex());
					bbuf.limit(buffer.capacity());
					in_buffer.position(_inbound.getIndex());
					in_buffer.limit(_inbound.putIndex());

					result=_engine.unwrap(in_buffer,bbuf);
					if (_logger.isDebugEnabled())
						_logger.debug("{} unwrap {} {} consumed={} produced={}",
							_session,
							result.getStatus(),
							result.getHandshakeStatus(),
							result.bytesConsumed(),
							result.bytesProduced());

					_inbound.skip(result.bytesConsumed());
					_inbound.compact();
					buffer.setPutIndex(buffer.putIndex()+result.bytesProduced());
				}
				catch(SSLException e)
				{
					_logger.debug(String.valueOf(_endp), e);
					_endp.close();
					throw e;
				}
				finally
				{
					in_buffer.position(0);
					in_buffer.limit(in_buffer.capacity());
					bbuf.position(0);
					bbuf.limit(bbuf.capacity());
				}
			}
		}

		switch(result.getStatus())
		{
			case BUFFER_UNDERFLOW:
				if (_endp.isInputShutdown())
					_inbound.clear();
				break;

			case BUFFER_OVERFLOW:
				break;

			case OK:
				if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
					_handshook=true;
				break;

			case CLOSED:
				_logger.debug("unwrap CLOSE {} {}",this,result);
				if (result.getHandshakeStatus()==HandshakeStatus.FINISHED)
					_endp.close();
				break;

			default:
				_logger.debug("{} wrap default {}",_session,result);
			throw new IOException(result.toString());
		}

		//if (LOG.isDebugEnabled() && result.bytesProduced()>0)
		//    LOG.debug("{} unwrapped '{}'",_session,buffer);

		return result.bytesConsumed()>0 || result.bytesProduced()>0;
	}


	private ByteBuffer extractByteBuffer(JBuffer buffer)
	{
		return buffer.getByteBuffer();
	}

	public SslEndPoint getSslEndPoint()
	{
		return _sslEndPoint;
	}

	public String toString()
	{
		return String.format("%s %s", super.toString(), _sslEndPoint);
	}


	public final class SslEndPoint implements AsyncEndPoint
	{
		public SSLEngine getSslEngine()
		{
			return _engine;
		}

		public AsyncEndPoint getEndpoint()
		{
			return _aEndp;
		}

		public void shutdownOutput() throws IOException
		{
			synchronized (SslConnection.this)
			{
				_logger.debug("{} ssl endp.oshut {}",_session,this);
				_engine.closeOutbound();
				_oshut=true;
			}
			flush();
		}

		public boolean isOutputShutdown()
		{
			synchronized (SslConnection.this)
			{
				return _oshut||!isOpen()||_engine.isOutboundDone();
			}
		}

		public void shutdownInput() throws IOException
		{
			_logger.debug("{} ssl endp.ishut!",_session);
			// We do not do a closeInput here, as SSL does not support half close.
			// isInputShutdown works it out itself from buffer state and underlying endpoint state.
		}

		public boolean isInputShutdown()
		{
			synchronized (SslConnection.this)
			{
				return _endp.isInputShutdown() &&
				!(_unwrapBuf!=null&&_unwrapBuf.hasRemaining()) &&
				!(_inbound!=null&&_inbound.hasRemaining());
			}
		}

		public void close() throws IOException
		{
			_logger.debug("{} ssl endp.close",_session);
			_endp.close();
		}

		public int fill(JBuffer buffer) throws IOException
		{
			int size=buffer.remaining();
			process(buffer, null);

			int filled=buffer.remaining()-size;

			if (filled==0 && isInputShutdown())
				return -1;
			return filled;
		}

		public int flush(JBuffer buffer) throws IOException
		{
			int size = buffer.remaining();
			process(null, buffer);
			return size-buffer.remaining();
		}

		public int flush(JBuffer header, JBuffer buffer, JBuffer trailer) throws IOException
		{
			if (header!=null && header.hasRemaining())
				return flush(header);
			if (buffer!=null && buffer.hasRemaining())
				return flush(buffer);
			if (trailer!=null && trailer.hasRemaining())
				return flush(trailer);
			return 0;
		}

		public boolean blockReadable(long millisecs) throws IOException
		{
			long now = System.currentTimeMillis();
			long end=millisecs>0?(now+millisecs):Long.MAX_VALUE;

			while (now<end)
			{
				if (process(null,null))
					break;
				_endp.blockReadable(end-now);
				now = System.currentTimeMillis();
			}

			return now<end;
		}

		public boolean blockWritable(long millisecs) throws IOException
		{
			return _endp.blockWritable(millisecs);
		}

		public boolean isOpen()
		{
			return _endp.isOpen();
		}

		public SocketChannel getChannel()
		{
			return _endp.getChannel();
		}

		public void flush() throws IOException
		{
			process(null, null);
		}

		public void dispatch()
		{
			_aEndp.dispatch();
		}

		public boolean hasProgressed()
		{
			return _progressed.getAndSet(false);
		}

		public String getLocalAddr()
		{
			return _aEndp.getLocalAddr();
		}

		public String getLocalHost()
		{
			return _aEndp.getLocalHost();
		}

		public int getLocalPort()
		{
			return _aEndp.getLocalPort();
		}

		public String getRemoteAddr()
		{
			return _aEndp.getRemoteAddr();
		}

		public String getRemoteHost()
		{
			return _aEndp.getRemoteHost();
		}

		public int getRemotePort()
		{
			return _aEndp.getRemotePort();
		}

		public boolean isBlocking()
		{
			return false;
		}

		public int getMaxIdleTime()
		{
			return _aEndp.getMaxIdleTime();
		}

		public void setMaxIdleTime(int timeMs) throws IOException
		{
			_aEndp.setMaxIdleTime(timeMs);
		}

		public void setConnection(AsyncHttpConnection connection)
		{
			_connection = connection;
		}

		public String toString()
		{
			// Do NOT use synchronized (SslConnection.this)
			// because it's very easy to deadlock when debugging is enabled.
			// We do a best effort to print the right toString() and that's it.
			JBuffer inbound = _inbound;
			JBuffer outbound = _outbound;
			JBuffer unwrap = _unwrapBuf;
			int i = inbound == null? -1 : inbound.remaining();
			int o = outbound == null ? -1 : outbound.remaining();
			int u = unwrap == null ? -1 : unwrap.remaining();
			return String.format("SSL %s i/o/u=%d/%d/%d ishut=%b oshut=%b {%s}",
					_engine.getHandshakeStatus(),
					i, o, u,
					_ishut, _oshut,
					_connection);
		}

	}
}