comparison src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 863:88d3c8ff242a

remove SizedThreadPool
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 05:22:55 -0600
parents 8e9db0bbf4f9
children
comparison
equal deleted inserted replaced
862:2bb375e94f64 863:88d3c8ff242a
37 import org.eclipse.jetty.util.component.AggregateLifeCycle; 37 import org.eclipse.jetty.util.component.AggregateLifeCycle;
38 import org.eclipse.jetty.util.component.Dumpable; 38 import org.eclipse.jetty.util.component.Dumpable;
39 import org.eclipse.jetty.util.component.LifeCycle; 39 import org.eclipse.jetty.util.component.LifeCycle;
40 import org.slf4j.Logger; 40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory; 41 import org.slf4j.LoggerFactory;
42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; 42
43 43 public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable
44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
45 { 44 {
46 private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); 45 private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class);
47 46
48 private final AtomicInteger _threadsStarted = new AtomicInteger(); 47 private final AtomicInteger _threadsStarted = new AtomicInteger();
49 private final AtomicInteger _threadsIdle = new AtomicInteger(); 48 private final AtomicInteger _threadsIdle = new AtomicInteger();
50 private final AtomicLong _lastShrink = new AtomicLong(); 49 private final AtomicLong _lastShrink = new AtomicLong();
51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); 50 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
52 private final Object _joinLock = new Object(); 51 private final Object _joinLock = new Object();
53 private BlockingQueue<Runnable> _jobs; 52 private BlockingQueue<Runnable> _jobs;
54 private String _name; 53 private String _name;
55 private int _maxIdleTimeMs=60000; 54 private int _maxIdleTimeMs=60000;
56 private int _maxThreads=254; 55 private int _maxThreads=254;
57 private int _minThreads=8; 56 private int _minThreads=8;
58 private int _maxQueued=-1; 57 private int _maxQueued=-1;
59 private int _priority=Thread.NORM_PRIORITY; 58 private int _priority=Thread.NORM_PRIORITY;
60 private boolean _daemon=false; 59 private boolean _daemon=false;
61 private int _maxStopTime=100; 60 private int _maxStopTime=100;
62 private boolean _detailedDump=false; 61 private boolean _detailedDump=false;
63 62
64 /* ------------------------------------------------------------------- */ 63 /* ------------------------------------------------------------------- */
65 /** Construct 64 /** Construct
66 */ 65 */
67 public QueuedThreadPool() 66 public QueuedThreadPool()
68 { 67 {
69 _name="qtp"+super.hashCode(); 68 _name="qtp"+super.hashCode();
70 } 69 }
71 70
72 /* ------------------------------------------------------------------- */ 71 /* ------------------------------------------------------------------- */
73 /** Construct 72 /** Construct
74 */ 73 */
75 public QueuedThreadPool(int maxThreads) 74 public QueuedThreadPool(int maxThreads)
76 { 75 {
77 this(); 76 this();
78 setMaxThreads(maxThreads); 77 setMaxThreads(maxThreads);
79 } 78 }
80 79
81 /* ------------------------------------------------------------------- */ 80 /* ------------------------------------------------------------------- */
82 /** Construct 81 /** Construct
83 */ 82 */
84 public QueuedThreadPool(BlockingQueue<Runnable> jobQ) 83 public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
85 { 84 {
86 this(); 85 this();
87 _jobs=jobQ; 86 _jobs=jobQ;
88 _jobs.clear(); 87 _jobs.clear();
89 } 88 }
90 89
91 90
92 /* ------------------------------------------------------------ */ 91 /* ------------------------------------------------------------ */
93 @Override 92 @Override
94 protected void doStart() throws Exception 93 protected void doStart() throws Exception
95 { 94 {
96 super.doStart(); 95 super.doStart();
97 _threadsStarted.set(0); 96 _threadsStarted.set(0);
98 97
99 if (_jobs==null) 98 if (_jobs==null)
100 { 99 {
101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) 100 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); 101 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
103 } 102 }
104 103
105 int threads=_threadsStarted.get(); 104 int threads=_threadsStarted.get();
106 while (isRunning() && threads<_minThreads) 105 while (isRunning() && threads<_minThreads)
107 { 106 {
108 startThread(threads); 107 startThread(threads);
109 threads=_threadsStarted.get(); 108 threads=_threadsStarted.get();
110 } 109 }
111 } 110 }
112 111
113 /* ------------------------------------------------------------ */ 112 /* ------------------------------------------------------------ */
114 @Override 113 @Override
115 protected void doStop() throws Exception 114 protected void doStop() throws Exception
116 { 115 {
117 super.doStop(); 116 super.doStop();
118 long start=System.currentTimeMillis(); 117 long start=System.currentTimeMillis();
119 118
120 // let jobs complete naturally for a while 119 // let jobs complete naturally for a while
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) 120 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
122 Thread.sleep(1); 121 Thread.sleep(1);
123 122
124 // kill queued jobs and flush out idle jobs 123 // kill queued jobs and flush out idle jobs
125 _jobs.clear(); 124 _jobs.clear();
126 Runnable noop = new Runnable(){public void run(){}}; 125 Runnable noop = new Runnable(){public void run(){}};
127 for (int i=_threadsIdle.get();i-->0;) 126 for (int i=_threadsIdle.get();i-->0;)
128 _jobs.offer(noop); 127 _jobs.offer(noop);
129 Thread.yield(); 128 Thread.yield();
130 129
131 // interrupt remaining threads 130 // interrupt remaining threads
132 if (_threadsStarted.get()>0) 131 if (_threadsStarted.get()>0)
133 for (Thread thread : _threads) 132 for (Thread thread : _threads)
134 thread.interrupt(); 133 thread.interrupt();
135 134
136 // wait for remaining threads to die 135 // wait for remaining threads to die
137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) 136 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
138 { 137 {
139 Thread.sleep(1); 138 Thread.sleep(1);
140 } 139 }
141 Thread.yield(); 140 Thread.yield();
142 int size=_threads.size(); 141 int size=_threads.size();
143 if (size>0) 142 if (size>0)
144 { 143 {
145 LOG.warn(size+" threads could not be stopped"); 144 LOG.warn(size+" threads could not be stopped");
146 145
147 if (size==1 || LOG.isDebugEnabled()) 146 if (size==1 || LOG.isDebugEnabled())
148 { 147 {
149 for (Thread unstopped : _threads) 148 for (Thread unstopped : _threads)
150 { 149 {
151 LOG.info("Couldn't stop "+unstopped); 150 LOG.info("Couldn't stop "+unstopped);
152 for (StackTraceElement element : unstopped.getStackTrace()) 151 for (StackTraceElement element : unstopped.getStackTrace())
153 { 152 {
154 LOG.info(" at "+element); 153 LOG.info(" at "+element);
155 } 154 }
156 } 155 }
157 } 156 }
158 } 157 }
159 158
160 synchronized (_joinLock) 159 synchronized (_joinLock)
161 { 160 {
162 _joinLock.notifyAll(); 161 _joinLock.notifyAll();
163 } 162 }
164 } 163 }
165 164
166 /* ------------------------------------------------------------ */ 165 /* ------------------------------------------------------------ */
167 /** 166 /**
168 * Delegated to the named or anonymous Pool. 167 * Delegated to the named or anonymous Pool.
169 */ 168 */
170 public void setDaemon(boolean daemon) 169 public void setDaemon(boolean daemon)
171 { 170 {
172 _daemon=daemon; 171 _daemon=daemon;
173 } 172 }
174 173
175 /* ------------------------------------------------------------ */ 174 /* ------------------------------------------------------------ */
176 /** Set the maximum thread idle time. 175 /** Set the maximum thread idle time.
177 * Threads that are idle for longer than this period may be 176 * Threads that are idle for longer than this period may be
178 * stopped. 177 * stopped.
179 * Delegated to the named or anonymous Pool. 178 * Delegated to the named or anonymous Pool.
180 * @see #getMaxIdleTimeMs 179 * @see #getMaxIdleTimeMs
181 * @param maxIdleTimeMs Max idle time in ms. 180 * @param maxIdleTimeMs Max idle time in ms.
182 */ 181 */
183 public void setMaxIdleTimeMs(int maxIdleTimeMs) 182 public void setMaxIdleTimeMs(int maxIdleTimeMs)
184 { 183 {
185 _maxIdleTimeMs=maxIdleTimeMs; 184 _maxIdleTimeMs=maxIdleTimeMs;
186 } 185 }
187 186
188 /* ------------------------------------------------------------ */ 187 /* ------------------------------------------------------------ */
189 /** 188 /**
190 * @param stopTimeMs maximum total time that stop() will wait for threads to die. 189 * @param stopTimeMs maximum total time that stop() will wait for threads to die.
191 */ 190 */
192 public void setMaxStopTimeMs(int stopTimeMs) 191 public void setMaxStopTimeMs(int stopTimeMs)
193 { 192 {
194 _maxStopTime = stopTimeMs; 193 _maxStopTime = stopTimeMs;
195 } 194 }
196 195
197 /* ------------------------------------------------------------ */ 196 /* ------------------------------------------------------------ */
198 /** Set the maximum number of threads. 197 /** Set the maximum number of threads.
199 * Delegated to the named or anonymous Pool. 198 * Delegated to the named or anonymous Pool.
200 * @see #getMaxThreads 199 * @see #getMaxThreads
201 * @param maxThreads maximum number of threads. 200 * @param maxThreads maximum number of threads.
202 */ 201 */
203 public void setMaxThreads(int maxThreads) 202 public void setMaxThreads(int maxThreads)
204 { 203 {
205 _maxThreads=maxThreads; 204 _maxThreads=maxThreads;
206 if (_minThreads>_maxThreads) 205 if (_minThreads>_maxThreads)
207 _minThreads=_maxThreads; 206 _minThreads=_maxThreads;
208 } 207 }
209 208
210 /* ------------------------------------------------------------ */ 209 /* ------------------------------------------------------------ */
211 /** Set the minimum number of threads. 210 /** Set the minimum number of threads.
212 * Delegated to the named or anonymous Pool. 211 * Delegated to the named or anonymous Pool.
213 * @see #getMinThreads 212 * @see #getMinThreads
214 * @param minThreads minimum number of threads 213 * @param minThreads minimum number of threads
215 */ 214 */
216 public void setMinThreads(int minThreads) 215 public void setMinThreads(int minThreads)
217 { 216 {
218 _minThreads=minThreads; 217 _minThreads=minThreads;
219 218
220 if (_minThreads>_maxThreads) 219 if (_minThreads>_maxThreads)
221 _maxThreads=_minThreads; 220 _maxThreads=_minThreads;
222 221
223 int threads=_threadsStarted.get(); 222 int threads=_threadsStarted.get();
224 while (isStarted() && threads<_minThreads) 223 while (isStarted() && threads<_minThreads)
225 { 224 {
226 startThread(threads); 225 startThread(threads);
227 threads=_threadsStarted.get(); 226 threads=_threadsStarted.get();
228 } 227 }
229 } 228 }
230 229
231 /* ------------------------------------------------------------ */ 230 /* ------------------------------------------------------------ */
232 /** 231 /**
233 * @param name Name of the BoundedThreadPool to use when naming Threads. 232 * @param name Name of the BoundedThreadPool to use when naming Threads.
234 */ 233 */
235 public void setName(String name) 234 public void setName(String name)
236 { 235 {
237 if (isRunning()) 236 if (isRunning())
238 throw new IllegalStateException("started"); 237 throw new IllegalStateException("started");
239 _name= name; 238 _name= name;
240 } 239 }
241 240
242 /* ------------------------------------------------------------ */ 241 /* ------------------------------------------------------------ */
243 /** Set the priority of the pool threads. 242 /** Set the priority of the pool threads.
244 * @param priority the new thread priority. 243 * @param priority the new thread priority.
245 */ 244 */
246 public void setThreadsPriority(int priority) 245 public void setThreadsPriority(int priority)
247 { 246 {
248 _priority=priority; 247 _priority=priority;
249 } 248 }
250 249
251 /* ------------------------------------------------------------ */ 250 /* ------------------------------------------------------------ */
252 /** 251 /**
253 * @return maximum queue size 252 * @return maximum queue size
254 */ 253 */
255 public int getMaxQueued() 254 public int getMaxQueued()
256 { 255 {
257 return _maxQueued; 256 return _maxQueued;
258 } 257 }
259 258
260 /* ------------------------------------------------------------ */ 259 /* ------------------------------------------------------------ */
261 /** 260 /**
262 * @param max job queue size 261 * @param max job queue size
263 */ 262 */
264 public void setMaxQueued(int max) 263 public void setMaxQueued(int max)
265 { 264 {
266 if (isRunning()) 265 if (isRunning())
267 throw new IllegalStateException("started"); 266 throw new IllegalStateException("started");
268 _maxQueued=max; 267 _maxQueued=max;
269 } 268 }
270 269
271 /* ------------------------------------------------------------ */ 270 /* ------------------------------------------------------------ */
272 /** Get the maximum thread idle time. 271 /** Get the maximum thread idle time.
273 * Delegated to the named or anonymous Pool. 272 * Delegated to the named or anonymous Pool.
274 * @see #setMaxIdleTimeMs 273 * @see #setMaxIdleTimeMs
275 * @return Max idle time in ms. 274 * @return Max idle time in ms.
276 */ 275 */
277 public int getMaxIdleTimeMs() 276 public int getMaxIdleTimeMs()
278 { 277 {
279 return _maxIdleTimeMs; 278 return _maxIdleTimeMs;
280 } 279 }
281 280
282 /* ------------------------------------------------------------ */ 281 /* ------------------------------------------------------------ */
283 /** 282 /**
284 * @return maximum total time that stop() will wait for threads to die. 283 * @return maximum total time that stop() will wait for threads to die.
285 */ 284 */
286 public int getMaxStopTimeMs() 285 public int getMaxStopTimeMs()
287 { 286 {
288 return _maxStopTime; 287 return _maxStopTime;
289 } 288 }
290 289
291 /* ------------------------------------------------------------ */ 290 /* ------------------------------------------------------------ */
292 /** Set the maximum number of threads. 291 /** Set the maximum number of threads.
293 * Delegated to the named or anonymous Pool. 292 * Delegated to the named or anonymous Pool.
294 * @see #setMaxThreads 293 * @see #setMaxThreads
295 * @return maximum number of threads. 294 * @return maximum number of threads.
296 */ 295 */
297 public int getMaxThreads() 296 public int getMaxThreads()
298 { 297 {
299 return _maxThreads; 298 return _maxThreads;
300 } 299 }
301 300
302 /* ------------------------------------------------------------ */ 301 /* ------------------------------------------------------------ */
303 /** Get the minimum number of threads. 302 /** Get the minimum number of threads.
304 * Delegated to the named or anonymous Pool. 303 * Delegated to the named or anonymous Pool.
305 * @see #setMinThreads 304 * @see #setMinThreads
306 * @return minimum number of threads. 305 * @return minimum number of threads.
307 */ 306 */
308 public int getMinThreads() 307 public int getMinThreads()
309 { 308 {
310 return _minThreads; 309 return _minThreads;
311 } 310 }
312 311
313 /* ------------------------------------------------------------ */ 312 /* ------------------------------------------------------------ */
314 /** 313 /**
315 * @return The name of the BoundedThreadPool. 314 * @return The name of the BoundedThreadPool.
316 */ 315 */
317 public String getName() 316 public String getName()
318 { 317 {
319 return _name; 318 return _name;
320 } 319 }
321 320
322 /* ------------------------------------------------------------ */ 321 /* ------------------------------------------------------------ */
323 /** Get the priority of the pool threads. 322 /** Get the priority of the pool threads.
324 * @return the priority of the pool threads. 323 * @return the priority of the pool threads.
325 */ 324 */
326 public int getThreadsPriority() 325 public int getThreadsPriority()
327 { 326 {
328 return _priority; 327 return _priority;
329 } 328 }
330 329
331 /* ------------------------------------------------------------ */ 330 /* ------------------------------------------------------------ */
332 /** 331 /**
333 * Delegated to the named or anonymous Pool. 332 * Delegated to the named or anonymous Pool.
334 */ 333 */
335 public boolean isDaemon() 334 public boolean isDaemon()
336 { 335 {
337 return _daemon; 336 return _daemon;
338 } 337 }
339 338
340 /* ------------------------------------------------------------ */ 339 /* ------------------------------------------------------------ */
341 public boolean isDetailedDump() 340 public boolean isDetailedDump()
342 { 341 {
343 return _detailedDump; 342 return _detailedDump;
344 } 343 }
345 344
346 /* ------------------------------------------------------------ */ 345 /* ------------------------------------------------------------ */
347 public void setDetailedDump(boolean detailedDump) 346 public void setDetailedDump(boolean detailedDump)
348 { 347 {
349 _detailedDump = detailedDump; 348 _detailedDump = detailedDump;
350 } 349 }
351 350
352 /* ------------------------------------------------------------ */ 351 /* ------------------------------------------------------------ */
353 public boolean dispatch(Runnable job) 352 public boolean dispatch(Runnable job)
354 { 353 {
355 if (isRunning()) 354 if (isRunning())
356 { 355 {
357 final int jobQ = _jobs.size(); 356 final int jobQ = _jobs.size();
358 final int idle = getIdleThreads(); 357 final int idle = getIdleThreads();
359 if(_jobs.offer(job)) 358 if(_jobs.offer(job))
360 { 359 {
361 // If we had no idle threads or the jobQ is greater than the idle threads 360 // If we had no idle threads or the jobQ is greater than the idle threads
362 if (idle==0 || jobQ>idle) 361 if (idle==0 || jobQ>idle)
363 { 362 {
364 int threads=_threadsStarted.get(); 363 int threads=_threadsStarted.get();
365 if (threads<_maxThreads) 364 if (threads<_maxThreads)
366 startThread(threads); 365 startThread(threads);
367 } 366 }
368 return true; 367 return true;
369 } 368 }
370 } 369 }
371 LOG.debug("Dispatched {} to stopped {}",job,this); 370 LOG.debug("Dispatched {} to stopped {}",job,this);
372 return false; 371 return false;
373 } 372 }
374 373
375 /* ------------------------------------------------------------ */ 374 /* ------------------------------------------------------------ */
376 public void execute(Runnable job) 375 public void execute(Runnable job)
377 { 376 {
378 if (!dispatch(job)) 377 if (!dispatch(job))
379 throw new RejectedExecutionException(); 378 throw new RejectedExecutionException();
380 } 379 }
381 380
382 /* ------------------------------------------------------------ */ 381 /* ------------------------------------------------------------ */
383 /** 382 /**
384 * Blocks until the thread pool is {@link LifeCycle#stop stopped}. 383 * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
385 */ 384 */
386 public void join() throws InterruptedException 385 public void join() throws InterruptedException
387 { 386 {
388 synchronized (_joinLock) 387 synchronized (_joinLock)
389 { 388 {
390 while (isRunning()) 389 while (isRunning())
391 _joinLock.wait(); 390 _joinLock.wait();
392 } 391 }
393 392
394 while (isStopping()) 393 while (isStopping())
395 Thread.sleep(1); 394 Thread.sleep(1);
396 } 395 }
397 396
398 /* ------------------------------------------------------------ */ 397 /* ------------------------------------------------------------ */
399 /** 398 /**
400 * @return The total number of threads currently in the pool 399 * @return The total number of threads currently in the pool
401 */ 400 */
402 public int getThreads() 401 public int getThreads()
403 { 402 {
404 return _threadsStarted.get(); 403 return _threadsStarted.get();
405 } 404 }
406 405
407 /* ------------------------------------------------------------ */ 406 /* ------------------------------------------------------------ */
408 /** 407 /**
409 * @return The number of idle threads in the pool 408 * @return The number of idle threads in the pool
410 */ 409 */
411 public int getIdleThreads() 410 public int getIdleThreads()
412 { 411 {
413 return _threadsIdle.get(); 412 return _threadsIdle.get();
414 } 413 }
415 414
416 /* ------------------------------------------------------------ */ 415 /* ------------------------------------------------------------ */
417 /** 416 /**
418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs 417 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
419 */ 418 */
420 public boolean isLowOnThreads() 419 public boolean isLowOnThreads()
421 { 420 {
422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); 421 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
423 } 422 }
424 423
425 /* ------------------------------------------------------------ */ 424 /* ------------------------------------------------------------ */
426 private boolean startThread(int threads) 425 private boolean startThread(int threads)
427 { 426 {
428 final int next=threads+1; 427 final int next=threads+1;
429 if (!_threadsStarted.compareAndSet(threads,next)) 428 if (!_threadsStarted.compareAndSet(threads,next))
430 return false; 429 return false;
431 430
432 boolean started=false; 431 boolean started=false;
433 try 432 try
434 { 433 {
435 Thread thread=newThread(_runnable); 434 Thread thread=newThread(_runnable);
436 thread.setDaemon(_daemon); 435 thread.setDaemon(_daemon);
437 thread.setPriority(_priority); 436 thread.setPriority(_priority);
438 thread.setName(_name+"-"+thread.getId()); 437 thread.setName(_name+"-"+thread.getId());
439 _threads.add(thread); 438 _threads.add(thread);
440 439
441 thread.start(); 440 thread.start();
442 started=true; 441 started=true;
443 } 442 }
444 finally 443 finally
445 { 444 {
446 if (!started) 445 if (!started)
447 _threadsStarted.decrementAndGet(); 446 _threadsStarted.decrementAndGet();
448 } 447 }
449 return started; 448 return started;
450 } 449 }
451 450
452 /* ------------------------------------------------------------ */ 451 /* ------------------------------------------------------------ */
453 protected Thread newThread(Runnable runnable) 452 protected Thread newThread(Runnable runnable)
454 { 453 {
455 return new Thread(runnable); 454 return new Thread(runnable);
456 } 455 }
457 456
458 457
459 /* ------------------------------------------------------------ */ 458 /* ------------------------------------------------------------ */
460 public String dump() 459 public String dump()
461 { 460 {
462 return AggregateLifeCycle.dump(this); 461 return AggregateLifeCycle.dump(this);
463 } 462 }
464 463
465 /* ------------------------------------------------------------ */ 464 /* ------------------------------------------------------------ */
466 public void dump(Appendable out, String indent) throws IOException 465 public void dump(Appendable out, String indent) throws IOException
467 { 466 {
468 List<Object> dump = new ArrayList<Object>(getMaxThreads()); 467 List<Object> dump = new ArrayList<Object>(getMaxThreads());
469 for (final Thread thread: _threads) 468 for (final Thread thread: _threads)
470 { 469 {
471 final StackTraceElement[] trace=thread.getStackTrace(); 470 final StackTraceElement[] trace=thread.getStackTrace();
472 boolean inIdleJobPoll=false; 471 boolean inIdleJobPoll=false;
473 // trace can be null on early java 6 jvms 472 // trace can be null on early java 6 jvms
474 if (trace != null) 473 if (trace != null)
475 { 474 {
476 for (StackTraceElement t : trace) 475 for (StackTraceElement t : trace)
477 { 476 {
478 if ("idleJobPoll".equals(t.getMethodName())) 477 if ("idleJobPoll".equals(t.getMethodName()))
479 { 478 {
480 inIdleJobPoll = true; 479 inIdleJobPoll = true;
481 break; 480 break;
482 } 481 }
483 } 482 }
484 } 483 }
485 final boolean idle=inIdleJobPoll; 484 final boolean idle=inIdleJobPoll;
486 485
487 if (_detailedDump) 486 if (_detailedDump)
488 { 487 {
489 dump.add(new Dumpable() 488 dump.add(new Dumpable()
490 { 489 {
491 public void dump(Appendable out, String indent) throws IOException 490 public void dump(Appendable out, String indent) throws IOException
492 { 491 {
493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); 492 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
494 if (!idle) 493 if (!idle)
495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); 494 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
496 } 495 }
497 496
498 public String dump() 497 public String dump()
499 { 498 {
500 return null; 499 return null;
501 } 500 }
502 }); 501 });
503 } 502 }
504 else 503 else
505 { 504 {
506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); 505 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
507 } 506 }
508 } 507 }
509 508
510 AggregateLifeCycle.dumpObject(out,this); 509 AggregateLifeCycle.dumpObject(out,this);
511 AggregateLifeCycle.dump(out,indent,dump); 510 AggregateLifeCycle.dump(out,indent,dump);
512 511
513 } 512 }
514 513
515 514
516 /* ------------------------------------------------------------ */ 515 /* ------------------------------------------------------------ */
517 @Override 516 @Override
518 public String toString() 517 public String toString()
519 { 518 {
520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; 519 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}";
521 } 520 }
522 521
523 /* ------------------------------------------------------------ */ 522 /* ------------------------------------------------------------ */
524 private Runnable idleJobPoll() throws InterruptedException 523 private Runnable idleJobPoll() throws InterruptedException
525 { 524 {
526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); 525 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
527 } 526 }
528 527
529 /* ------------------------------------------------------------ */ 528 /* ------------------------------------------------------------ */
530 private Runnable _runnable = new Runnable() 529 private Runnable _runnable = new Runnable()
531 { 530 {
532 public void run() 531 public void run()
533 { 532 {
534 boolean shrink=false; 533 boolean shrink=false;
535 try 534 try
536 { 535 {
537 Runnable job=_jobs.poll(); 536 Runnable job=_jobs.poll();
538 while (isRunning()) 537 while (isRunning())
539 { 538 {
540 // Job loop 539 // Job loop
541 while (job!=null && isRunning()) 540 while (job!=null && isRunning())
542 { 541 {
543 runJob(job); 542 runJob(job);
544 job=_jobs.poll(); 543 job=_jobs.poll();
545 } 544 }
546 545
547 // Idle loop 546 // Idle loop
548 try 547 try
549 { 548 {
550 _threadsIdle.incrementAndGet(); 549 _threadsIdle.incrementAndGet();
551 550
552 while (isRunning() && job==null) 551 while (isRunning() && job==null)
553 { 552 {
554 if (_maxIdleTimeMs<=0) 553 if (_maxIdleTimeMs<=0)
555 job=_jobs.take(); 554 job=_jobs.take();
556 else 555 else
557 { 556 {
558 // maybe we should shrink? 557 // maybe we should shrink?
559 final int size=_threadsStarted.get(); 558 final int size=_threadsStarted.get();
560 if (size>_minThreads) 559 if (size>_minThreads)
561 { 560 {
562 long last=_lastShrink.get(); 561 long last=_lastShrink.get();
563 long now=System.currentTimeMillis(); 562 long now=System.currentTimeMillis();
564 if (last==0 || (now-last)>_maxIdleTimeMs) 563 if (last==0 || (now-last)>_maxIdleTimeMs)
565 { 564 {
566 shrink=_lastShrink.compareAndSet(last,now) && 565 shrink=_lastShrink.compareAndSet(last,now) &&
567 _threadsStarted.compareAndSet(size,size-1); 566 _threadsStarted.compareAndSet(size,size-1);
568 if (shrink) 567 if (shrink)
569 return; 568 return;
570 } 569 }
571 } 570 }
572 job=idleJobPoll(); 571 job=idleJobPoll();
573 } 572 }
574 } 573 }
575 } 574 }
576 finally 575 finally
577 { 576 {
578 _threadsIdle.decrementAndGet(); 577 _threadsIdle.decrementAndGet();
579 } 578 }
580 } 579 }
581 } 580 }
582 catch(InterruptedException e) 581 catch(InterruptedException e)
583 { 582 {
584 LOG.trace("",e); 583 LOG.trace("",e);
585 } 584 }
586 catch(Exception e) 585 catch(Exception e)
587 { 586 {
588 LOG.warn("",e); 587 LOG.warn("",e);
589 } 588 }
590 finally 589 finally
591 { 590 {
592 if (!shrink) 591 if (!shrink)
593 _threadsStarted.decrementAndGet(); 592 _threadsStarted.decrementAndGet();
594 _threads.remove(Thread.currentThread()); 593 _threads.remove(Thread.currentThread());
595 } 594 }
596 } 595 }
597 }; 596 };
598 597
599 /* ------------------------------------------------------------ */ 598 /* ------------------------------------------------------------ */
600 /** 599 /**
601 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> 600 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
602 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> 601 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
603 * 602 *
604 * @param job the job to run 603 * @param job the job to run
605 */ 604 */
606 protected void runJob(Runnable job) 605 protected void runJob(Runnable job)
607 { 606 {
608 job.run(); 607 job.run();
609 } 608 }
610 609
611 /* ------------------------------------------------------------ */ 610 /* ------------------------------------------------------------ */
612 /** 611 /**
613 * @return the job queue 612 * @return the job queue
614 */ 613 */
615 protected BlockingQueue<Runnable> getQueue() 614 protected BlockingQueue<Runnable> getQueue()
616 { 615 {
617 return _jobs; 616 return _jobs;
618 } 617 }
619 618
620 /* ------------------------------------------------------------ */ 619 /* ------------------------------------------------------------ */
621 /** 620 /**
622 * @param id The thread ID to stop. 621 * @param id The thread ID to stop.
623 * @return true if the thread was found and stopped. 622 * @return true if the thread was found and stopped.
624 * @deprecated Use {@link #interruptThread(long)} in preference 623 * @deprecated Use {@link #interruptThread(long)} in preference
625 */ 624 */
626 @Deprecated 625 @Deprecated
627 public boolean stopThread(long id) 626 public boolean stopThread(long id)
628 { 627 {
629 for (Thread thread: _threads) 628 for (Thread thread: _threads)
630 { 629 {
631 if (thread.getId()==id) 630 if (thread.getId()==id)
632 { 631 {
633 thread.stop(); 632 thread.stop();
634 return true; 633 return true;
635 } 634 }
636 } 635 }
637 return false; 636 return false;
638 } 637 }
639 638
640 /* ------------------------------------------------------------ */ 639 /* ------------------------------------------------------------ */
641 /** 640 /**
642 * @param id The thread ID to interrupt. 641 * @param id The thread ID to interrupt.
643 * @return true if the thread was found and interrupted. 642 * @return true if the thread was found and interrupted.
644 */ 643 */
645 public boolean interruptThread(long id) 644 public boolean interruptThread(long id)
646 { 645 {
647 for (Thread thread: _threads) 646 for (Thread thread: _threads)
648 { 647 {
649 if (thread.getId()==id) 648 if (thread.getId()==id)
650 { 649 {
651 thread.interrupt(); 650 thread.interrupt();
652 return true; 651 return true;
653 } 652 }
654 } 653 }
655 return false; 654 return false;
656 } 655 }
657 656
658 /* ------------------------------------------------------------ */ 657 /* ------------------------------------------------------------ */
659 /** 658 /**
660 * @param id The thread ID to interrupt. 659 * @param id The thread ID to interrupt.
661 * @return true if the thread was found and interrupted. 660 * @return true if the thread was found and interrupted.
662 */ 661 */
663 public String dumpThread(long id) 662 public String dumpThread(long id)
664 { 663 {
665 for (Thread thread: _threads) 664 for (Thread thread: _threads)
666 { 665 {
667 if (thread.getId()==id) 666 if (thread.getId()==id)
668 { 667 {
669 StringBuilder buf = new StringBuilder(); 668 StringBuilder buf = new StringBuilder();
670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); 669 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
671 for (StackTraceElement element : thread.getStackTrace()) 670 for (StackTraceElement element : thread.getStackTrace())
672 buf.append(" at ").append(element.toString()).append('\n'); 671 buf.append(" at ").append(element.toString()).append('\n');
673 return buf.toString(); 672 return buf.toString();
674 } 673 }
675 } 674 }
676 return null; 675 return null;
677 } 676 }
678 } 677 }