diff src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 963:4b6216fa9cec

replace SelectChannelEndPoint._state with isDispatched
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 14 Oct 2016 00:15:28 -0600
parents 94498d6daf5b
children 768414c16e10
line wrap: on
line diff
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Thu Oct 13 22:56:15 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Fri Oct 14 00:15:28 2016 -0600
@@ -60,12 +60,14 @@
 	 * from HTTP to proxy connect or websocket.
 	 */
 	private volatile AsyncConnection _connection;
-
+/*
 	private static final int STATE_NEEDS_DISPATCH = -1;
 	private static final int STATE_UNDISPATCHED = 0;
 	private static final int STATE_DISPATCHED = 1;
 	private int _state;
-	
+*/
+	private boolean isDispatched = false;
+
 	/** true if the last write operation succeed and wrote all offered bytes */
 	private volatile boolean _writable = true;
 
@@ -75,7 +77,7 @@
 	/** True if a thread has is blocked in {@link #blockWritable(long)} */
 	private boolean _writeBlocked;
 
-	private boolean _ishut;
+	private boolean _ishut = false;
 
 	public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
 		throws IOException
@@ -84,7 +86,6 @@
 
 		_manager = selectSet.getManager();
 		_selectSet = selectSet;
-		_state = STATE_UNDISPATCHED;
 		_key = key;
 	}
 
@@ -106,12 +107,14 @@
 	 */
 	public synchronized void schedule()
 	{
-		// If there is no key, then do nothing
 		if (!_key.isValid())
 		{
+/*
 			_readBlocked = false;
 			_writeBlocked = false;
 			this.notifyAll();
+*/
+			_key.cancel();
 			return;
 		}
 
@@ -120,7 +123,7 @@
 		{
 			// assert _dispatched;
 			if (_readBlocked && _key.isReadable())
-				_readBlocked=false;
+				_readBlocked = false;
 			if (_writeBlocked && _key.isWritable())
 				_writeBlocked = false;
 
@@ -129,7 +132,7 @@
 
 			// we are not interested in further selecting
 			_key.interestOps(0);
-			if (_state<STATE_DISPATCHED)
+			if( !isDispatched )
 				updateKey();
 			return;
 		}
@@ -144,7 +147,7 @@
 		}
 
 		// If dispatched, then deregister interest
-		if (_state>=STATE_DISPATCHED)
+		if (isDispatched)
 			_key.interestOps(0);
 		else
 		{
@@ -156,15 +159,15 @@
 	@Override
 	public synchronized void dispatch()
 	{
-		if (_state<=STATE_UNDISPATCHED)
+		if( !isDispatched )
 		{
-			_state = STATE_DISPATCHED;
+			isDispatched = true;
 			try {
 				_manager.execute(_handler);
 			} catch(RejectedExecutionException e) {
-				_state = STATE_NEEDS_DISPATCH;
+				isDispatched = false;
 				LOG.warn("Dispatched Failed! "+this+" to "+_manager);
-				updateKey();
+//				updateKey();
 			}
 		}
 	}
@@ -187,7 +190,7 @@
 			synchronized (this)
 			{   
 				_writable = false;
-				if (_state<STATE_DISPATCHED)
+				if( !isDispatched )
 					updateKey();
 			}
 		}
@@ -209,7 +212,7 @@
 			synchronized (this)
 			{   
 				_writable = false;
-				if (_state<STATE_DISPATCHED)
+				if( !isDispatched )
 					updateKey();
 			}
 		}
@@ -226,42 +229,39 @@
 	 * Allows thread to block waiting for further events.
 	 */
 	@Override
