view src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java @ 1046:a8c92b0a08ed

add JBuffer
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 07 Nov 2016 22:39:39 -0700
parents 0e96ce3db20a
children 2b769da7f67d
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.server.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;

import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.nio.ChannelEndPoint;
import org.eclipse.jetty.server.BlockingHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.AbstractHttpConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/* ------------------------------------------------------------------------------- */
/**  Blocking NIO connector.
 * This connector uses efficient NIO buffers with a traditional blocking thread model.
 * Direct NIO buffers are used and a thread is allocated per connection.
 *
 * This connector is best used when there are a few very active connections.
 *
 * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
 *
 *
 *
 */
public final class BlockingChannelConnector extends Connector
{
	private static final Logger LOG = LoggerFactory.getLogger(BlockingChannelConnector.class);

	/** Pause, in milliseconds, between two idle sweeps over the live endpoints. */
	private static final long IDLE_CHECK_PERIOD_MS = 400;

	/**
	 * Endpoints whose {@link BlockingChannelEndPoint#run()} is currently executing.
	 * Written by the worker threads and iterated by the idle sweeper started in
	 * {@link #doStart()}, hence the concurrent set.
	 */
	private final Set<BlockingChannelEndPoint> _endpoints = ConcurrentHashMap.newKeySet();


	/* ------------------------------------------------------------ */
	/** Constructor.
	 *
	 * @param server the server this connector belongs to; passed to the superclass
	 * @param port the port passed to the superclass and later bound in {@link #doStart()}
	 */
	public BlockingChannelConnector(Server server,int port)
	{
		super(server,port);
	}


	/**
	 * Opens and binds the blocking accept channel, starts the superclass and then
	 * submits a background task to the server thread pool that periodically closes
	 * endpoints that have been idle for longer than their timeout.
	 *
	 * @throws Exception if the channel cannot be opened or bound, or if the superclass fails to start
	 */
	@Override
	protected void doStart() throws Exception
	{
		// Create a new server socket channel and keep it in blocking mode
		_acceptChannel = ServerSocketChannel.open();
		try
		{
			_acceptChannel.configureBlocking(true);

			// Bind the server socket to the local host and port
			InetSocketAddress addr = getHost()==null?new InetSocketAddress(port):new InetSocketAddress(getHost(),port);
			_acceptChannel.bind(addr);
		}
		catch (IOException | RuntimeException e)
		{
			// Do not leak the channel when it cannot be configured or bound (e.g. port in use)
			try
			{
				_acceptChannel.close();
			}
			catch (IOException e2)
			{
				e.addSuppressed(e2);
			}
			throw e;
		}

		super.doStart();
		server.threadPool.execute(new Runnable()
		{

			@Override
			public void run()
			{
				while (isRunning())
				{
					try
					{
						Thread.sleep(IDLE_CHECK_PERIOD_MS);
						long now = System.currentTimeMillis();
						for (BlockingChannelEndPoint endp : _endpoints)
						{
							endp.checkIdleTimestamp(now);
						}
					}
					catch(InterruptedException e)
					{
						// Deliberately not re-interrupting: the loop condition re-checks
						// isRunning(), and a set interrupt flag would make every following
						// sleep() fail immediately and turn this loop into a busy spin.
						LOG.trace("",e);
					}
					catch(Exception e)
					{
						LOG.warn("",e);
					}
				}
			}

		});

	}

	/**
	 * Accepts one connection on the blocking accept channel, configures it and hands
	 * the resulting endpoint to the server thread pool.
	 * If the connection cannot be set up, the accepted channel is closed before the
	 * failure is propagated; if the thread pool rejects the endpoint, the endpoint is closed.
	 *
	 * @throws IOException if accepting, configuring or closing the connection fails
	 * @throws InterruptedException declared by the overridden method
	 */
	@Override
	public void accept()
		throws IOException, InterruptedException
	{
		SocketChannel channel = _acceptChannel.accept();
		BlockingChannelEndPoint endp;
		try
		{
			channel.configureBlocking(true);
			Socket socket = channel.socket();
			configure(socket);
			endp = new BlockingChannelEndPoint(channel);
		}
		catch (IOException | RuntimeException e)
		{
			// Nobody else holds a reference to the channel yet, so close it here
			try
			{
				channel.close();
			}
			catch (IOException e2)
			{
				e.addSuppressed(e2);
			}
			throw e;
		}

		try {
			server.threadPool.execute(endp);
		} catch(RejectedExecutionException e) {
			LOG.warn("dispatch failed for  {}",endp._connection);
			endp.close();
		}
	}

