changeset 963:4b6216fa9cec

replace SelectChannelEndPoint._state with isDispatched
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 14 Oct 2016 00:15:28 -0600
parents 94498d6daf5b
children 768414c16e10
files src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/server/AsyncHttpConnection.java
diffstat 2 files changed, 215 insertions(+), 216 deletions(-) [+]
line wrap: on
line diff
diff -r 94498d6daf5b -r 4b6216fa9cec src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Thu Oct 13 22:56:15 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Fri Oct 14 00:15:28 2016 -0600
@@ -60,12 +60,14 @@
 	 * from HTTP to proxy connect or websocket.
 	 */
 	private volatile AsyncConnection _connection;
-
+/*
 	private static final int STATE_NEEDS_DISPATCH = -1;
 	private static final int STATE_UNDISPATCHED = 0;
 	private static final int STATE_DISPATCHED = 1;
 	private int _state;
-	
+*/
+	private boolean isDispatched = false;
+
 	/** true if the last write operation succeed and wrote all offered bytes */
 	private volatile boolean _writable = true;
 
@@ -75,7 +77,7 @@
 	/** True if a thread has is blocked in {@link #blockWritable(long)} */
 	private boolean _writeBlocked;
 
-	private boolean _ishut;
+	private boolean _ishut = false;
 
 	public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
 		throws IOException
@@ -84,7 +86,6 @@
 
 		_manager = selectSet.getManager();
 		_selectSet = selectSet;
-		_state = STATE_UNDISPATCHED;
 		_key = key;
 	}
 
@@ -106,12 +107,14 @@
 	 */
 	public synchronized void schedule()
 	{
-		// If there is no key, then do nothing
 		if (!_key.isValid())
 		{
+/*
 			_readBlocked = false;
 			_writeBlocked = false;
 			this.notifyAll();
+*/
+			_key.cancel();
 			return;
 		}
 
@@ -120,7 +123,7 @@
 		{
 			// assert _dispatched;
 			if (_readBlocked && _key.isReadable())
-				_readBlocked=false;
+				_readBlocked = false;
 			if (_writeBlocked && _key.isWritable())
 				_writeBlocked = false;
 
@@ -129,7 +132,7 @@
 
 			// we are not interested in further selecting
 			_key.interestOps(0);
-			if (_state<STATE_DISPATCHED)
+			if( !isDispatched )
 				updateKey();
 			return;
 		}
@@ -144,7 +147,7 @@
 		}
 
 		// If dispatched, then deregister interest
-		if (_state>=STATE_DISPATCHED)
+		if (isDispatched)
 			_key.interestOps(0);
 		else
 		{
@@ -156,15 +159,15 @@
 	@Override
 	public synchronized void dispatch()
 	{
-		if (_state<=STATE_UNDISPATCHED)
+		if( !isDispatched )
 		{
-			_state = STATE_DISPATCHED;
+			isDispatched = true;
 			try {
 				_manager.execute(_handler);
 			} catch(RejectedExecutionException e) {
-				_state = STATE_NEEDS_DISPATCH;
+				isDispatched = false;
 				LOG.warn("Dispatched Failed! "+this+" to "+_manager);
-				updateKey();
+//				updateKey();
 			}
 		}
 	}
@@ -187,7 +190,7 @@
 			synchronized (this)
 			{   
 				_writable = false;
-				if (_state<STATE_DISPATCHED)
+				if( !isDispatched )
 					updateKey();
 			}
 		}
@@ -209,7 +212,7 @@
 			synchronized (this)
 			{   
 				_writable = false;
-				if (_state<STATE_DISPATCHED)
+				if( !isDispatched )
 					updateKey();
 			}
 		}
@@ -226,42 +229,39 @@
 	 * Allows thread to block waiting for further events.
 	 */
 	@Override
