changeset 954:a021c4c9c244

use just one SelectSet per SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 13 Oct 2016 00:54:10 -0600
parents 7db4a488fc82
children 6f49b8dfffe6
files src/org/eclipse/jetty/io/nio/SelectorManager.java src/org/eclipse/jetty/server/Connector.java src/org/eclipse/jetty/server/nio/SelectChannelConnector.java
diffstat 3 files changed, 85 insertions(+), 174 deletions(-) [+]
line wrap: on
line diff
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/io/nio/SelectorManager.java
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java	Wed Oct 12 22:16:36 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java	Thu Oct 13 00:54:10 2016 -0600
@@ -60,9 +60,7 @@
 
 	private int _maxIdleTime;
 	private long _lowResourcesConnections;
-	private SelectSet[] _selectSet;
-	private int _selectSets = 1;
-	private volatile int _set=0;
+	private SelectSet _selectSet;
 
 	/* ------------------------------------------------------------ */
 	/**
@@ -75,34 +73,14 @@
 	}
 
 	/* ------------------------------------------------------------ */
-	/**
-	 * @param selectSets number of select sets to create
-	 */
-	public void setSelectSets(int selectSets)
-	{
-		long lrc = _lowResourcesConnections * _selectSets;
-		_selectSets=selectSets;
-		_lowResourcesConnections=lrc/_selectSets;
-	}
-
-	/* ------------------------------------------------------------ */
 	/** Register a channel
 	 * @param channel
 	 */
 	public void register(SocketChannel channel)
 	{
-		// The ++ increment here is not atomic, but it does not matter.
-		// so long as the value changes sometimes, then connections will
-		// be distributed over the available sets.
-
-		int s = _set++;
-		if (s<0)
-			s=-s;
-		s=s%_selectSets;
-		SelectSet[] sets = _selectSet;
-		if (sets!=null)
+		SelectSet set = _selectSet;
+		if (set!=null)
 		{
-			SelectSet set=sets[s];
 			set.addChange(channel);
 		}
 	}
@@ -113,7 +91,7 @@
 	 */
 	public long getLowResourcesConnections()
 	{
-		return _lowResourcesConnections*_selectSets;
+		return _lowResourcesConnections;
 	}
 
 	/* ------------------------------------------------------------ */
@@ -125,7 +103,7 @@
 	 */
 	public void setLowResourcesConnections(long lowResourcesConnections)
 	{
-		_lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
+		_lowResourcesConnections = lowResourcesConnections;
 	}
 
 
@@ -138,55 +116,48 @@
 	@Override
 	protected void doStart() throws Exception
 	{
-		_selectSet = new SelectSet[_selectSets];
-		for (int i=0;i<_selectSet.length;i++)
-			_selectSet[i]= new SelectSet(i);
+		_selectSet = new SelectSet();
 
 		super.doStart();
 
 		// start a thread to Select
-		for (int i=0;i<_selectSets;i++)
+		execute(new Runnable()
 		{
-			final int id=i;
-			execute(new Runnable()
+			public void run()
 			{
-				public void run()
+				String name=Thread.currentThread().getName();
+				try
 				{
-					String name=Thread.currentThread().getName();
-					try
-					{
-						SelectSet[] sets=_selectSet;
-						if (sets==null)
-							return;
-						SelectSet set=sets[id];
+					SelectSet set = _selectSet;
+					if (set==null)
+						return;
 
-						Thread.currentThread().setName(name+" Selector"+id);
-						LOG.debug("Starting {} on {}",Thread.currentThread(),this);
-						while (isRunning())
+					Thread.currentThread().setName(name+" Selector");
+					LOG.debug("Starting {} on {}",Thread.currentThread(),this);
+					while (isRunning())
+					{
+						try
 						{
-							try
-							{
-								set.doSelect();
-							}
-							catch(IOException e)
-							{
-								LOG.trace("",e);
-							}
-							catch(Exception e)
-							{
-								LOG.warn("",e);
-							}
+							set.doSelect();
+						}
+						catch(IOException e)
+						{
+							LOG.trace("",e);
+						}
+						catch(Exception e)
+						{
+							LOG.warn("",e);
 						}
 					}
-					finally
-					{
-						LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
-						Thread.currentThread().setName(name);
-					}
 				}
+				finally
+				{
+					LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
+					Thread.currentThread().setName(name);
+				}
+			}
 
-			});
-		}
+		});
 	}
 
 
