changeset 953:7db4a488fc82

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 12 Oct 2016 22:16:36 -0600
parents 669769bcdf5c
children a021c4c9c244
files src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/io/nio/SelectorManager.java
diffstat 2 files changed, 77 insertions(+), 154 deletions(-) [+]
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Wed Oct 12 19:47:45 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Wed Oct 12 22:16:36 2016 -0600
@@ -64,9 +64,9 @@
 	 */
 	private volatile AsyncConnection _connection;
 
-	private static final int STATE_NEEDS_DISPATCH=-1;
-	private static final int STATE_UNDISPATCHED=0;
-	private static final int STATE_DISPATCHED=1;
+	private static final int STATE_NEEDS_DISPATCH = -1;
+	private static final int STATE_UNDISPATCHED = 0;
+	private static final int STATE_DISPATCHED = 1;
 	private int _state;
 	
 	private boolean _onIdle;
@@ -126,62 +126,59 @@
 
 	public void setConnection(Connection connection)
 	{
-		_connection=(AsyncConnection)connection;
+		_connection = (AsyncConnection)connection;
 	}
 
 	/* ------------------------------------------------------------ */
 	/** Called by selectSet to schedule handling
 	 *
 	 */
-	public void schedule()
+	public synchronized void schedule()
 	{
-		synchronized (this)
+		// If there is no key, then do nothing
+		if (_key == null || !_key.isValid())
 		{
-			// If there is no key, then do nothing
-			if (_key == null || !_key.isValid())
-			{
+			_readBlocked=false;
+			_writeBlocked=false;
+			this.notifyAll();
+			return;
+		}
+
+		// If there are threads dispatched reading and writing
+		if (_readBlocked || _writeBlocked)
+		{
+			// assert _dispatched;
+			if (_readBlocked && _key.isReadable())
 				_readBlocked=false;
+			if (_writeBlocked && _key.isWritable())
 				_writeBlocked=false;
-				this.notifyAll();
-				return;
-			}
 
-			// If there are threads dispatched reading and writing
-			if (_readBlocked || _writeBlocked)
-			{
-				// assert _dispatched;
-				if (_readBlocked && _key.isReadable())
-					_readBlocked=false;
-				if (_writeBlocked && _key.isWritable())
-					_writeBlocked=false;
-
-				// wake them up is as good as a dispatched.
-				this.notifyAll();
+			// wake them up is as good as a dispatched.
+			this.notifyAll();
 
-				// we are not interested in further selecting
-				_key.interestOps(0);
-				if (_state<STATE_DISPATCHED)
-					updateKey();
-				return;
-			}
+			// we are not interested in further selecting
+			_key.interestOps(0);
+			if (_state<STATE_DISPATCHED)
+				updateKey();
+			return;
+		}
 
+		// Remove writeable op
+		if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
+		{
 			// Remove writeable op
-			if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
-			{
-				// Remove writeable op
-				_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
-				_key.interestOps(_interestOps);
-				_writable = true; // Once writable is in ops, only removed with dispatch.
-			}
+			_interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
+			_key.interestOps(_interestOps);
+			_writable = true; // Once writable is in ops, only removed with dispatch.
+		}
 
-			// If dispatched, then deregister interest
-			if (_state>=STATE_DISPATCHED)
-				_key.interestOps(0);
-			else
-			{
-				// other wise do the dispatch
-				dispatch();
-			}
+		// If dispatched, then deregister interest
+		if (_state>=STATE_DISPATCHED)
+			_key.interestOps(0);
+		else
+		{
+			// other wise do the dispatch
+			dispatch();
 		}
 	}
 
@@ -216,7 +213,6 @@
 	{
 		_state = STATE_UNDISPATCHED;
 		updateKey();
-//		return true;
 	}
 
 	public void setCheckForIdle(boolean check)
@@ -441,7 +437,6 @@
 		return true;
 	}
 
-	/* ------------------------------------------------------------ */
 	public boolean hasProgressed()
 	{
 		return false;
@@ -473,7 +468,7 @@
 				}
 				catch(Exception e)
 				{
-					_key=null;
+					_key = null;
 					LOG.trace("",e);
 				}
 			}
@@ -509,7 +504,7 @@
 					{
 						try
 						{
-							_key = _selectSet.getSelector().register((SelectableChannel)getChannel(),_interestOps,this);
+							_key = _selectSet.getSelector().register(sc,_interestOps,this);
 						}
 						catch (Exception e)
 						{
@@ -538,7 +533,7 @@
 				if (_key!=null && _key.isValid())
 					_key.interestOps(0);
 				else
-					_key=null;
+					_key = null;
 			}
 		}
 		else
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java	Wed Oct 12 19:47:45 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java	Wed Oct 12 22:16:36 2016 -0600
@@ -61,7 +61,7 @@
 	private int _maxIdleTime;
 	private long _lowResourcesConnections;
 	private SelectSet[] _selectSet;
