Mercurial Hosting > luan
diff src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 802:3428c60d7cfc
replace jetty jars with source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 07 Sep 2016 21:15:48 -0600 |
parents | |
children | 8e9db0bbf4f9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/eclipse/jetty/util/thread/QueuedThreadPool.java Wed Sep 07 21:15:48 2016 -0600 @@ -0,0 +1,678 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + + +package org.eclipse.jetty.util.thread; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.AggregateLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; + +public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable +{ + private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); + + private final AtomicInteger _threadsStarted = new AtomicInteger(); + private final AtomicInteger _threadsIdle = new AtomicInteger(); + private final AtomicLong _lastShrink = new AtomicLong(); + private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); + private final Object _joinLock = new Object(); + private BlockingQueue<Runnable> _jobs; + private String _name; + private int _maxIdleTimeMs=60000; + private int _maxThreads=254; + private int _minThreads=8; + private int _maxQueued=-1; + private int _priority=Thread.NORM_PRIORITY; + private boolean _daemon=false; + private int _maxStopTime=100; + private boolean _detailedDump=false; + + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool() + { + _name="qtp"+super.hashCode(); + } + + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool(int maxThreads) + { + this(); + setMaxThreads(maxThreads); + } + + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool(BlockingQueue<Runnable> jobQ) + { + this(); + _jobs=jobQ; + _jobs.clear(); + } + + + /* ------------------------------------------------------------ */ + @Override + protected void doStart() throws Exception + { + super.doStart(); + _threadsStarted.set(0); + + if (_jobs==null) + { + _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) + :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); + } + + int threads=_threadsStarted.get(); + while (isRunning() && threads<_minThreads) + { + startThread(threads); + threads=_threadsStarted.get(); + } + } + + /* ------------------------------------------------------------ */ + @Override + protected void doStop() throws Exception + { + super.doStop(); + long start=System.currentTimeMillis(); + + // let jobs complete naturally for a while + while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) + Thread.sleep(1); + + // kill queued jobs and flush out idle jobs + _jobs.clear(); + Runnable noop = new Runnable(){public void run(){}}; + for (int i=_threadsIdle.get();i-->0;) + _jobs.offer(noop); + Thread.yield(); + + // interrupt remaining threads + if (_threadsStarted.get()>0) + for (Thread thread : _threads) + thread.interrupt(); + + // wait for remaining threads to die + while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) + { + Thread.sleep(1); + } + Thread.yield(); + int size=_threads.size(); + if (size>0) + { + LOG.warn(size+" threads could not be stopped"); + + if (size==1 || LOG.isDebugEnabled()) + { + for (Thread unstopped : _threads) + { + LOG.info("Couldn't stop "+unstopped); + for (StackTraceElement element : unstopped.getStackTrace()) + { + LOG.info(" at "+element); + } + } + } + } + + synchronized (_joinLock) + { + _joinLock.notifyAll(); + } + } + + /* ------------------------------------------------------------ */ + /** + * Delegated to the named or anonymous Pool. + */ + public void setDaemon(boolean daemon) + { + _daemon=daemon; + } + + /* ------------------------------------------------------------ */ + /** Set the maximum thread idle time. + * Threads that are idle for longer than this period may be + * stopped. + * Delegated to the named or anonymous Pool. + * @see #getMaxIdleTimeMs + * @param maxIdleTimeMs Max idle time in ms. + */ + public void setMaxIdleTimeMs(int maxIdleTimeMs) + { + _maxIdleTimeMs=maxIdleTimeMs; + } + + /* ------------------------------------------------------------ */ + /** + * @param stopTimeMs maximum total time that stop() will wait for threads to die. + */ + public void setMaxStopTimeMs(int stopTimeMs) + { + _maxStopTime = stopTimeMs; + } + + /* ------------------------------------------------------------ */ + /** Set the maximum number of threads. + * Delegated to the named or anonymous Pool. + * @see #getMaxThreads + * @param maxThreads maximum number of threads. + */ + public void setMaxThreads(int maxThreads) + { + _maxThreads=maxThreads; + if (_minThreads>_maxThreads) + _minThreads=_maxThreads; + } + + /* ------------------------------------------------------------ */ + /** Set the minimum number of threads. + * Delegated to the named or anonymous Pool. + * @see #getMinThreads + * @param minThreads minimum number of threads + */ + public void setMinThreads(int minThreads) + { + _minThreads=minThreads; + + if (_minThreads>_maxThreads) + _maxThreads=_minThreads; + + int threads=_threadsStarted.get(); + while (isStarted() && threads<_minThreads) + { + startThread(threads); + threads=_threadsStarted.get(); + } + } + + /* ------------------------------------------------------------ */ + /** + * @param name Name of the BoundedThreadPool to use when naming Threads. + */ + public void setName(String name) + { + if (isRunning()) + throw new IllegalStateException("started"); + _name= name; + } + + /* ------------------------------------------------------------ */ + /** Set the priority of the pool threads. + * @param priority the new thread priority. + */ + public void setThreadsPriority(int priority) + { + _priority=priority; + } + + /* ------------------------------------------------------------ */ + /** + * @return maximum queue size + */ + public int getMaxQueued() + { + return _maxQueued; + } + + /* ------------------------------------------------------------ */ + /** + * @param max job queue size + */ + public void setMaxQueued(int max) + { + if (isRunning()) + throw new IllegalStateException("started"); + _maxQueued=max; + } + + /* ------------------------------------------------------------ */ + /** Get the maximum thread idle time. + * Delegated to the named or anonymous Pool. + * @see #setMaxIdleTimeMs + * @return Max idle time in ms. + */ + public int getMaxIdleTimeMs() + { + return _maxIdleTimeMs; + } + + /* ------------------------------------------------------------ */ + /** + * @return maximum total time that stop() will wait for threads to die. + */ + public int getMaxStopTimeMs() + { + return _maxStopTime; + } + + /* ------------------------------------------------------------ */ + /** Set the maximum number of threads. + * Delegated to the named or anonymous Pool. + * @see #setMaxThreads + * @return maximum number of threads. + */ + public int getMaxThreads() + { + return _maxThreads; + } + + /* ------------------------------------------------------------ */ + /** Get the minimum number of threads. + * Delegated to the named or anonymous Pool. + * @see #setMinThreads + * @return minimum number of threads. + */ + public int getMinThreads() + { + return _minThreads; + } + + /* ------------------------------------------------------------ */ + /** + * @return The name of the BoundedThreadPool. + */ + public String getName() + { + return _name; + } + + /* ------------------------------------------------------------ */ + /** Get the priority of the pool threads. + * @return the priority of the pool threads. + */ + public int getThreadsPriority() + { + return _priority; + } + + /* ------------------------------------------------------------ */ + /** + * Delegated to the named or anonymous Pool. + */ + public boolean isDaemon() + { + return _daemon; + } + + /* ------------------------------------------------------------ */ + public boolean isDetailedDump() + { + return _detailedDump; + } + + /* ------------------------------------------------------------ */ + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } + + /* ------------------------------------------------------------ */ + public boolean dispatch(Runnable job) + { + if (isRunning()) + { + final int jobQ = _jobs.size(); + final int idle = getIdleThreads(); + if(_jobs.offer(job)) + { + // If we had no idle threads or the jobQ is greater than the idle threads + if (idle==0 || jobQ>idle) + { + int threads=_threadsStarted.get(); + if (threads<_maxThreads) + startThread(threads); + } + return true; + } + } + LOG.debug("Dispatched {} to stopped {}",job,this); + return false; + } + + /* ------------------------------------------------------------ */ + public void execute(Runnable job) + { + if (!dispatch(job)) + throw new RejectedExecutionException(); + } + + /* ------------------------------------------------------------ */ + /** + * Blocks until the thread pool is {@link LifeCycle#stop stopped}. + */ + public void join() throws InterruptedException + { + synchronized (_joinLock) + { + while (isRunning()) + _joinLock.wait(); + } + + while (isStopping()) + Thread.sleep(1); + } + + /* ------------------------------------------------------------ */ + /** + * @return The total number of threads currently in the pool + */ + public int getThreads() + { + return _threadsStarted.get(); + } + + /* ------------------------------------------------------------ */ + /** + * @return The number of idle threads in the pool + */ + public int getIdleThreads() + { + return _threadsIdle.get(); + } + + /* ------------------------------------------------------------ */ + /** + * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs + */ + public boolean isLowOnThreads() + { + return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); + } + + /* ------------------------------------------------------------ */ + private boolean startThread(int threads) + { + final int next=threads+1; + if (!_threadsStarted.compareAndSet(threads,next)) + return false; + + boolean started=false; + try + { + Thread thread=newThread(_runnable); + thread.setDaemon(_daemon); + thread.setPriority(_priority); + thread.setName(_name+"-"+thread.getId()); + _threads.add(thread); + + thread.start(); + started=true; + } + finally + { + if (!started) + _threadsStarted.decrementAndGet(); + } + return started; + } + + /* ------------------------------------------------------------ */ + protected Thread newThread(Runnable runnable) + { + return new Thread(runnable); + } + + + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } + + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + List<Object> dump = new ArrayList<Object>(getMaxThreads()); + for (final Thread thread: _threads) + { + final StackTraceElement[] trace=thread.getStackTrace(); + boolean inIdleJobPoll=false; + // trace can be null on early java 6 jvms + if (trace != null) + { + for (StackTraceElement t : trace) + { + if ("idleJobPoll".equals(t.getMethodName())) + { + inIdleJobPoll = true; + break; + } + } + } + final boolean idle=inIdleJobPoll; + + if (_detailedDump) + { + dump.add(new Dumpable() + { + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); + if (!idle) + AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); + } + + public String dump() + { + return null; + } + }); + } + else + { + dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); + } + } + + AggregateLifeCycle.dumpObject(out,this); + AggregateLifeCycle.dump(out,indent,dump); + + } + + + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; + } + + /* ------------------------------------------------------------ */ + private Runnable idleJobPoll() throws InterruptedException + { + return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); + } + + /* ------------------------------------------------------------ */ + private Runnable _runnable = new Runnable() + { + public void run() + { + boolean shrink=false; + try + { + Runnable job=_jobs.poll(); + while (isRunning()) + { + // Job loop + while (job!=null && isRunning()) + { + runJob(job); + job=_jobs.poll(); + } + + // Idle loop + try + { + _threadsIdle.incrementAndGet(); + + while (isRunning() && job==null) + { + if (_maxIdleTimeMs<=0) + job=_jobs.take(); + else + { + // maybe we should shrink? + final int size=_threadsStarted.get(); + if (size>_minThreads) + { + long last=_lastShrink.get(); + long now=System.currentTimeMillis(); + if (last==0 || (now-last)>_maxIdleTimeMs) + { + shrink=_lastShrink.compareAndSet(last,now) && + _threadsStarted.compareAndSet(size,size-1); + if (shrink) + return; + } + } + job=idleJobPoll(); + } + } + } + finally + { + _threadsIdle.decrementAndGet(); + } + } + } + catch(InterruptedException e) + { + LOG.ignore(e); + } + catch(Exception e) + { + LOG.warn(e); + } + finally + { + if (!shrink) + _threadsStarted.decrementAndGet(); + _threads.remove(Thread.currentThread()); + } + } + }; + + /* ------------------------------------------------------------ */ + /** + * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> + * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> + * + * @param job the job to run + */ + protected void runJob(Runnable job) + { + job.run(); + } + + /* ------------------------------------------------------------ */ + /** + * @return the job queue + */ + protected BlockingQueue<Runnable> getQueue() + { + return _jobs; + } + + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to stop. + * @return true if the thread was found and stopped. + * @deprecated Use {@link #interruptThread(long)} in preference + */ + @Deprecated + public boolean stopThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + thread.stop(); + return true; + } + } + return false; + } + + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to interrupt. + * @return true if the thread was found and interrupted. + */ + public boolean interruptThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + thread.interrupt(); + return true; + } + } + return false; + } + + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to interrupt. + * @return true if the thread was found and interrupted. + */ + public String dumpThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + StringBuilder buf = new StringBuilder(); + buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); + for (StackTraceElement element : thread.getStackTrace()) + buf.append(" at ").append(element.toString()).append('\n'); + return buf.toString(); + } + } + return null; + } +}