-	public boolean blockReadable(long timeoutMs) throws IOException
+	public synchronized boolean blockReadable(long timeoutMs) throws IOException
 	{
-		synchronized (this)
-		{
-			if (isInputShutdown())
-				throw new EofException();
+		if (isInputShutdown())
+			throw new EofException();
 
-			long now = _selectSet.getNow();
-			long end = now+timeoutMs;
-			try
+		long now = _selectSet.getNow();
+		long end = now+timeoutMs;
+		try
+		{
+			_readBlocked = true;
+			while (!isInputShutdown() && _readBlocked)
 			{
-				_readBlocked = true;
-				while (!isInputShutdown() && _readBlocked)
+				try
 				{
-					try
-					{
-						updateKey();
-						this.wait(timeoutMs>0?(end-now):10000);
-					}
-					catch (final InterruptedException e)
-					{
-						LOG.warn("",e);
-					}
-					finally
-					{
-						now=_selectSet.getNow();
-					}
+					updateKey();
+					this.wait(timeoutMs>0?(end-now):10000);
+				}
+				catch (final InterruptedException e)
+				{
+					LOG.warn("",e);
+				}
+				finally
+				{
+					now = _selectSet.getNow();
+				}
 
-					if (_readBlocked && timeoutMs>0 && now>=end)
-						return false;
-				}
+				if (_readBlocked && timeoutMs>0 && now>=end)
+					return false;
 			}
-			finally
-			{
-				_readBlocked = false;
-			}
+		}
+		finally
+		{
+			_readBlocked = false;
 		}
 		return true;
 	}
@@ -271,41 +271,38 @@
 	 * Allows thread to block waiting for further events.
 	 */
 	@Override
-	public boolean blockWritable(long timeoutMs) throws IOException
+	public synchronized boolean blockWritable(long timeoutMs) throws IOException
 	{
-		synchronized (this)
-		{
-			if (isOutputShutdown())
-				throw new EofException();
+		if (isOutputShutdown())
+			throw new EofException();
 
-			long now=_selectSet.getNow();
-			long end=now+timeoutMs;
-			try
+		long now=_selectSet.getNow();
+		long end=now+timeoutMs;
+		try
+		{
+			_writeBlocked = true;
+			while (_writeBlocked && !isOutputShutdown())
 			{
-				_writeBlocked = true;
-				while (_writeBlocked && !isOutputShutdown())
+				try
 				{
-					try
-					{
-						updateKey();
-						this.wait(timeoutMs>0?(end-now):10000);
-					}
-					catch (final InterruptedException e)
-					{
-						LOG.warn("",e);
-					}
-					finally
-					{
-						now=_selectSet.getNow();
-					}
-					if (_writeBlocked && timeoutMs>0 && now>=end)
-						return false;
+					updateKey();
+					this.wait(timeoutMs>0?(end-now):10000);
+				}
+				catch (final InterruptedException e)
+				{
+					LOG.warn("",e);
 				}
+				finally
+				{
+					now = _selectSet.getNow();
+				}
+				if (_writeBlocked && timeoutMs>0 && now>=end)
+					return false;
 			}
-			finally
-			{
-				_writeBlocked = false;
-			}
+		}
+		finally
+		{
+			_writeBlocked = false;
 		}
 		return true;
 	}
@@ -326,8 +323,10 @@
 	{
 		if( getChannel().isOpen() && _key.isValid())
 		{
-			boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
-			boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
+			boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended());
+			boolean write_interest = _writeBlocked || (!isDispatched && !_writable);
+//			boolean write_interest = _writeBlocked || !isDispatched;
+//			boolean write_interest = true;
 
 			int interestOps =
 				((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
@@ -400,7 +399,7 @@
 		}
 		finally
 		{
-			_state = STATE_UNDISPATCHED;
+			isDispatched = false;
 			updateKey();
 		}
 	}
@@ -445,11 +444,11 @@
 		{
 			keyString += "!";
 		}
-		return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
+		return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
 				hashCode(),
 				_socket.getRemoteSocketAddress(),
 				_socket.getLocalSocketAddress(),
-				_state,
+				isDispatched,
 				isOpen(),
 				isInputShutdown(),
 				isOutputShutdown(),
diff -r 94498d6daf5b -r 4b6216fa9cec src/org/eclipse/jetty/server/AsyncHttpConnection.java
--- a/src/org/eclipse/jetty/server/AsyncHttpConnection.java	Thu Oct 13 22:56:15 2016 -0600
+++ b/src/org/eclipse/jetty/server/AsyncHttpConnection.java	Fri Oct 14 00:15:28 2016 -0600
@@ -37,159 +37,159 @@
  */
 public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
 {
-    private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100);
-    private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200);
+	private final static int NO_PROGRESS_INFO = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_INFO",100);
+	private final static int NO_PROGRESS_CLOSE = Integer.getInteger("org.mortbay.jetty.NO_PROGRESS_CLOSE",200);
 
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
-    private int _total_no_progress;
-    private final AsyncEndPoint _asyncEndp;
-    private boolean _readInterested = true;
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
+	private int _total_no_progress;
+	private final AsyncEndPoint _asyncEndp;
+	private boolean _readInterested = true;
 
-    public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
-    {
-        super(connector,endpoint,server);
-        _asyncEndp=(AsyncEndPoint)endpoint;
-    }
+	public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server)
+	{
+		super(connector,endpoint,server);
+		_asyncEndp=(AsyncEndPoint)endpoint;
+	}
 