@@ -194,15 +165,11 @@
 	@Override
 	protected void doStop() throws Exception
 	{
-		SelectSet[] sets= _selectSet;
-		_selectSet=null;
-		if (sets!=null)
+		SelectSet set = _selectSet;
+		_selectSet = null;
+		if (set!=null)
 		{
-			for (SelectSet set : sets)
-			{
-				if (set!=null)
-					set.stop();
-			}
+			set.stop();
 		}
 		super.doStop();
 	}
@@ -217,25 +184,20 @@
 	public void dump(Appendable out, String indent) throws IOException
 	{
 		AggregateLifeCycle.dumpObject(out,this);
-		AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
+		AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet));
 	}
 
 
-	public class SelectSet implements Dumpable
+	public final class SelectSet implements Dumpable
 	{
-		private final int _setID;
 		private volatile long _now = System.currentTimeMillis();
 
-		private volatile SaneSelector _selector;
+		private final SaneSelector _selector;
 
-		private volatile Thread _selecting;
 		private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
 
-		SelectSet(int acceptorID) throws Exception
+		SelectSet() throws IOException
 		{
-			_setID=acceptorID;
-
-			// create a selector;
 			_selector = new SaneSelector();
 		}
 
@@ -260,26 +222,15 @@
 				}
 			}
 		}
-		/* ------------------------------------------------------------ */
-		/**
-		 * Select and dispatch tasks found from changes and the selector.
-		 *
-		 * @throws IOException
-		 */
+
 		private void doSelect() throws IOException
 		{
 			try
 			{
-				_selecting=Thread.currentThread();
-				final SaneSelector selector = _selector;
-				// Stopped concurrently ?
-				if (selector == null)
-					return;
-
-				selector.select();
+				_selector.select();
 
 				// Look for things to do
-				for (SelectionKey key: selector.selectedKeys())
+				for (SelectionKey key: _selector.selectedKeys())
 				{
 					try
 					{
@@ -296,7 +247,6 @@
 							SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
 							endpoint.schedule();
 						}
-						key = null;
 					}
 					catch (CancelledKeyException e)
 					{
@@ -312,7 +262,7 @@
 				}
 
 				// Everything always handled
-				selector.selectedKeys().clear();
+				_selector.selectedKeys().clear();
 
 				_now = System.currentTimeMillis();
 			}
@@ -327,10 +277,6 @@
 			{
 				LOG.trace("",e);
 			}
-			finally
-			{
-				_selecting=null;
-			}
 		}
 
 		public SelectorManager getManager()
@@ -369,18 +315,14 @@
 			// close endpoints and selector
 			for (SelectionKey key : _selector.keys())
 			{
-				Object att=key.attachment();
-				if (att instanceof EndPoint)
+				EndPoint endpoint = (EndPoint)key.attachment();
+				try
 				{
-					EndPoint endpoint = (EndPoint)att;
-					try
-					{
-						endpoint.close();
-					}
-					catch(IOException e)
-					{
-						LOG.trace("",e);
-					}
+					endpoint.close();
+				}
+				catch(IOException e)
+				{
+					LOG.trace("",e);
 				}
 			}
 
@@ -392,7 +334,6 @@
 			{
 				LOG.trace("",e);
 			}
-			_selector = null;
 		}
 
 		public String dump()
@@ -402,7 +343,7 @@
 
 		public void dump(Appendable out, String indent) throws IOException
 		{
-			out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
+			out.append(String.valueOf(this)).append("\n");
 			AggregateLifeCycle.dump(out,indent,Collections.emptyList());
 		}
 
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/server/Connector.java
--- a/src/org/eclipse/jetty/server/Connector.java	Wed Oct 12 22:16:36 2016 -0600
+++ b/src/org/eclipse/jetty/server/Connector.java	Thu Oct 13 00:54:10 2016 -0600
@@ -54,7 +54,7 @@
  * <li>Optional reverse proxy headers checking</li>
  * </ul>
  */