-	private int _selectSets=1;
+	private int _selectSets = 1;
 	private volatile int _set=0;
 
 	/* ------------------------------------------------------------ */
@@ -242,9 +242,14 @@
 		private void addChange(SocketChannel channel)
 		{
 			try {
-				SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null);
+//System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq a");
+//				SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null);
+				SelectionKey key = _selector.register(channel,0,null);
 				SelectChannelEndPoint endpoint = createEndPoint(channel,key);
 				key.attach(endpoint);
+				key.interestOps(SelectionKey.OP_READ);
+				_selector.update();
+//System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b");
 				endpoint.schedule();
 			} catch(IOException e) {
 				LOG.warn("",e);
@@ -261,7 +266,7 @@
 		 *
 		 * @throws IOException
 		 */
-		public void doSelect() throws IOException
+		private void doSelect() throws IOException
 		{
 			try
 			{
@@ -276,8 +281,6 @@
 				// Look for things to do
 				for (SelectionKey key: selector.selectedKeys())
 				{
-					SocketChannel channel=null;
-
 					try
 					{
 						if (!key.isValid())
@@ -289,50 +292,9 @@
 							continue;
 						}
 
-						Object att = key.attachment();
-						if (att instanceof SelectChannelEndPoint)
-						{
-							if (key.isReadable()||key.isWritable())
-								((SelectChannelEndPoint)att).schedule();
-						}
-						else if (key.isConnectable())
-						{
-							// Complete a connection of a registered channel
-							channel = (SocketChannel)key.channel();
-							boolean connected=false;
-							try
-							{
-								connected=channel.finishConnect();
-							}
-							catch(Exception e)
-							{
-								LOG.warn(e+","+channel+","+att);
-								LOG.debug("",e);
-							}
-							finally
-							{
-								if (connected)
-								{
-									key.interestOps(SelectionKey.OP_READ);
-									SelectChannelEndPoint endpoint = createEndPoint(channel,key);
-									key.attach(endpoint);
-									endpoint.schedule();
-								}
-								else
-								{
-									key.cancel();
-									channel.close();
-								}
-							}
-						}
-						else
-						{
-							// Wrap readable registered channel in an endpoint
-							channel = (SocketChannel)key.channel();
-							SelectChannelEndPoint endpoint = createEndPoint(channel,key);
-							key.attach(endpoint);
-							if (key.isReadable())
-								endpoint.schedule();
+						if (key.isReadable()||key.isWritable()) {
+							SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
+							endpoint.schedule();
 						}
 						key = null;
 					}
@@ -346,19 +308,6 @@
 							LOG.warn("",e);
 						else
 							LOG.trace("",e);
-
-						try
-						{
-							if (channel!=null)
-								channel.close();
-						}
-						catch(IOException e2)
-						{
-							LOG.debug("",e2);
-						}
-
-						if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
-							key.cancel();
 					}
 				}
 
@@ -415,56 +364,35 @@
 			return _selector;
 		}
 
-		void stop() throws Exception
+		synchronized void stop() throws Exception
 		{
-			// Spin for a while waiting for selector to complete
-			// to avoid unneccessary closed channel exceptions
-/*
-			try
+			// close endpoints and selector
+			for (SelectionKey key : _selector.keys())
 			{
-				for (int i=0;i<100 && _selecting!=null;i++)
+				Object att=key.attachment();
+				if (att instanceof EndPoint)
 				{
-					_selector.wakeup();
-					Thread.sleep(10);
+					EndPoint endpoint = (EndPoint)att;
+					try
+					{
+						endpoint.close();
+					}
+					catch(IOException e)
+					{
+						LOG.trace("",e);
+					}
 				}
 			}
-			catch(Exception e)
-			{
-				LOG.warn("",e);
-			}
-*/
-			// close endpoints and selector
-			synchronized (this)
+
+			try
 			{
-				for (SelectionKey key : _selector.keys())
-				{
-					if (key==null)
-						continue;
-					Object att=key.attachment();
-					if (att instanceof EndPoint)
-					{
-						EndPoint endpoint = (EndPoint)att;
-						try
-						{
-							endpoint.close();
-						}
-						catch(IOException e)
-						{
-							LOG.trace("",e);
-						}
-					}
-				}
-
-				try
-				{
-					_selector.close();
-				}
-				catch (IOException e)
-				{
-					LOG.trace("",e);
-				}
-				_selector = null;
+				_selector.close();
 			}
+			catch (IOException e)
+			{
+				LOG.trace("",e);
+			}
+			_selector = null;
 		}
 
 		public String dump()