-    @Override
-    public Connection handle() throws IOException
-    {
-        Connection connection = this;
-        boolean some_progress=false;
-        boolean progress=true;
+	@Override
+	public Connection handle() throws IOException
+	{
+		Connection connection = this;
+		boolean some_progress = false;
+		boolean progress = true;
 
-        try
-        {
-            setCurrentConnection(this);
+		try
+		{
+			setCurrentConnection(this);
 
-            // While progress and the connection has not changed
-            while (progress && connection==this)
-            {
-                progress=false;
-                try
-                {
-                    // Parse more input
-                    if (!_parser.isComplete() && _parser.parseAvailable())
-                        progress=true;
+			// While progress and the connection has not changed
+			while (progress && connection==this)
+			{
+				progress=false;
+				try
+				{
+					// Parse more input
+					if (!_parser.isComplete() && _parser.parseAvailable())
+						progress = true;
 
-                    // Generate more output
-                    if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
-                        if (_generator.flushBuffer()>0)
-                            progress=true;
+					// Generate more output
+					if (_generator.isCommitted() && !_generator.isComplete() && !_endp.isOutputShutdown())
+						if (_generator.flushBuffer()>0)
+							progress = true;
 
-                    // Flush output
-                    _endp.flush();
+					// Flush output
+					_endp.flush();
 
-                    // Has any IO been done by the endpoint itself since last loop
-                    if (_asyncEndp.hasProgressed())
-                        progress=true;
-                }
-                catch (HttpException e)
-                {
-                    if (LOG.isDebugEnabled())
-                    {
-                        LOG.debug("uri="+_uri);
-                        LOG.debug("fields="+_requestFields);
-                        LOG.debug("",e);
-                    }
-                    progress=true;
-                    _generator.sendError(e.getStatus(), e.getReason(), null, true);
-                }
-                finally
-                {
-                    some_progress|=progress;
-                    //  Is this request/response round complete and are fully flushed?
-                    boolean parserComplete = _parser.isComplete();
-                    boolean generatorComplete = _generator.isComplete();
-                    boolean complete = parserComplete && generatorComplete;
-                    if (parserComplete)
-                    {
-                        if (generatorComplete)
-                        {
-                            // Reset the parser/generator
-                            progress=true;
+					// Has any IO been done by the endpoint itself since last loop
+					if (_asyncEndp.hasProgressed())
+						progress = true;
+				}
+				catch (HttpException e)
+				{
+					if (LOG.isDebugEnabled())
+					{
+						LOG.debug("uri="+_uri);
+						LOG.debug("fields="+_requestFields);
+						LOG.debug("",e);
+					}
+					progress = true;
+					_generator.sendError(e.getStatus(), e.getReason(), null, true);
+				}
+				finally
+				{
+					some_progress |= progress;
+					//  Is this request/response round complete and are fully flushed?
+					boolean parserComplete = _parser.isComplete();
+					boolean generatorComplete = _generator.isComplete();
+					boolean complete = parserComplete && generatorComplete;
+					if (parserComplete)
+					{
+						if (generatorComplete)
+						{
+							// Reset the parser/generator
+							progress=true;
 
-                            // look for a switched connection instance?
-                            if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
-                            {
-                                Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
-                                if (switched!=null)
-                                    connection=switched;
-                            }
+							// look for a switched connection instance?
+							if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
+							{
+								Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
+								if (switched!=null)
+									connection=switched;
+							}
 
-                            reset();
+							reset();
 
-                            // TODO Is this still required?
-                            if (!_generator.isPersistent() && !_endp.isOutputShutdown())
-                            {
-                                LOG.warn("Safety net oshut!!!  IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
-                                _endp.shutdownOutput();
-                            }
-                        }
-                        else
-                        {
-                            // We have finished parsing, but not generating so
-                            // we must not be interested in reading until we
-                            // have finished generating and we reset the generator
-                            _readInterested = false;
-                            LOG.debug("Disabled read interest while writing response {}", _endp);
-                        }
-                    }
-                }
-            }
-        }
-        finally
-        {
-            setCurrentConnection(null);
+							// TODO Is this still required?
+							if (!_generator.isPersistent() && !_endp.isOutputShutdown())
+							{
+								LOG.warn("Safety net oshut!!!  IF YOU SEE THIS, PLEASE RAISE BUGZILLA");
+								_endp.shutdownOutput();
+							}
+						}
+						else
+						{
+							// We have finished parsing, but not generating so
+							// we must not be interested in reading until we
+							// have finished generating and we reset the generator
+							_readInterested = false;
+							LOG.debug("Disabled read interest while writing response {}", _endp);
+						}
+					}
+				}
+			}
+		}
+		finally
+		{
+			setCurrentConnection(null);
 
-            // return buffers
-            _parser.returnBuffers();
-            _generator.returnBuffers();
+			// return buffers
+			_parser.returnBuffers();
+			_generator.returnBuffers();
 
-            // Safety net to catch spinning
-            if (some_progress)
-                _total_no_progress=0;
-            else
-            {
-                _total_no_progress++;
-                if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
-                    LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
-                if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
-                {
-                    LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
-                    if (_endp instanceof SelectChannelEndPoint)
-                        ((SelectChannelEndPoint)_endp).getChannel().close();
-                }
-            }
-        }
-        return connection;
-    }
+			// Safety net to catch spinning
+			if (some_progress)
+				_total_no_progress = 0;
+			else
+			{
+				_total_no_progress++;
+				if (NO_PROGRESS_INFO>0 && _total_no_progress%NO_PROGRESS_INFO==0 && (NO_PROGRESS_CLOSE<=0 || _total_no_progress< NO_PROGRESS_CLOSE))
+					LOG.info("EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
+				if (NO_PROGRESS_CLOSE>0 && _total_no_progress==NO_PROGRESS_CLOSE)
+				{
+					LOG.warn("Closing EndPoint making no progress: "+_total_no_progress+" "+_endp+" "+this);
+					if (_endp instanceof SelectChannelEndPoint)
+						((SelectChannelEndPoint)_endp).getChannel().close();
+				}
+			}
+		}
+		return connection;
+	}
 
-    public void onInputShutdown() throws IOException
-    {
-        // If we don't have a committed response and we are not suspended
-        if (_generator.isIdle())
-        {
-            // then no more can happen, so close.
-            _endp.close();
-        }
+	public void onInputShutdown() throws IOException
+	{
+		// If we don't have a committed response and we are not suspended
+		if (_generator.isIdle())
+		{
+			// then no more can happen, so close.
+			_endp.close();
+		}
 
-        // Make idle parser seek EOF
-        if (_parser.isIdle())
-            _parser.setPersistent(false);
-    }
+		// Make idle parser seek EOF
+		if (_parser.isIdle())
+			_parser.setPersistent(false);
+	}
 
-    @Override
-    public void reset()
-    {
-        _readInterested = true;
-        LOG.debug("Enabled read interest {}", _endp);
-        super.reset();
-    }
+	@Override
+	public void reset()
+	{
+		_readInterested = true;
+		LOG.debug("Enabled read interest {}", _endp);
+		super.reset();
+	}
 
-    @Override
-    public boolean isSuspended()
-    {
-        return !_readInterested || super.isSuspended();
-    }
+	@Override
+	public boolean isSuspended()
+	{
+		return !_readInterested || super.isSuspended();
+	}
 }