-public abstract class Connector extends AggregateLifeCycle implements HttpBuffers, Dumpable
+public abstract class Connector extends AggregateLifeCycle implements HttpBuffers, Dumpable, Runnable
 {
 	private static final Logger LOG = LoggerFactory.getLogger(Connector.class);
 
@@ -63,7 +63,6 @@
 	public final Server server;
 	private String _host;
 	public final int port;
-	private int _acceptors = 1;
 
 	protected final int _maxIdleTime = 200000;
 	protected int _soLingerTime = -1;
@@ -137,27 +136,6 @@
 
 	/* ------------------------------------------------------------ */
 	/**
-	 * @return Returns the number of acceptor threads.
-	 */
-	public int getAcceptors()
-	{
-		return _acceptors;
-	}
-
-	/* ------------------------------------------------------------ */
-	/**
-	 * @param acceptors
-	 *            The number of acceptor threads to set.
-	 */
-	public void setAcceptors(int acceptors)
-	{
-		if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
-			LOG.warn("Acceptors should be <=2*availableProcessors: " + this);
-		_acceptors = acceptors;
-	}
-
-	/* ------------------------------------------------------------ */
-	/**
 	 * @param soLingerTime
 	 *            The soLingerTime to set or -1 to disable.
 	 */
@@ -173,8 +151,7 @@
 
 		// Start selector thread
 		ThreadPoolExecutor _threadPool = server.threadPool;
-		for (int i = 0; i < _acceptors; i++)
-			_threadPool.execute(new Acceptor());
+		_threadPool.execute(this);
 		if (server.isLowOnThreads())
 			LOG.warn("insufficient threads configured for {}",this);
 
@@ -322,46 +299,42 @@
 	}
 
 
-	private class Acceptor implements Runnable
+	public void run()
 	{
+		Thread current = Thread.currentThread();
+		String name = current.getName();
+		current.setName(name + " Acceptor" + " " + Connector.this);
 
-		public void run()
+		try
 		{
-			Thread current = Thread.currentThread();
-			String name = current.getName();
-			current.setName(name + " Acceptor" + " " + Connector.this);
-
-			try
+			while (isRunning() && _acceptChannel != null)
 			{
-				while (isRunning() && _acceptChannel != null)
+				try
 				{
-					try
-					{
-						accept();
-					}
-					catch (EofException e)
-					{
-						LOG.trace("",e);
-					}
-					catch (IOException e)
-					{
-						LOG.trace("",e);
-					}
-					catch (InterruptedException x)
-					{
-						// Connector has been stopped
-						LOG.trace("",x);
-					}
-					catch (Throwable e)
-					{
-						LOG.warn("",e);
-					}
+					accept();
+				}
+				catch (EofException e)
+				{
+					LOG.trace("",e);
+				}
+				catch (IOException e)
+				{
+					LOG.trace("",e);
+				}
+				catch (InterruptedException x)
+				{
+					// Connector has been stopped
+					LOG.trace("",x);
+				}
+				catch (Throwable e)
+				{
+					LOG.warn("",e);
 				}
 			}
-			finally
-			{
-				current.setName(name);
-			}
+		}
+		finally
+		{
+			current.setName(name);
 		}
 	}
 
diff -r 7db4a488fc82 -r a021c4c9c244 src/org/eclipse/jetty/server/nio/SelectChannelConnector.java
--- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java	Wed Oct 12 22:16:36 2016 -0600
+++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java	Thu Oct 13 00:54:10 2016 -0600
@@ -32,7 +32,6 @@
 import org.eclipse.jetty.io.nio.AsyncConnection;
 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
 import org.eclipse.jetty.io.nio.SelectorManager;
-import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
 import org.eclipse.jetty.server.AsyncHttpConnection;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Request;
@@ -62,7 +61,6 @@
 		super(server,port);
 		_manager.setMaxIdleTime(getMaxIdleTime());
 		addBean(_manager,true);
-		setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
 	}
 	
 	@Override
@@ -101,7 +99,6 @@
 	@Override
 	protected synchronized void doStart() throws Exception
 	{
-		_manager.setSelectSets(getAcceptors());
 //		_manager.setMaxIdleTime(getMaxIdleTime());
 		_manager.setLowResourcesConnections(0);