view src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 864:e21ca9878a10

simplify ThreadPool use
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 16:17:38 -0600
parents 88d3c8ff242a
children
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//


package org.eclipse.jetty.util.thread;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A thread pool whose worker threads take jobs from a {@link BlockingQueue}.
 * <p>
 * {@link #doStart()} starts {@link #getMinThreads() minThreads} workers;
 * {@link #dispatch(Runnable)} starts further workers, up to
 * {@link #getMaxThreads() maxThreads}, when no worker was idle or the queue was
 * longer than the number of idle workers. While more than minThreads workers
 * exist and {@link #getMaxIdleTimeMs() maxIdleTimeMs} is positive, an idle worker
 * may exit, with successive exits spaced by more than maxIdleTimeMs.
 * </p>
 */
public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable
{
	private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class);

	// count of workers belonging to the pool: reserved in startThread(), released when a worker exits or shrinks
	private final AtomicInteger _threadsStarted = new AtomicInteger();
	// count of workers currently inside the idle loop of _runnable
	private final AtomicInteger _threadsIdle = new AtomicInteger();
	// System.currentTimeMillis() of the last shrink, 0 until the first one
	private final AtomicLong _lastShrink = new AtomicLong();
	// every thread added by startThread() that has not yet left _runnable
	private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
	// monitor used by join() to wait for doStop()
	private final Object _joinLock = new Object();
	private BlockingQueue<Runnable> _jobs;
	private String _name;
	private int _maxIdleTimeMs=60000;
	private int _maxThreads=254;
	private int _minThreads=8;
	private int _maxQueued=-1;
	private int _priority=Thread.NORM_PRIORITY;
	private boolean _daemon=false;
	private int _maxStopTime=100;
	private boolean _detailedDump=false;

	/* ------------------------------------------------------------------- */
	/** Construct a pool with the default settings and a name of "qtp" followed by the identity hash code.
	 */
	public QueuedThreadPool()
	{
		_name="qtp"+super.hashCode();
	}

	/* ------------------------------------------------------------------- */
	/** Construct a pool with the given maximum number of threads.
	 * @param maxThreads maximum number of threads, see {@link #setMaxThreads(int)}
	 */
	public QueuedThreadPool(int maxThreads)
	{
		this();
		setMaxThreads(maxThreads);
	}

	/* ------------------------------------------------------------------- */
	/** Construct a pool that uses the given job queue.
	 * The queue is cleared by this constructor.
	 * @param jobQ the queue to hold dispatched jobs
	 */
	public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
	{
		this();
		_jobs=jobQ;
		_jobs.clear();
	}


	/* ------------------------------------------------------------ */
	/**
	 * Creates the job queue if none exists yet and starts the minimum number of threads.
	 * The queue is an {@link ArrayBlockingQueue} of capacity maxQueued when maxQueued is
	 * positive, otherwise a {@link BlockingArrayQueue}.
	 */
	@Override
	protected void doStart() throws Exception
	{
		super.doStart();
		_threadsStarted.set(0);

		if (_jobs==null)
		{
			_jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
				:new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
		}

		// startThread() may lose its compare-and-set, so re-read the counter each time round
		int threads=_threadsStarted.get();
		while (isRunning() && threads<_minThreads)
		{
			startThread(threads);
			threads=_threadsStarted.get();
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Stops the pool, waiting at most {@link #getMaxStopTimeMs()} in total for the threads to exit.
	 * The first half of that time lets running jobs finish; then queued jobs are discarded,
	 * idle threads are woken with no-op jobs and any remaining threads are interrupted.
	 * Threads still alive at the end are logged, and callers blocked in {@link #join()} are notified.
	 */
	@Override
	protected void doStop() throws Exception
	{
		super.doStop();
		long start=System.currentTimeMillis();

		// let jobs complete naturally for a while
		while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
			Thread.sleep(1);

		// kill queued jobs and flush out idle jobs
		_jobs.clear();
		Runnable noop = new Runnable()
		{
			@Override
			public void run(){}
		};
		for  (int i=_threadsIdle.get();i-->0;)
			_jobs.offer(noop);
		Thread.yield();

		// interrupt remaining threads
		if (_threadsStarted.get()>0)
			for (Thread thread : _threads)
				thread.interrupt();

		// wait for remaining threads to die
		while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
		{
			Thread.sleep(1);
		}
		Thread.yield();
		int size=_threads.size();
		if (size>0)
		{
			LOG.warn("{} threads could not be stopped",size);

			// stack traces are only listed for a single straggler, or for all of them when debugging
			if (size==1 || LOG.isDebugEnabled())
			{
				for (Thread unstopped : _threads)
				{
					LOG.info("Couldn't stop {}",unstopped);
					for (StackTraceElement element : unstopped.getStackTrace())
					{
						LOG.info(" at {}",element);
					}
				}
			}
		}

		synchronized (_joinLock)
		{
			_joinLock.notifyAll();
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * Set whether pool threads are daemon threads.
	 * The value is applied to each thread when it is created.
	 * @param daemon true to create daemon threads
	 */
	public void setDaemon(boolean daemon)
	{
		_daemon=daemon;
	}

	/* ------------------------------------------------------------ */
	/** Set the maximum thread idle time.
	 * Threads that are idle for longer than this period may be
	 * stopped. With a value of zero or less, idle threads block on the
	 * job queue without a timeout and are never stopped for idleness.
	 * @see #getMaxIdleTimeMs
	 * @param maxIdleTimeMs Max idle time in ms.
	 */
	public void setMaxIdleTimeMs(int maxIdleTimeMs)
	{
		_maxIdleTimeMs=maxIdleTimeMs;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param stopTimeMs maximum total time that stop() will wait for threads to die.
	 */
	public void setMaxStopTimeMs(int stopTimeMs)
	{
		_maxStopTime = stopTimeMs;
	}

	/* ------------------------------------------------------------ */
	/** Set the maximum number of threads.
	 * If the minimum is greater than the new maximum, the minimum is lowered to match.
	 * @see #getMaxThreads
	 * @param maxThreads maximum number of threads.
	 */
	public void setMaxThreads(int maxThreads)
	{
		_maxThreads=maxThreads;
		if (_minThreads>_maxThreads)
			_minThreads=_maxThreads;
	}

	/* ------------------------------------------------------------ */
	/** Set the minimum number of threads.
	 * If the new minimum is greater than the maximum, the maximum is raised to match.
	 * When the pool is started, threads are started immediately to reach the new minimum.
	 * @see #getMinThreads
	 * @param minThreads minimum number of threads
	 */
	public void setMinThreads(int minThreads)
	{
		_minThreads=minThreads;

		if (_minThreads>_maxThreads)
			_maxThreads=_minThreads;

		int threads=_threadsStarted.get();
		while (isStarted() && threads<_minThreads)
		{
			startThread(threads);
			threads=_threadsStarted.get();
		}
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param name Name of the pool, used as the prefix when naming Threads.
	 * @throws IllegalStateException if the pool is running
	 */
	public void setName(String name)
	{
		if (isRunning())
			throw new IllegalStateException("started");
		_name= name;
	}

	/* ------------------------------------------------------------ */
	/** Set the priority of the pool threads.
	 *  The value is applied to each thread when it is created.
	 *  @param priority the new thread priority.
	 */
	public void setThreadsPriority(int priority)
	{
		_priority=priority;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return maximum queue size
	 */
	public int getMaxQueued()
	{
		return _maxQueued;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param max job queue size; only used by {@link #doStart()} when it has to create the queue
	 * @throws IllegalStateException if the pool is running
	 */
	public void setMaxQueued(int max)
	{
		if (isRunning())
			throw new IllegalStateException("started");
		_maxQueued=max;
	}

	/* ------------------------------------------------------------ */
	/** Get the maximum thread idle time.
	 * @see #setMaxIdleTimeMs
	 * @return Max idle time in ms.
	 */
	public int getMaxIdleTimeMs()
	{
		return _maxIdleTimeMs;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return maximum total time that stop() will wait for threads to die.
	 */
	public int getMaxStopTimeMs()
	{
		return _maxStopTime;
	}

	/* ------------------------------------------------------------ */
	/** Get the maximum number of threads.
	 * @see #setMaxThreads
	 * @return maximum number of threads.
	 */
	public int getMaxThreads()
	{
		return _maxThreads;
	}

	/* ------------------------------------------------------------ */
	/** Get the minimum number of threads.
	 * @see #setMinThreads
	 * @return minimum number of threads.
	 */
	public int getMinThreads()
	{
		return _minThreads;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return The name of the pool.
	 */
	public String getName()
	{
		return _name;
	}

	/* ------------------------------------------------------------ */
	/** Get the priority of the pool threads.
	 *  @return the priority of the pool threads.
	 */
	public int getThreadsPriority()
	{
		return _priority;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return true if pool threads are created as daemon threads
	 */
	public boolean isDaemon()
	{
		return _daemon;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return true if {@link #dump(Appendable, String)} lists the full stack of each non-idle thread
	 */
	public boolean isDetailedDump()
	{
		return _detailedDump;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param detailedDump true to have {@link #dump(Appendable, String)} list the full stack of each non-idle thread
	 */
	public void setDetailedDump(boolean detailedDump)
	{
		_detailedDump = detailedDump;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Queue a job for execution by a pool thread.
	 * A new thread is started when the job was accepted, the pool is below maxThreads and,
	 * just before the job was queued, either no thread was idle or more jobs were queued
	 * than threads were idle.
	 * @param job the job to run
	 * @return true if the job was queued, false if the pool is not running or the queue refused the job
	 */
	public boolean dispatch(Runnable job)
	{
		if (isRunning())
		{
			// sample the load before offering so the new job itself is not counted
			final int jobQ = _jobs.size();
			final int idle = getIdleThreads();
			if(_jobs.offer(job))
			{
				// If we had no idle threads or the jobQ is greater than the idle threads
				if (idle==0 || jobQ>idle)
				{
					int threads=_threadsStarted.get();
					if (threads<_maxThreads)
						startThread(threads);
				}
				return true;
			}
			// still running, so the refusal came from the queue rather than from a stopped pool
			LOG.debug("Rejected {} by full job queue of {}",job,this);
			return false;
		}
		LOG.debug("Dispatched {} to stopped {}",job,this);
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Queue a job for execution, as {@link #dispatch(Runnable)}.
	 * @param job the job to run
	 * @throws RejectedExecutionException if the job could not be dispatched
	 */
	@Override
	public void execute(Runnable job)
	{
		if (!dispatch(job))
			throw new RejectedExecutionException();
	}

	/* ------------------------------------------------------------ */
	/**
	 * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
	 */
	public void join() throws InterruptedException
	{
		synchronized (_joinLock)
		{
			while (isRunning())
				_joinLock.wait();
		}

		// the pool is no longer running; poll until the stopping phase is over as well
		while (isStopping())
			Thread.sleep(1);
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return The total number of threads currently in the pool
	 */
	public int getThreads()
	{
		return _threadsStarted.get();
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return The number of idle threads in the pool
	 */
	public int getIdleThreads()
	{
		return _threadsIdle.get();
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return True if the pool is at (or above) maxThreads and there are not more idle threads than queued jobs
	 */
	public boolean isLowOnThreads()
	{
		// >= rather than ==: setMaxThreads() may lower the limit below the number of running threads
		return _threadsStarted.get()>=_maxThreads && _jobs.size()>=_threadsIdle.get();
	}

	/* ------------------------------------------------------------ */
	/**
	 * Start one more pool thread, provided the thread count is still the value the caller observed.
	 * @param threads the value of the thread counter read by the caller
	 * @return true if a thread was started, false if the counter had changed in the meantime
	 */
	private boolean startThread(int threads)
	{
		// reserve the slot first; a lost race means another caller changed the count
		final int next=threads+1;
		if (!_threadsStarted.compareAndSet(threads,next))
			return false;

		Thread thread=null;
		boolean started=false;
		try
		{
			thread=newThread(_runnable);
			thread.setDaemon(_daemon);
			thread.setPriority(_priority);
			thread.setName(_name+"-"+thread.getId());
			_threads.add(thread);

			thread.start();
			started=true;
		}
		finally
		{
			if (!started)
			{
				// roll back the reservation and forget a thread that never ran,
				// since only a running worker removes itself from _threads
				_threadsStarted.decrementAndGet();
				if (thread!=null)
					_threads.remove(thread);
			}
		}
		return started;
	}

	/* ------------------------------------------------------------ */
	/**
	 * Create, but do not start, a thread for the pool.
	 * Subclasses may override to customise thread creation.
	 * @param runnable the worker loop the thread must run
	 * @return the new thread
	 */
	protected Thread newThread(Runnable runnable)
	{
		return new Thread(runnable);
	}


	/* ------------------------------------------------------------ */
	/**
	 * @return the dump of this pool as produced by {@link AggregateLifeCycle#dump(Dumpable)}
	 */
	public String dump()
	{
		return AggregateLifeCycle.dump(this);
	}

	/* ------------------------------------------------------------ */
	/**
	 * Write a description of this pool followed by one entry per pool thread.
	 * Each entry gives the thread id, name and state; threads found inside
	 * <code>idleJobPoll</code> are marked IDLE. With {@link #isDetailedDump()} the full
	 * stack of each non-idle thread is listed, otherwise only its top frame.
	 * @param out where to write the dump
	 * @param indent the indent passed on for nested entries
	 */
	public void dump(Appendable out, String indent) throws IOException
	{
		// not presized from getMaxThreads(): that is a configured limit, not the number of live threads
		List<Object> dump = new ArrayList<Object>();
		for (final Thread thread: _threads)
		{
			// trace can be null on early java 6 jvms, so substitute an empty trace
			final StackTraceElement[] stack=thread.getStackTrace();
			final StackTraceElement[] trace=stack==null?new StackTraceElement[0]:stack;
			boolean inIdleJobPoll=false;
			for (StackTraceElement t : trace)
			{
				if ("idleJobPoll".equals(t.getMethodName()))
				{
					inIdleJobPoll = true;
					break;
				}
			}
			final boolean idle=inIdleJobPoll;

			if (_detailedDump)
			{
				dump.add(new Dumpable()
				{
					public void dump(Appendable out, String indent) throws IOException
					{
						out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
						if (!idle)
							AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
					}

					public String dump()
					{
						return null;
					}
				});
			}
			else
			{
				dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
			}
		}

		AggregateLifeCycle.dumpObject(out,this);
		AggregateLifeCycle.dump(out,indent,dump);

	}


	/* ------------------------------------------------------------ */
	/**
	 * @return name{minThreads&lt;=idleThreads&lt;=threads/maxThreads,queuedJobs}, with -1 for the job count when no queue exists yet
	 */
	@Override
	public String toString()
	{
		return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
	}

	/* ------------------------------------------------------------ */
	/**
	 * Wait up to maxIdleTimeMs for a job.
	 * Kept as a separate method because {@link #dump(Appendable, String)} recognises idle
	 * threads by finding this method name in their stack trace.
	 * @return the next job, or null if none arrived in time
	 */
	private Runnable idleJobPoll() throws InterruptedException
	{
		return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
	}

	/* ------------------------------------------------------------ */
	/** The loop run by every pool thread: run queued jobs, then idle until a job arrives, the pool stops or the thread shrinks away. */
	private final Runnable _runnable = new Runnable()
	{
		@Override
		public void run()
		{
			// true once this thread has already removed itself from _threadsStarted by shrinking
			boolean shrink=false;
			try
			{
				Runnable job=_jobs.poll();
				while (isRunning())
				{
					// Job loop
					while (job!=null && isRunning())
					{
						runJob(job);
						job=_jobs.poll();
					}

					// Idle loop
					try
					{
						_threadsIdle.incrementAndGet();

						while (isRunning() && job==null)
						{
							if (_maxIdleTimeMs<=0)
								job=_jobs.take();
							else
							{
								// maybe we should shrink?
								final int size=_threadsStarted.get();
								if (size>_minThreads)
								{
									long last=_lastShrink.get();
									long now=System.currentTimeMillis();
									if (last==0 || (now-last)>_maxIdleTimeMs)
									{
										// both compare-and-sets must win for this thread to be the one that exits
										shrink=_lastShrink.compareAndSet(last,now) &&
										_threadsStarted.compareAndSet(size,size-1);
										if (shrink)
											return;
									}
								}
								job=idleJobPoll();
							}
						}
					}
					finally
					{
						_threadsIdle.decrementAndGet();
					}
				}
			}
			catch(InterruptedException e)
			{
				LOG.trace("",e);
			}
			catch(Exception e)
			{
				LOG.warn("",e);
			}
			finally
			{
				// a shrinking thread has already decremented the counter itself
				if (!shrink)
					_threadsStarted.decrementAndGet();
				_threads.remove(Thread.currentThread());
			}
		}
	};

	/* ------------------------------------------------------------ */
	/**
	 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
	 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
	 *
	 * @param job the job to run
	 */
	protected void runJob(Runnable job)
	{
		job.run();
	}

	/* ------------------------------------------------------------ */
	/**
	 * @return the job queue
	 */
	protected BlockingQueue<Runnable> getQueue()
	{
		return _jobs;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param id The thread ID to stop.
	 * @return true if the thread was found and stopped.
	 * @deprecated Use {@link #interruptThread(long)} in preference
	 */
	@Deprecated
	public boolean stopThread(long id)
	{
		for (Thread thread: _threads)
		{
			if (thread.getId()==id)
			{
				thread.stop();
				return true;
			}
		}
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param id The thread ID to interrupt.
	 * @return true if the thread was found and interrupted.
	 */
	public boolean interruptThread(long id)
	{
		for (Thread thread: _threads)
		{
			if (thread.getId()==id)
			{
				thread.interrupt();
				return true;
			}
		}
		return false;
	}

	/* ------------------------------------------------------------ */
	/**
	 * @param id The thread ID to dump.
	 * @return the id, name, state and stack trace of the thread as text, or null if no pool thread has that ID.
	 */
	public String dumpThread(long id)
	{
		for (Thread thread: _threads)
		{
			if (thread.getId()==id)
			{
				StringBuilder buf = new StringBuilder();
				buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
				for (StackTraceElement element : thread.getStackTrace())
					buf.append("  at ").append(element.toString()).append('\n');
				return buf.toString();
			}
		}
		return null;
	}
}