-	public boolean blockReadable(long timeoutMs) throws IOException
+	public synchronized boolean blockReadable(long timeoutMs) throws IOException
 	{
-		synchronized (this)
-		{
-			if (isInputShutdown())
-				throw new EofException();
+		if (isInputShutdown())
+			throw new EofException();
 
-			long now = _selectSet.getNow();
-			long end = now+timeoutMs;
-			try
+		long now = _selectSet.getNow();
+		long end = now+timeoutMs;
+		try
+		{
+			_readBlocked = true;
+			while (!isInputShutdown() && _readBlocked)
 			{
-				_readBlocked = true;
-				while (!isInputShutdown() && _readBlocked)
+				try
 				{
-					try
-					{
-						updateKey();
-						this.wait(timeoutMs>0?(end-now):10000);
-					}
-					catch (final InterruptedException e)
-					{
-						LOG.warn("",e);
-					}
-					finally
-					{
-						now=_selectSet.getNow();
-					}
+					updateKey();
+					this.wait(timeoutMs>0?(end-now):10000);
+				}
+				catch (final InterruptedException e)
+				{
+					LOG.warn("",e);
+				}
+				finally
+				{
+					now = _selectSet.getNow();
+				}
 
-					if (_readBlocked && timeoutMs>0 && now>=end)
-						return false;
-				}
+				if (_readBlocked && timeoutMs>0 && now>=end)
+					return false;
 			}
-			finally
-			{
-				_readBlocked = false;
-			}
+		}
+		finally
+		{
+			_readBlocked = false;
 		}
 		return true;
 	}
@@ -271,41 +271,38 @@
 	 * Allows thread to block waiting for further events.
 	 */
 	@Override
-	public boolean blockWritable(long timeoutMs) throws IOException
+	public synchronized boolean blockWritable(long timeoutMs) throws IOException
 	{
-		synchronized (this)
-		{
-			if (isOutputShutdown())
-				throw new EofException();
+		if (isOutputShutdown())
+			throw new EofException();
 
-			long now=_selectSet.getNow();
-			long end=now+timeoutMs;
-			try
+		long now=_selectSet.getNow();
+		long end=now+timeoutMs;
+		try
+		{
+			_writeBlocked = true;
+			while (_writeBlocked && !isOutputShutdown())
 			{
-				_writeBlocked = true;
-				while (_writeBlocked && !isOutputShutdown())
+				try
 				{
-					try
-					{
-						updateKey();
-						this.wait(timeoutMs>0?(end-now):10000);
-					}
-					catch (final InterruptedException e)
-					{
-						LOG.warn("",e);
-					}
-					finally
-					{
-						now=_selectSet.getNow();
-					}
-					if (_writeBlocked && timeoutMs>0 && now>=end)
-						return false;
+					updateKey();
+					this.wait(timeoutMs>0?(end-now):10000);
+				}
+				catch (final InterruptedException e)
+				{
+					LOG.warn("",e);
 				}
+				finally
+				{
+					now = _selectSet.getNow();
+				}
+				if (_writeBlocked && timeoutMs>0 && now>=end)
+					return false;
 			}
-			finally
-			{
-				_writeBlocked = false;
-			}
+		}
+		finally
+		{
+			_writeBlocked = false;
 		}
 		return true;
 	}
@@ -326,8 +323,10 @@
 	{
 		if( getChannel().isOpen() && _key.isValid())
 		{
-			boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
-			boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
+			boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended());
+			boolean write_interest = _writeBlocked || (!isDispatched && !_writable);
+//			boolean write_interest = _writeBlocked || !isDispatched;
+//			boolean write_interest = true;
 
 			int interestOps =
 				((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
@@ -400,7 +399,7 @@
 		}
 		finally
 		{
-			_state = STATE_UNDISPATCHED;
+			isDispatched = false;
 			updateKey();
 		}
 	}
@@ -445,11 +444,11 @@
 		{
 			keyString += "!";
 		}
-		return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
+		return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
 				hashCode(),
 				_socket.getRemoteSocketAddress(),
 				_socket.getLocalSocketAddress(),
-				_state,
+				isDispatched,
 				isOpen(),
 				isInputShutdown(),
 				isOutputShutdown(),