view src/org/eclipse/jetty/server/AsyncContinuation.java @ 927:1c1c350fbe4b

remove AsyncContinuation.cancel()
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 09 Oct 2016 18:34:24 -0600
parents 88b20b775fa2
children 23a57aad34c0
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.server;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletException;

import java.util.ArrayList;
import java.util.List;

import javax.servlet.ServletContext;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationThrowable;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.URIUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.thread.Timeout;

/* ------------------------------------------------------------ */
/** Implementation of Continuation and AsyncContext interfaces
 * 
 */
public class AsyncContinuation implements AsyncContext, Continuation
{
	private static final Logger LOG = LoggerFactory.getLogger(AsyncContinuation.class);

	private final static long DEFAULT_TIMEOUT=30000L;
	
	private final static ContinuationThrowable __exception = new ContinuationThrowable();
	
	// STATES:
	//               handling()    suspend()     unhandle()    resume()       complete()  doComplete()
	//                             startAsync()                dispatch()   
	// IDLE          DISPATCHED      
	// DISPATCHED                  ASYNCSTARTED  UNCOMPLETED
	// ASYNCSTARTED                              ASYNCWAIT     REDISPATCHING  COMPLETING
	// REDISPATCHING                             REDISPATCHED  
	// ASYNCWAIT                                               REDISPATCH     COMPLETING
	// REDISPATCH    REDISPATCHED
	// REDISPATCHED                ASYNCSTARTED  UNCOMPLETED
	// COMPLETING    UNCOMPLETED                 UNCOMPLETED
	// UNCOMPLETED                                                                        COMPLETED
	// COMPLETED
	private static final int __IDLE=0;         // Idle request
	private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
	private static final int __UNCOMPLETED=8;  // Request is completable
	private static final int __COMPLETED=9;    // Request is complete
	
	/* ------------------------------------------------------------ */
	protected AbstractHttpConnection _connection;
	private List<AsyncListener> _asyncListeners;
	private List<ContinuationListener> _continuationListeners;

	/* ------------------------------------------------------------ */
	private int _state;
	private boolean _initial;
	private volatile boolean _responseWrapped;
	private long _timeoutMs=DEFAULT_TIMEOUT;
	private AsyncEventState _event;
	private volatile long _expireAt;    
	
	/* ------------------------------------------------------------ */
	protected AsyncContinuation()
	{
		_state=__IDLE;
		_initial=true;
	}

	/* ------------------------------------------------------------ */
	protected void setConnection(final AbstractHttpConnection connection)
	{
		synchronized(this)
		{
			_connection=connection;
		}
	}

	/* ------------------------------------------------------------ */
	public void addListener(AsyncListener listener)
	{
		synchronized(this)
		{
			if (_asyncListeners==null)
				_asyncListeners=new ArrayList<AsyncListener>();
			_asyncListeners.add(listener);
		}
	}

	/* ------------------------------------------------------------ */
	public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
	{
		synchronized(this)
		{
			// TODO handle the request/response ???
			if (_asyncListeners==null)
				_asyncListeners=new ArrayList<AsyncListener>();
			_asyncListeners.add(listener);
		}
	}

	/* ------------------------------------------------------------ */
	public void addContinuationListener(ContinuationListener listener)
	{
		synchronized(this)
		{
			if (_continuationListeners==null)
				_continuationListeners=new ArrayList<ContinuationListener>();
			_continuationListeners.add(listener);
		}
	}

	/* ------------------------------------------------------------ */
	public void setTimeout(long ms)
	{
		synchronized(this)
		{
			_timeoutMs=ms;
		}
	} 

	/* ------------------------------------------------------------ */
	public long getTimeout()
	{
		synchronized(this)
		{
			return _timeoutMs;
		}
	} 

	/* ------------------------------------------------------------ */
	public AsyncEventState getAsyncEventState()
	{
		synchronized(this)
		{
			return _event;
		}
	} 
   
	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
	 */

	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
	 */
	public boolean isResponseWrapped()
	{
		return _responseWrapped;
	}

	/* ------------------------------------------------------------ */
	/* (non-Javadoc)
	 * @see javax.servlet.ServletRequest#isInitial()
	 */
	public boolean isInitial()
	{
		synchronized(this)
		{
			return _initial;
		}
	}
	
	/* ------------------------------------------------------------ */
	/* (non-Javadoc)
	 * @see javax.servlet.ServletRequest#isSuspended()
	 */
	public boolean isSuspended()
	{
		synchronized(this)
		{
			switch(_state)
			{
				default:
					return false;   
			}
		}
	}
	
	/* ------------------------------------------------------------ */
	public boolean isSuspending()
	{
		synchronized(this)
		{
			switch(_state)
			{
				default:
					return false;   
			}
		}
	}
	
	/* ------------------------------------------------------------ */
	public boolean isDispatchable()
	{
		synchronized(this)
		{
			switch(_state)
			{
				default:
					return false;   
			}
		}
	}

	/* ------------------------------------------------------------ */
	@Override
	public String toString()
	{
		synchronized (this)
		{
			return super.toString()+"@"+getStatusString();
		}
	}