	/**
	 * Applies this connector's max idle time to the connection's endpoint and
	 * re-applies the socket configuration to the underlying socket.
	 *
	 * @param con the connection being customized
	 * @throws IOException if the endpoint or the socket cannot be configured
	 */
	@Override
	public void customize(AbstractHttpConnection con)
		throws IOException
	{
		con._endp.setMaxIdleTime(_maxIdleTime);
		configure(con._endp.getChannel().socket());
	}


	/**
	 * Endpoint of one accepted connection. It is also the task run by a worker
	 * thread: {@link #run()} drives the HTTP connection and guarantees that the
	 * socket is closed afterwards.
	 */
	private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable
	{
		private final BlockingHttpConnection _connection;
		// Set in run() before this endpoint is added to _endpoints
		private int _timeout;
		// Time of the last fill/flush; read by the idle sweeper thread
		private volatile long _idleTimestamp;

		/**
		 * @param channel the accepted, blocking socket channel
		 * @throws IOException if the superclass constructor fails
		 */
		BlockingChannelEndPoint(SocketChannel channel)
			throws IOException
		{
			super(channel,BlockingChannelConnector.this._maxIdleTime);
			_connection = new BlockingHttpConnection(BlockingChannelConnector.this,this);
		}

		/**
		 * Closes this endpoint when it has a positive timeout and its last recorded
		 * activity is more than that timeout before {@code now}.
		 *
		 * @param now the current time in milliseconds
		 */
		private void checkIdleTimestamp(long now)
		{
			if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
			{
				closeAndTraceFailure();
			}
		}

		/** Records activity, then delegates to the superclass. */
		@Override
		public int fill(Buffer buffer) throws IOException
		{
			_idleTimestamp = System.currentTimeMillis();
			return super.fill(buffer);
		}

		/** Records activity, then delegates to the superclass. */
		@Override
		public int flush(Buffer buffer) throws IOException
		{
			_idleTimestamp = System.currentTimeMillis();
			return super.flush(buffer);
		}

		/** Records activity, then delegates to the superclass. */
		@Override
		public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
		{
			_idleTimestamp = System.currentTimeMillis();
			return super.flush(header,buffer,trailer);
		}

		/**
		 * Registers this endpoint for idle checking, handles the connection once and
		 * then closes the endpoint and its socket whatever the outcome.
		 */
		@Override
		public void run()
		{
			try
			{
				_timeout = getMaxIdleTime();
				_endpoints.add(this);

				// handle() is invoked exactly once; an endpoint that is still open
				// when it returns is treated as a failure and closed below.
				_idleTimestamp = System.currentTimeMillis();
				_connection.handle();
				if( isOpen() )
					throw new IllegalStateException("endpoint still open after handle() returned");
			}
			catch (EofException e)
			{
				LOG.warn("EOF", e);
				closeAndTraceFailure();
			}
			catch (HttpException e)
			{
				LOG.warn("BAD", e);
				closeAndTraceFailure();
			}
			catch(Throwable e)
			{
				LOG.warn("handle failed",e);
				closeAndTraceFailure();
			}
			finally
			{
				_endpoints.remove(this);
				drainThenCloseSocket();
			}
		}

		/** Closes this endpoint; an IOException from close() is only traced. */
		private void closeAndTraceFailure()
		{
			try
			{
				close();
			}
			catch (IOException e)
			{
				LOG.trace("",e);
			}
		}

		/**
		 * Waits for the client to close the socket, but if it does not, closes it ourselves.
		 * Does nothing when the socket is already closed. Input is read and discarded
		 * until end of stream, a read failure, or the max idle time has elapsed; with a
		 * non-positive max idle time there is no bound to wait for, so the socket is
		 * closed immediately. The socket is always closed, even when the read times out.
		 */
		private void drainThenCloseSocket()
		{
			if (_socket.isClosed())
				return;

			try
			{
				int maxIdle = getMaxIdleTime();
				if (maxIdle>0)
				{
					long start = System.currentTimeMillis();
					_socket.setSoTimeout(maxIdle);
					int c;
					do
					{
						c = _socket.getInputStream().read();
					}
					while (c>=0 && (System.currentTimeMillis()-start)<maxIdle);
				}
			}
			catch(IOException e)
			{
				// includes the read timing out; the socket is still closed below
				LOG.trace("",e);
			}
			finally
			{
				try
				{
					if (!_socket.isClosed())
						_socket.close();
				}
				catch(IOException e)
				{
					LOG.trace("",e);
				}
			}
		}

		/**
		 * @return a description with the local and remote socket addresses, the
		 * open/shutdown state and the connection
		 */
		@Override
		public String toString()
		{
			return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
					hashCode(),
					_socket.getLocalSocketAddress(),
					_socket.getRemoteSocketAddress(),
					isOpen(),
					isInputShutdown(),
					isOutputShutdown(),
					_connection);
		}

	}
}