Mercurial Hosting > luan
diff src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 863:88d3c8ff242a
remove SizedThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 05:22:55 -0600 |
parents | 8e9db0bbf4f9 |
children |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/util/thread/QueuedThreadPool.java Sun Oct 02 05:17:11 2016 -0600 +++ b/src/org/eclipse/jetty/util/thread/QueuedThreadPool.java Sun Oct 02 05:22:55 2016 -0600 @@ -39,640 +39,639 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; -public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable +public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable { - private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); + private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); - private final AtomicInteger _threadsStarted = new AtomicInteger(); - private final AtomicInteger _threadsIdle = new AtomicInteger(); - private final AtomicLong _lastShrink = new AtomicLong(); - private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); - private final Object _joinLock = new Object(); - private BlockingQueue<Runnable> _jobs; - private String _name; - private int _maxIdleTimeMs=60000; - private int _maxThreads=254; - private int _minThreads=8; - private int _maxQueued=-1; - private int _priority=Thread.NORM_PRIORITY; - private boolean _daemon=false; - private int _maxStopTime=100; - private boolean _detailedDump=false; + private final AtomicInteger _threadsStarted = new AtomicInteger(); + private final AtomicInteger _threadsIdle = new AtomicInteger(); + private final AtomicLong _lastShrink = new AtomicLong(); + private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); + private final Object _joinLock = new Object(); + private BlockingQueue<Runnable> _jobs; + private String _name; + private int _maxIdleTimeMs=60000; + private int _maxThreads=254; + private int _minThreads=8; + private int _maxQueued=-1; + private int _priority=Thread.NORM_PRIORITY; + private boolean _daemon=false; + private int _maxStopTime=100; + private boolean _detailedDump=false; - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool() - { - _name="qtp"+super.hashCode(); - } + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool() + { + _name="qtp"+super.hashCode(); + } - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool(int maxThreads) - { - this(); - setMaxThreads(maxThreads); - } + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool(int maxThreads) + { + this(); + setMaxThreads(maxThreads); + } - /* ------------------------------------------------------------------- */ - /** Construct - */ - public QueuedThreadPool(BlockingQueue<Runnable> jobQ) - { - this(); - _jobs=jobQ; - _jobs.clear(); - } + /* ------------------------------------------------------------------- */ + /** Construct + */ + public QueuedThreadPool(BlockingQueue<Runnable> jobQ) + { + this(); + _jobs=jobQ; + _jobs.clear(); + } - /* ------------------------------------------------------------ */ - @Override - protected void doStart() throws Exception - { - super.doStart(); - _threadsStarted.set(0); + /* ------------------------------------------------------------ */ + @Override + protected void doStart() throws Exception + { + super.doStart(); + _threadsStarted.set(0); - if (_jobs==null) - { - _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) - :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); - } + if (_jobs==null) + { + _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) + :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); + } - int threads=_threadsStarted.get(); - while (isRunning() && threads<_minThreads) - { - startThread(threads); - threads=_threadsStarted.get(); - } - } + int threads=_threadsStarted.get(); + while (isRunning() && threads<_minThreads) + { + startThread(threads); + threads=_threadsStarted.get(); + } + } - /* ------------------------------------------------------------ */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - long start=System.currentTimeMillis(); + /* ------------------------------------------------------------ */ + @Override + protected void doStop() throws Exception + { + super.doStop(); + long start=System.currentTimeMillis(); - // let jobs complete naturally for a while - while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) - Thread.sleep(1); + // let jobs complete naturally for a while + while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) + Thread.sleep(1); - // kill queued jobs and flush out idle jobs - _jobs.clear(); - Runnable noop = new Runnable(){public void run(){}}; - for (int i=_threadsIdle.get();i-->0;) - _jobs.offer(noop); - Thread.yield(); + // kill queued jobs and flush out idle jobs + _jobs.clear(); + Runnable noop = new Runnable(){public void run(){}}; + for (int i=_threadsIdle.get();i-->0;) + _jobs.offer(noop); + Thread.yield(); - // interrupt remaining threads - if (_threadsStarted.get()>0) - for (Thread thread : _threads) - thread.interrupt(); + // interrupt remaining threads + if (_threadsStarted.get()>0) + for (Thread thread : _threads) + thread.interrupt(); - // wait for remaining threads to die - while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) - { - Thread.sleep(1); - } - Thread.yield(); - int size=_threads.size(); - if (size>0) - { - LOG.warn(size+" threads could not be stopped"); + // wait for remaining threads to die + while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) + { + Thread.sleep(1); + } + Thread.yield(); + int size=_threads.size(); + if (size>0) + { + LOG.warn(size+" threads could not be stopped"); - if (size==1 || LOG.isDebugEnabled()) - { - for (Thread unstopped : _threads) - { - LOG.info("Couldn't stop "+unstopped); - for (StackTraceElement element : unstopped.getStackTrace()) - { - LOG.info(" at "+element); - } - } - } - } + if (size==1 || LOG.isDebugEnabled()) + { + for (Thread unstopped : _threads) + { + LOG.info("Couldn't stop "+unstopped); + for (StackTraceElement element : unstopped.getStackTrace()) + { + LOG.info(" at "+element); + } + } + } + } - synchronized (_joinLock) - { - _joinLock.notifyAll(); - } - } + synchronized (_joinLock) + { + _joinLock.notifyAll(); + } + } - /* ------------------------------------------------------------ */ - /** - * Delegated to the named or anonymous Pool. - */ - public void setDaemon(boolean daemon) - { - _daemon=daemon; - } + /* ------------------------------------------------------------ */ + /** + * Delegated to the named or anonymous Pool. + */ + public void setDaemon(boolean daemon) + { + _daemon=daemon; + } - /* ------------------------------------------------------------ */ - /** Set the maximum thread idle time. - * Threads that are idle for longer than this period may be - * stopped. - * Delegated to the named or anonymous Pool. - * @see #getMaxIdleTimeMs - * @param maxIdleTimeMs Max idle time in ms. - */ - public void setMaxIdleTimeMs(int maxIdleTimeMs) - { - _maxIdleTimeMs=maxIdleTimeMs; - } + /* ------------------------------------------------------------ */ + /** Set the maximum thread idle time. + * Threads that are idle for longer than this period may be + * stopped. + * Delegated to the named or anonymous Pool. + * @see #getMaxIdleTimeMs + * @param maxIdleTimeMs Max idle time in ms. + */ + public void setMaxIdleTimeMs(int maxIdleTimeMs) + { + _maxIdleTimeMs=maxIdleTimeMs; + } - /* ------------------------------------------------------------ */ - /** - * @param stopTimeMs maximum total time that stop() will wait for threads to die. - */ - public void setMaxStopTimeMs(int stopTimeMs) - { - _maxStopTime = stopTimeMs; - } + /* ------------------------------------------------------------ */ + /** + * @param stopTimeMs maximum total time that stop() will wait for threads to die. + */ + public void setMaxStopTimeMs(int stopTimeMs) + { + _maxStopTime = stopTimeMs; + } - /* ------------------------------------------------------------ */ - /** Set the maximum number of threads. - * Delegated to the named or anonymous Pool. - * @see #getMaxThreads - * @param maxThreads maximum number of threads. - */ - public void setMaxThreads(int maxThreads) - { - _maxThreads=maxThreads; - if (_minThreads>_maxThreads) - _minThreads=_maxThreads; - } + /* ------------------------------------------------------------ */ + /** Set the maximum number of threads. + * Delegated to the named or anonymous Pool. + * @see #getMaxThreads + * @param maxThreads maximum number of threads. + */ + public void setMaxThreads(int maxThreads) + { + _maxThreads=maxThreads; + if (_minThreads>_maxThreads) + _minThreads=_maxThreads; + } - /* ------------------------------------------------------------ */ - /** Set the minimum number of threads. - * Delegated to the named or anonymous Pool. - * @see #getMinThreads - * @param minThreads minimum number of threads - */ - public void setMinThreads(int minThreads) - { - _minThreads=minThreads; + /* ------------------------------------------------------------ */ + /** Set the minimum number of threads. + * Delegated to the named or anonymous Pool. + * @see #getMinThreads + * @param minThreads minimum number of threads + */ + public void setMinThreads(int minThreads) + { + _minThreads=minThreads; - if (_minThreads>_maxThreads) - _maxThreads=_minThreads; + if (_minThreads>_maxThreads) + _maxThreads=_minThreads; - int threads=_threadsStarted.get(); - while (isStarted() && threads<_minThreads) - { - startThread(threads); - threads=_threadsStarted.get(); - } - } + int threads=_threadsStarted.get(); + while (isStarted() && threads<_minThreads) + { + startThread(threads); + threads=_threadsStarted.get(); + } + } - /* ------------------------------------------------------------ */ - /** - * @param name Name of the BoundedThreadPool to use when naming Threads. - */ - public void setName(String name) - { - if (isRunning()) - throw new IllegalStateException("started"); - _name= name; - } + /* ------------------------------------------------------------ */ + /** + * @param name Name of the BoundedThreadPool to use when naming Threads. + */ + public void setName(String name) + { + if (isRunning()) + throw new IllegalStateException("started"); + _name= name; + } - /* ------------------------------------------------------------ */ - /** Set the priority of the pool threads. - * @param priority the new thread priority. - */ - public void setThreadsPriority(int priority) - { - _priority=priority; - } + /* ------------------------------------------------------------ */ + /** Set the priority of the pool threads. + * @param priority the new thread priority. + */ + public void setThreadsPriority(int priority) + { + _priority=priority; + } - /* ------------------------------------------------------------ */ - /** - * @return maximum queue size - */ - public int getMaxQueued() - { - return _maxQueued; - } + /* ------------------------------------------------------------ */ + /** + * @return maximum queue size + */ + public int getMaxQueued() + { + return _maxQueued; + } - /* ------------------------------------------------------------ */ - /** - * @param max job queue size - */ - public void setMaxQueued(int max) - { - if (isRunning()) - throw new IllegalStateException("started"); - _maxQueued=max; - } + /* ------------------------------------------------------------ */ + /** + * @param max job queue size + */ + public void setMaxQueued(int max) + { + if (isRunning()) + throw new IllegalStateException("started"); + _maxQueued=max; + } - /* ------------------------------------------------------------ */ - /** Get the maximum thread idle time. - * Delegated to the named or anonymous Pool. - * @see #setMaxIdleTimeMs - * @return Max idle time in ms. - */ - public int getMaxIdleTimeMs() - { - return _maxIdleTimeMs; - } + /* ------------------------------------------------------------ */ + /** Get the maximum thread idle time. + * Delegated to the named or anonymous Pool. + * @see #setMaxIdleTimeMs + * @return Max idle time in ms. + */ + public int getMaxIdleTimeMs() + { + return _maxIdleTimeMs; + } - /* ------------------------------------------------------------ */ - /** - * @return maximum total time that stop() will wait for threads to die. - */ - public int getMaxStopTimeMs() - { - return _maxStopTime; - } + /* ------------------------------------------------------------ */ + /** + * @return maximum total time that stop() will wait for threads to die. + */ + public int getMaxStopTimeMs() + { + return _maxStopTime; + } - /* ------------------------------------------------------------ */ - /** Set the maximum number of threads. - * Delegated to the named or anonymous Pool. - * @see #setMaxThreads - * @return maximum number of threads. - */ - public int getMaxThreads() - { - return _maxThreads; - } + /* ------------------------------------------------------------ */ + /** Set the maximum number of threads. + * Delegated to the named or anonymous Pool. + * @see #setMaxThreads + * @return maximum number of threads. + */ + public int getMaxThreads() + { + return _maxThreads; + } - /* ------------------------------------------------------------ */ - /** Get the minimum number of threads. - * Delegated to the named or anonymous Pool. - * @see #setMinThreads - * @return minimum number of threads. - */ - public int getMinThreads() - { - return _minThreads; - } + /* ------------------------------------------------------------ */ + /** Get the minimum number of threads. + * Delegated to the named or anonymous Pool. + * @see #setMinThreads + * @return minimum number of threads. + */ + public int getMinThreads() + { + return _minThreads; + } - /* ------------------------------------------------------------ */ - /** - * @return The name of the BoundedThreadPool. - */ - public String getName() - { - return _name; - } + /* ------------------------------------------------------------ */ + /** + * @return The name of the BoundedThreadPool. + */ + public String getName() + { + return _name; + } - /* ------------------------------------------------------------ */ - /** Get the priority of the pool threads. - * @return the priority of the pool threads. - */ - public int getThreadsPriority() - { - return _priority; - } + /* ------------------------------------------------------------ */ + /** Get the priority of the pool threads. + * @return the priority of the pool threads. + */ + public int getThreadsPriority() + { + return _priority; + } - /* ------------------------------------------------------------ */ - /** - * Delegated to the named or anonymous Pool. - */ - public boolean isDaemon() - { - return _daemon; - } + /* ------------------------------------------------------------ */ + /** + * Delegated to the named or anonymous Pool. + */ + public boolean isDaemon() + { + return _daemon; + } - /* ------------------------------------------------------------ */ - public boolean isDetailedDump() - { - return _detailedDump; - } + /* ------------------------------------------------------------ */ + public boolean isDetailedDump() + { + return _detailedDump; + } - /* ------------------------------------------------------------ */ - public void setDetailedDump(boolean detailedDump) - { - _detailedDump = detailedDump; - } + /* ------------------------------------------------------------ */ + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } - /* ------------------------------------------------------------ */ - public boolean dispatch(Runnable job) - { - if (isRunning()) - { - final int jobQ = _jobs.size(); - final int idle = getIdleThreads(); - if(_jobs.offer(job)) - { - // If we had no idle threads or the jobQ is greater than the idle threads - if (idle==0 || jobQ>idle) - { - int threads=_threadsStarted.get(); - if (threads<_maxThreads) - startThread(threads); - } - return true; - } - } - LOG.debug("Dispatched {} to stopped {}",job,this); - return false; - } + /* ------------------------------------------------------------ */ + public boolean dispatch(Runnable job) + { + if (isRunning()) + { + final int jobQ = _jobs.size(); + final int idle = getIdleThreads(); + if(_jobs.offer(job)) + { + // If we had no idle threads or the jobQ is greater than the idle threads + if (idle==0 || jobQ>idle) + { + int threads=_threadsStarted.get(); + if (threads<_maxThreads) + startThread(threads); + } + return true; + } + } + LOG.debug("Dispatched {} to stopped {}",job,this); + return false; + } - /* ------------------------------------------------------------ */ - public void execute(Runnable job) - { - if (!dispatch(job)) - throw new RejectedExecutionException(); - } + /* ------------------------------------------------------------ */ + public void execute(Runnable job) + { + if (!dispatch(job)) + throw new RejectedExecutionException(); + } - /* ------------------------------------------------------------ */ - /** - * Blocks until the thread pool is {@link LifeCycle#stop stopped}. - */ - public void join() throws InterruptedException - { - synchronized (_joinLock) - { - while (isRunning()) - _joinLock.wait(); - } + /* ------------------------------------------------------------ */ + /** + * Blocks until the thread pool is {@link LifeCycle#stop stopped}. + */ + public void join() throws InterruptedException + { + synchronized (_joinLock) + { + while (isRunning()) + _joinLock.wait(); + } - while (isStopping()) - Thread.sleep(1); - } + while (isStopping()) + Thread.sleep(1); + } - /* ------------------------------------------------------------ */ - /** - * @return The total number of threads currently in the pool - */ - public int getThreads() - { - return _threadsStarted.get(); - } + /* ------------------------------------------------------------ */ + /** + * @return The total number of threads currently in the pool + */ + public int getThreads() + { + return _threadsStarted.get(); + } - /* ------------------------------------------------------------ */ - /** - * @return The number of idle threads in the pool - */ - public int getIdleThreads() - { - return _threadsIdle.get(); - } + /* ------------------------------------------------------------ */ + /** + * @return The number of idle threads in the pool + */ + public int getIdleThreads() + { + return _threadsIdle.get(); + } - /* ------------------------------------------------------------ */ - /** - * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs - */ - public boolean isLowOnThreads() - { - return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); - } + /* ------------------------------------------------------------ */ + /** + * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs + */ + public boolean isLowOnThreads() + { + return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); + } - /* ------------------------------------------------------------ */ - private boolean startThread(int threads) - { - final int next=threads+1; - if (!_threadsStarted.compareAndSet(threads,next)) - return false; + /* ------------------------------------------------------------ */ + private boolean startThread(int threads) + { + final int next=threads+1; + if (!_threadsStarted.compareAndSet(threads,next)) + return false; - boolean started=false; - try - { - Thread thread=newThread(_runnable); - thread.setDaemon(_daemon); - thread.setPriority(_priority); - thread.setName(_name+"-"+thread.getId()); - _threads.add(thread); + boolean started=false; + try + { + Thread thread=newThread(_runnable); + thread.setDaemon(_daemon); + thread.setPriority(_priority); + thread.setName(_name+"-"+thread.getId()); + _threads.add(thread); - thread.start(); - started=true; - } - finally - { - if (!started) - _threadsStarted.decrementAndGet(); - } - return started; - } + thread.start(); + started=true; + } + finally + { + if (!started) + _threadsStarted.decrementAndGet(); + } + return started; + } - /* ------------------------------------------------------------ */ - protected Thread newThread(Runnable runnable) - { - return new Thread(runnable); - } + /* ------------------------------------------------------------ */ + protected Thread newThread(Runnable runnable) + { + return new Thread(runnable); + } - /* ------------------------------------------------------------ */ - public String dump() - { - return AggregateLifeCycle.dump(this); - } + /* ------------------------------------------------------------ */ + public String dump() + { + return AggregateLifeCycle.dump(this); + } - /* ------------------------------------------------------------ */ - public void dump(Appendable out, String indent) throws IOException - { - List<Object> dump = new ArrayList<Object>(getMaxThreads()); - for (final Thread thread: _threads) - { - final StackTraceElement[] trace=thread.getStackTrace(); - boolean inIdleJobPoll=false; - // trace can be null on early java 6 jvms - if (trace != null) - { - for (StackTraceElement t : trace) - { - if ("idleJobPoll".equals(t.getMethodName())) - { - inIdleJobPoll = true; - break; - } - } - } - final boolean idle=inIdleJobPoll; + /* ------------------------------------------------------------ */ + public void dump(Appendable out, String indent) throws IOException + { + List<Object> dump = new ArrayList<Object>(getMaxThreads()); + for (final Thread thread: _threads) + { + final StackTraceElement[] trace=thread.getStackTrace(); + boolean inIdleJobPoll=false; + // trace can be null on early java 6 jvms + if (trace != null) + { + for (StackTraceElement t : trace) + { + if ("idleJobPoll".equals(t.getMethodName())) + { + inIdleJobPoll = true; + break; + } + } + } + final boolean idle=inIdleJobPoll; - if (_detailedDump) - { - dump.add(new Dumpable() - { - public void dump(Appendable out, String indent) throws IOException - { - out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); - if (!idle) - AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); - } + if (_detailedDump) + { + dump.add(new Dumpable() + { + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); + if (!idle) + AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); + } - public String dump() - { - return null; - } - }); - } - else - { - dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); - } - } + public String dump() + { + return null; + } + }); + } + else + { + dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); + } + } - AggregateLifeCycle.dumpObject(out,this); - AggregateLifeCycle.dump(out,indent,dump); + AggregateLifeCycle.dumpObject(out,this); + AggregateLifeCycle.dump(out,indent,dump); - } + } - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; - } + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; + } - /* ------------------------------------------------------------ */ - private Runnable idleJobPoll() throws InterruptedException - { - return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); - } + /* ------------------------------------------------------------ */ + private Runnable idleJobPoll() throws InterruptedException + { + return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); + } - /* ------------------------------------------------------------ */ - private Runnable _runnable = new Runnable() - { - public void run() - { - boolean shrink=false; - try - { - Runnable job=_jobs.poll(); - while (isRunning()) - { - // Job loop - while (job!=null && isRunning()) - { - runJob(job); - job=_jobs.poll(); - } + /* ------------------------------------------------------------ */ + private Runnable _runnable = new Runnable() + { + public void run() + { + boolean shrink=false; + try + { + Runnable job=_jobs.poll(); + while (isRunning()) + { + // Job loop + while (job!=null && isRunning()) + { + runJob(job); + job=_jobs.poll(); + } - // Idle loop - try - { - _threadsIdle.incrementAndGet(); + // Idle loop + try + { + _threadsIdle.incrementAndGet(); - while (isRunning() && job==null) - { - if (_maxIdleTimeMs<=0) - job=_jobs.take(); - else - { - // maybe we should shrink? - final int size=_threadsStarted.get(); - if (size>_minThreads) - { - long last=_lastShrink.get(); - long now=System.currentTimeMillis(); - if (last==0 || (now-last)>_maxIdleTimeMs) - { - shrink=_lastShrink.compareAndSet(last,now) && - _threadsStarted.compareAndSet(size,size-1); - if (shrink) - return; - } - } - job=idleJobPoll(); - } - } - } - finally - { - _threadsIdle.decrementAndGet(); - } - } - } - catch(InterruptedException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } - finally - { - if (!shrink) - _threadsStarted.decrementAndGet(); - _threads.remove(Thread.currentThread()); - } - } - }; + while (isRunning() && job==null) + { + if (_maxIdleTimeMs<=0) + job=_jobs.take(); + else + { + // maybe we should shrink? + final int size=_threadsStarted.get(); + if (size>_minThreads) + { + long last=_lastShrink.get(); + long now=System.currentTimeMillis(); + if (last==0 || (now-last)>_maxIdleTimeMs) + { + shrink=_lastShrink.compareAndSet(last,now) && + _threadsStarted.compareAndSet(size,size-1); + if (shrink) + return; + } + } + job=idleJobPoll(); + } + } + } + finally + { + _threadsIdle.decrementAndGet(); + } + } + } + catch(InterruptedException e) + { + LOG.trace("",e); + } + catch(Exception e) + { + LOG.warn("",e); + } + finally + { + if (!shrink) + _threadsStarted.decrementAndGet(); + _threads.remove(Thread.currentThread()); + } + } + }; - /* ------------------------------------------------------------ */ - /** - * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> - * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> - * - * @param job the job to run - */ - protected void runJob(Runnable job) - { - job.run(); - } + /* ------------------------------------------------------------ */ + /** + * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> + * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> + * + * @param job the job to run + */ + protected void runJob(Runnable job) + { + job.run(); + } - /* ------------------------------------------------------------ */ - /** - * @return the job queue - */ - protected BlockingQueue<Runnable> getQueue() - { - return _jobs; - } + /* ------------------------------------------------------------ */ + /** + * @return the job queue + */ + protected BlockingQueue<Runnable> getQueue() + { + return _jobs; + } - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to stop. - * @return true if the thread was found and stopped. - * @deprecated Use {@link #interruptThread(long)} in preference - */ - @Deprecated - public boolean stopThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - thread.stop(); - return true; - } - } - return false; - } + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to stop. + * @return true if the thread was found and stopped. + * @deprecated Use {@link #interruptThread(long)} in preference + */ + @Deprecated + public boolean stopThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + thread.stop(); + return true; + } + } + return false; + } - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to interrupt. - * @return true if the thread was found and interrupted. - */ - public boolean interruptThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - thread.interrupt(); - return true; - } - } - return false; - } + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to interrupt. + * @return true if the thread was found and interrupted. + */ + public boolean interruptThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + thread.interrupt(); + return true; + } + } + return false; + } - /* ------------------------------------------------------------ */ - /** - * @param id The thread ID to interrupt. - * @return true if the thread was found and interrupted. - */ - public String dumpThread(long id) - { - for (Thread thread: _threads) - { - if (thread.getId()==id) - { - StringBuilder buf = new StringBuilder(); - buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); - for (StackTraceElement element : thread.getStackTrace()) - buf.append(" at ").append(element.toString()).append('\n'); - return buf.toString(); - } - } - return null; - } + /* ------------------------------------------------------------ */ + /** + * @param id The thread ID to interrupt. + * @return true if the thread was found and interrupted. + */ + public String dumpThread(long id) + { + for (Thread thread: _threads) + { + if (thread.getId()==id) + { + StringBuilder buf = new StringBuilder(); + buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); + for (StackTraceElement element : thread.getStackTrace()) + buf.append(" at ").append(element.toString()).append('\n'); + return buf.toString(); + } + } + return null; + } }