	/* ------------------------------------------------------------ */
	public String getStatusString()
	{
		synchronized (this)
		{
			return
			((_state==__IDLE)?"IDLE":
				(_state==__DISPATCHED)?"DISPATCHED":
											(_state==__UNCOMPLETED)?"UNCOMPLETED":
												(_state==__COMPLETED)?"COMPLETE":
													("UNKNOWN?"+_state))+
			(_initial?",initial":"");
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return false if the handling of the request should not proceed
	 */
	protected boolean handling()
	{
		synchronized (this)
		{
			switch(_state)
			{
				case __IDLE:
					_initial=true;
					_state=__DISPATCHED;
					if (_asyncListeners!=null)
						_asyncListeners.clear();
					return true;
					
				default:
					throw new IllegalStateException(this.getStatusString());
			}
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Signal that the HttpConnection has finished handling the request.
	 * For blocking connectors, this call may block if the request has
	 * been suspended (startAsync called).
	 * @return true if handling is complete, false if the request should 
	 * be handled again (eg because of a resume that happened before unhandle was called)
	 */
	protected boolean unhandle()
	{
		synchronized (this)
		{
			switch(_state)
			{
				case __DISPATCHED:
					_state = __UNCOMPLETED;
					return true;

				case __IDLE:
					throw new IllegalStateException(this.getStatusString());

				default:
					throw new IllegalStateException(this.getStatusString());
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void dispatch()
	{
		boolean dispatch=false;
		synchronized (this)
		{
			switch(_state)
			{
				default:
					throw new IllegalStateException(this.getStatusString());
			}
		}
	}

	/* ------------------------------------------------------------ */
	protected void expired()
	{
	}
	
	/* ------------------------------------------------------------ */
	/* (non-Javadoc)
	 * @see javax.servlet.ServletRequest#complete()
	 */
	public void complete()
	{
		// just like resume, except don't set _resumed=true;
		boolean dispatch=false;
		synchronized (this)
		{
			switch(_state)
			{
				case __DISPATCHED:
					throw new IllegalStateException(this.getStatusString());

				default:
					throw new IllegalStateException(this.getStatusString());
			}
		}
	}
	
	/* ------------------------------------------------------------ */
	/* (non-Javadoc)
	 * @see javax.servlet.ServletRequest#complete()
	 */
	public void errorComplete()
	{
		// just like complete except can overrule a prior dispatch call;
		synchronized (this)
		{
			switch(_state)
			{
				default:
					throw new IllegalStateException(this.getStatusString());
			}
		}
	}

	/* ------------------------------------------------------------ */
	@Override
	public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException 
	{
		try
		{
			// TODO inject
			return clazz.newInstance();
		}
		catch(Exception e)
		{
			throw new ServletException(e);
		}
	}


	/* ------------------------------------------------------------ */
	/* (non-Javadoc)
	 * @see javax.servlet.ServletRequest#complete()
	 */
	protected void doComplete(Throwable ex)
	{
		final List<ContinuationListener> cListeners;
		final List<AsyncListener> aListeners;
		synchronized (this)
		{
			switch(_state)
			{
				case __UNCOMPLETED:
					_state = __COMPLETED;
					cListeners=_continuationListeners;
					aListeners=_asyncListeners;
					break;
					
				default:
					cListeners=null;
					aListeners=null;
					throw new IllegalStateException(this.getStatusString());
			}
		}
		
		if (aListeners!=null)
		{
			for (AsyncListener listener : aListeners)
			{
				try
				{
					if (ex!=null)
					{
						_event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex);
						_event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,ex.getMessage());
						listener.onError(_event);
					}
					else
						listener.onComplete(_event);
				}
				catch(Exception e)
				{
					LOG.warn("",e);
				}
			}
		}
		if (cListeners!=null)
		{
			for (ContinuationListener listener : cListeners)
			{
				try
				{
					listener.onComplete(this);
				}
				catch(Exception e)
				{
					LOG.warn("",e);
				}
			}
		}
	}

	/* ------------------------------------------------------------ */
	protected void recycle()
	{
		synchronized (this)
		{
			switch(_state)
			{
				case __DISPATCHED:
					throw new IllegalStateException(getStatusString());
				default:
					_state=__IDLE;
			}
			_initial = true;
			_responseWrapped=false;
			cancelTimeout();
			_timeoutMs=DEFAULT_TIMEOUT;
			_continuationListeners=null;
		}
	}    
	
	/* ------------------------------------------------------------ */
	protected void cancelTimeout()
	{
		EndPoint endp=_connection.getEndPoint();
		if (endp.isBlocking())
		{
			synchronized(this)
			{
				_expireAt=0;
				this.notifyAll();
			}
		}
		else 
		{
			final AsyncEventState event=_event;
			if (event!=null)
			{
				((AsyncEndPoint)endp).cancelTimeout(event._timeout);
			}
		}
	}

	boolean isUncompleted()
	{
		synchronized (this)
		{
			return _state==__UNCOMPLETED;
		}
	} 
	
	public boolean isComplete()
	{
		synchronized (this)
		{
			return _state==__COMPLETED;
		}
	}


	public boolean isAsync()
	{
		synchronized (this)
		{
			switch(_state)
			{
				case __IDLE:
				case __DISPATCHED:
				case __UNCOMPLETED:
				case __COMPLETED:
					return false;

				default:
					return true;
			}
		}
	}

	/* ------------------------------------------------------------ */
	public void dispatch(ServletContext context, String path)
	{
		_event._dispatchContext=context;
		_event.setPath(path);
		dispatch();
	}

	/* ------------------------------------------------------------ */
	public void dispatch(String path)
	{
		_event.setPath(path);
		dispatch();
	}

	/* ------------------------------------------------------------ */
	public Request getBaseRequest()
	{
		return _connection.getRequest();
	}
	
	/* ------------------------------------------------------------ */
	public ServletRequest getRequest()
	{
		if (_event!=null)
			return _event.getSuppliedRequest();
		return _connection.getRequest();
	}

	/* ------------------------------------------------------------ */
	public ServletResponse getResponse()
	{
		if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
			return _event.getSuppliedResponse();
		return _connection.getResponse();
	}

	/* ------------------------------------------------------------ */
	public void start(final Runnable run)
	{
		final AsyncEventState event=_event;
		if (event!=null)
		{
			_connection.getServer().threadPool.execute(new Runnable()
			{
				public void run()
				{
					((Context)event.getServletContext()).getContextHandler().handle(run);
				}
			});
		}
	}

	/* ------------------------------------------------------------ */
	public boolean hasOriginalRequestAndResponse()
	{
		synchronized (this)
		{
			return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
		}
	}

	/* ------------------------------------------------------------ */
	public ContextHandler getContextHandler()
	{
		final AsyncEventState event=_event;
		if (event!=null)
			return ((Context)event.getServletContext()).getContextHandler();
		return null;
	}


	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
	 */
	public ServletResponse getServletResponse()
	{
		if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
			return _event.getSuppliedResponse();
		return _connection.getResponse();
	}

	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
	 */
	public Object getAttribute(String name)
	{
		return _connection.getRequest().getAttribute(name);
	}

	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
	 */
	public void removeAttribute(String name)
	{
		_connection.getRequest().removeAttribute(name);
	}

	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
	 */
	public void setAttribute(String name, Object attribute)
	{
		_connection.getRequest().setAttribute(name,attribute);
	}

	/* ------------------------------------------------------------ */
	/**
	 * @see org.eclipse.jetty.continuation.Continuation#undispatch()
	 */
	public void undispatch()
	{
		if (isSuspended())
		{
			if (LOG.isDebugEnabled())
				throw new ContinuationThrowable();
			else
				throw __exception;
		}
		throw new IllegalStateException("!suspended");
	}

	/* ------------------------------------------------------------ */
	/* ------------------------------------------------------------ */
	public class AsyncTimeout extends Timeout.Task implements Runnable
	{
			@Override
			public void expired()
			{
				AsyncContinuation.this.expired();
			}

			@Override
			public void run()
			{
				AsyncContinuation.this.expired();
			}
	}

	/* ------------------------------------------------------------ */
	/* ------------------------------------------------------------ */
	public class AsyncEventState extends AsyncEvent
	{
		private final ServletContext _suspendedContext;
		private ServletContext _dispatchContext;
		private String _pathInContext;
		private Timeout.Task _timeout=  new AsyncTimeout();
		
		public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
		{
			super(AsyncContinuation.this, request,response);
			_suspendedContext=context;
			// Get the base request So we can remember the initial paths
			Request r=_connection.getRequest();
 
			// If we haven't been async dispatched before
			if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null)
			{
				// We are setting these attributes during startAsync, when the spec implies that 
				// they are only available after a call to AsyncContext.dispatch(...);
				
				// have we been forwarded before?
				String uri=(String)r.getAttribute(RequestDispatcher.FORWARD_REQUEST_URI);
				if (uri!=null)
				{
					r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,uri);
					r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getAttribute(RequestDispatcher.FORWARD_CONTEXT_PATH));
					r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getAttribute(RequestDispatcher.FORWARD_SERVLET_PATH));
					r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getAttribute(RequestDispatcher.FORWARD_PATH_INFO));
					r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getAttribute(RequestDispatcher.FORWARD_QUERY_STRING));
				}
				else
				{
					r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,r.getRequestURI());
					r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getContextPath());
					r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getServletPath());
					r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getPathInfo());
					r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getQueryString());
				}
			}
		}
		
		public ServletContext getSuspendedContext()
		{
			return _suspendedContext;
		}
		
		public ServletContext getDispatchContext()
		{
			return _dispatchContext;
		}
		
		public ServletContext getServletContext()
		{
			return _dispatchContext==null?_suspendedContext:_dispatchContext;
		}
		
		public void setPath(String path)
		{
			_pathInContext=path;
		}
		
		/* ------------------------------------------------------------ */
		/**
		 * @return The path in the context
		 */
		public String getPath()
		{
			return _pathInContext;
		}
	}
}