Mercurial Hosting > luan
comparison src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 863:88d3c8ff242a
remove SizedThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 05:22:55 -0600 |
parents | 8e9db0bbf4f9 |
children |
comparison
equal
deleted
inserted
replaced
862:2bb375e94f64 | 863:88d3c8ff242a |
---|---|
37 import org.eclipse.jetty.util.component.AggregateLifeCycle; | 37 import org.eclipse.jetty.util.component.AggregateLifeCycle; |
38 import org.eclipse.jetty.util.component.Dumpable; | 38 import org.eclipse.jetty.util.component.Dumpable; |
39 import org.eclipse.jetty.util.component.LifeCycle; | 39 import org.eclipse.jetty.util.component.LifeCycle; |
40 import org.slf4j.Logger; | 40 import org.slf4j.Logger; |
41 import org.slf4j.LoggerFactory; | 41 import org.slf4j.LoggerFactory; |
42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; | 42 |
43 | 43 public class QueuedThreadPool extends AbstractLifeCycle implements ThreadPool, Executor, Dumpable |
44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable | |
45 { | 44 { |
46 private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); | 45 private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); |
47 | 46 |
48 private final AtomicInteger _threadsStarted = new AtomicInteger(); | 47 private final AtomicInteger _threadsStarted = new AtomicInteger(); |
49 private final AtomicInteger _threadsIdle = new AtomicInteger(); | 48 private final AtomicInteger _threadsIdle = new AtomicInteger(); |
50 private final AtomicLong _lastShrink = new AtomicLong(); | 49 private final AtomicLong _lastShrink = new AtomicLong(); |
51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); | 50 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); |
52 private final Object _joinLock = new Object(); | 51 private final Object _joinLock = new Object(); |
53 private BlockingQueue<Runnable> _jobs; | 52 private BlockingQueue<Runnable> _jobs; |
54 private String _name; | 53 private String _name; |
55 private int _maxIdleTimeMs=60000; | 54 private int _maxIdleTimeMs=60000; |
56 private int _maxThreads=254; | 55 private int _maxThreads=254; |
57 private int _minThreads=8; | 56 private int _minThreads=8; |
58 private int _maxQueued=-1; | 57 private int _maxQueued=-1; |
59 private int _priority=Thread.NORM_PRIORITY; | 58 private int _priority=Thread.NORM_PRIORITY; |
60 private boolean _daemon=false; | 59 private boolean _daemon=false; |
61 private int _maxStopTime=100; | 60 private int _maxStopTime=100; |
62 private boolean _detailedDump=false; | 61 private boolean _detailedDump=false; |
63 | 62 |
64 /* ------------------------------------------------------------------- */ | 63 /* ------------------------------------------------------------------- */ |
65 /** Construct | 64 /** Construct |
66 */ | 65 */ |
67 public QueuedThreadPool() | 66 public QueuedThreadPool() |
68 { | 67 { |
69 _name="qtp"+super.hashCode(); | 68 _name="qtp"+super.hashCode(); |
70 } | 69 } |
71 | 70 |
72 /* ------------------------------------------------------------------- */ | 71 /* ------------------------------------------------------------------- */ |
73 /** Construct | 72 /** Construct |
74 */ | 73 */ |
75 public QueuedThreadPool(int maxThreads) | 74 public QueuedThreadPool(int maxThreads) |
76 { | 75 { |
77 this(); | 76 this(); |
78 setMaxThreads(maxThreads); | 77 setMaxThreads(maxThreads); |
79 } | 78 } |
80 | 79 |
81 /* ------------------------------------------------------------------- */ | 80 /* ------------------------------------------------------------------- */ |
82 /** Construct | 81 /** Construct |
83 */ | 82 */ |
84 public QueuedThreadPool(BlockingQueue<Runnable> jobQ) | 83 public QueuedThreadPool(BlockingQueue<Runnable> jobQ) |
85 { | 84 { |
86 this(); | 85 this(); |
87 _jobs=jobQ; | 86 _jobs=jobQ; |
88 _jobs.clear(); | 87 _jobs.clear(); |
89 } | 88 } |
90 | 89 |
91 | 90 |
92 /* ------------------------------------------------------------ */ | 91 /* ------------------------------------------------------------ */ |
93 @Override | 92 @Override |
94 protected void doStart() throws Exception | 93 protected void doStart() throws Exception |
95 { | 94 { |
96 super.doStart(); | 95 super.doStart(); |
97 _threadsStarted.set(0); | 96 _threadsStarted.set(0); |
98 | 97 |
99 if (_jobs==null) | 98 if (_jobs==null) |
100 { | 99 { |
101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) | 100 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) |
102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); | 101 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); |
103 } | 102 } |
104 | 103 |
105 int threads=_threadsStarted.get(); | 104 int threads=_threadsStarted.get(); |
106 while (isRunning() && threads<_minThreads) | 105 while (isRunning() && threads<_minThreads) |
107 { | 106 { |
108 startThread(threads); | 107 startThread(threads); |
109 threads=_threadsStarted.get(); | 108 threads=_threadsStarted.get(); |
110 } | 109 } |
111 } | 110 } |
112 | 111 |
113 /* ------------------------------------------------------------ */ | 112 /* ------------------------------------------------------------ */ |
114 @Override | 113 @Override |
115 protected void doStop() throws Exception | 114 protected void doStop() throws Exception |
116 { | 115 { |
117 super.doStop(); | 116 super.doStop(); |
118 long start=System.currentTimeMillis(); | 117 long start=System.currentTimeMillis(); |
119 | 118 |
120 // let jobs complete naturally for a while | 119 // let jobs complete naturally for a while |
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) | 120 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) |
122 Thread.sleep(1); | 121 Thread.sleep(1); |
123 | 122 |
124 // kill queued jobs and flush out idle jobs | 123 // kill queued jobs and flush out idle jobs |
125 _jobs.clear(); | 124 _jobs.clear(); |
126 Runnable noop = new Runnable(){public void run(){}}; | 125 Runnable noop = new Runnable(){public void run(){}}; |
127 for (int i=_threadsIdle.get();i-->0;) | 126 for (int i=_threadsIdle.get();i-->0;) |
128 _jobs.offer(noop); | 127 _jobs.offer(noop); |
129 Thread.yield(); | 128 Thread.yield(); |
130 | 129 |
131 // interrupt remaining threads | 130 // interrupt remaining threads |
132 if (_threadsStarted.get()>0) | 131 if (_threadsStarted.get()>0) |
133 for (Thread thread : _threads) | 132 for (Thread thread : _threads) |
134 thread.interrupt(); | 133 thread.interrupt(); |
135 | 134 |
136 // wait for remaining threads to die | 135 // wait for remaining threads to die |
137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) | 136 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) |
138 { | 137 { |
139 Thread.sleep(1); | 138 Thread.sleep(1); |
140 } | 139 } |
141 Thread.yield(); | 140 Thread.yield(); |
142 int size=_threads.size(); | 141 int size=_threads.size(); |
143 if (size>0) | 142 if (size>0) |
144 { | 143 { |
145 LOG.warn(size+" threads could not be stopped"); | 144 LOG.warn(size+" threads could not be stopped"); |
146 | 145 |
147 if (size==1 || LOG.isDebugEnabled()) | 146 if (size==1 || LOG.isDebugEnabled()) |
148 { | 147 { |
149 for (Thread unstopped : _threads) | 148 for (Thread unstopped : _threads) |
150 { | 149 { |
151 LOG.info("Couldn't stop "+unstopped); | 150 LOG.info("Couldn't stop "+unstopped); |
152 for (StackTraceElement element : unstopped.getStackTrace()) | 151 for (StackTraceElement element : unstopped.getStackTrace()) |
153 { | 152 { |
154 LOG.info(" at "+element); | 153 LOG.info(" at "+element); |
155 } | 154 } |
156 } | 155 } |
157 } | 156 } |
158 } | 157 } |
159 | 158 |
160 synchronized (_joinLock) | 159 synchronized (_joinLock) |
161 { | 160 { |
162 _joinLock.notifyAll(); | 161 _joinLock.notifyAll(); |
163 } | 162 } |
164 } | 163 } |
165 | 164 |
166 /* ------------------------------------------------------------ */ | 165 /* ------------------------------------------------------------ */ |
167 /** | 166 /** |
168 * Delegated to the named or anonymous Pool. | 167 * Delegated to the named or anonymous Pool. |
169 */ | 168 */ |
170 public void setDaemon(boolean daemon) | 169 public void setDaemon(boolean daemon) |
171 { | 170 { |
172 _daemon=daemon; | 171 _daemon=daemon; |
173 } | 172 } |
174 | 173 |
175 /* ------------------------------------------------------------ */ | 174 /* ------------------------------------------------------------ */ |
176 /** Set the maximum thread idle time. | 175 /** Set the maximum thread idle time. |
177 * Threads that are idle for longer than this period may be | 176 * Threads that are idle for longer than this period may be |
178 * stopped. | 177 * stopped. |
179 * Delegated to the named or anonymous Pool. | 178 * Delegated to the named or anonymous Pool. |
180 * @see #getMaxIdleTimeMs | 179 * @see #getMaxIdleTimeMs |
181 * @param maxIdleTimeMs Max idle time in ms. | 180 * @param maxIdleTimeMs Max idle time in ms. |
182 */ | 181 */ |
183 public void setMaxIdleTimeMs(int maxIdleTimeMs) | 182 public void setMaxIdleTimeMs(int maxIdleTimeMs) |
184 { | 183 { |
185 _maxIdleTimeMs=maxIdleTimeMs; | 184 _maxIdleTimeMs=maxIdleTimeMs; |
186 } | 185 } |
187 | 186 |
188 /* ------------------------------------------------------------ */ | 187 /* ------------------------------------------------------------ */ |
189 /** | 188 /** |
190 * @param stopTimeMs maximum total time that stop() will wait for threads to die. | 189 * @param stopTimeMs maximum total time that stop() will wait for threads to die. |
191 */ | 190 */ |
192 public void setMaxStopTimeMs(int stopTimeMs) | 191 public void setMaxStopTimeMs(int stopTimeMs) |
193 { | 192 { |
194 _maxStopTime = stopTimeMs; | 193 _maxStopTime = stopTimeMs; |
195 } | 194 } |
196 | 195 |
197 /* ------------------------------------------------------------ */ | 196 /* ------------------------------------------------------------ */ |
198 /** Set the maximum number of threads. | 197 /** Set the maximum number of threads. |
199 * Delegated to the named or anonymous Pool. | 198 * Delegated to the named or anonymous Pool. |
200 * @see #getMaxThreads | 199 * @see #getMaxThreads |
201 * @param maxThreads maximum number of threads. | 200 * @param maxThreads maximum number of threads. |
202 */ | 201 */ |
203 public void setMaxThreads(int maxThreads) | 202 public void setMaxThreads(int maxThreads) |
204 { | 203 { |
205 _maxThreads=maxThreads; | 204 _maxThreads=maxThreads; |
206 if (_minThreads>_maxThreads) | 205 if (_minThreads>_maxThreads) |
207 _minThreads=_maxThreads; | 206 _minThreads=_maxThreads; |
208 } | 207 } |
209 | 208 |
210 /* ------------------------------------------------------------ */ | 209 /* ------------------------------------------------------------ */ |
211 /** Set the minimum number of threads. | 210 /** Set the minimum number of threads. |
212 * Delegated to the named or anonymous Pool. | 211 * Delegated to the named or anonymous Pool. |
213 * @see #getMinThreads | 212 * @see #getMinThreads |
214 * @param minThreads minimum number of threads | 213 * @param minThreads minimum number of threads |
215 */ | 214 */ |
216 public void setMinThreads(int minThreads) | 215 public void setMinThreads(int minThreads) |
217 { | 216 { |
218 _minThreads=minThreads; | 217 _minThreads=minThreads; |
219 | 218 |
220 if (_minThreads>_maxThreads) | 219 if (_minThreads>_maxThreads) |
221 _maxThreads=_minThreads; | 220 _maxThreads=_minThreads; |
222 | 221 |
223 int threads=_threadsStarted.get(); | 222 int threads=_threadsStarted.get(); |
224 while (isStarted() && threads<_minThreads) | 223 while (isStarted() && threads<_minThreads) |
225 { | 224 { |
226 startThread(threads); | 225 startThread(threads); |
227 threads=_threadsStarted.get(); | 226 threads=_threadsStarted.get(); |
228 } | 227 } |
229 } | 228 } |
230 | 229 |
231 /* ------------------------------------------------------------ */ | 230 /* ------------------------------------------------------------ */ |
232 /** | 231 /** |
233 * @param name Name of the BoundedThreadPool to use when naming Threads. | 232 * @param name Name of the BoundedThreadPool to use when naming Threads. |
234 */ | 233 */ |
235 public void setName(String name) | 234 public void setName(String name) |
236 { | 235 { |
237 if (isRunning()) | 236 if (isRunning()) |
238 throw new IllegalStateException("started"); | 237 throw new IllegalStateException("started"); |
239 _name= name; | 238 _name= name; |
240 } | 239 } |
241 | 240 |
242 /* ------------------------------------------------------------ */ | 241 /* ------------------------------------------------------------ */ |
243 /** Set the priority of the pool threads. | 242 /** Set the priority of the pool threads. |
244 * @param priority the new thread priority. | 243 * @param priority the new thread priority. |
245 */ | 244 */ |
246 public void setThreadsPriority(int priority) | 245 public void setThreadsPriority(int priority) |
247 { | 246 { |
248 _priority=priority; | 247 _priority=priority; |
249 } | 248 } |
250 | 249 |
251 /* ------------------------------------------------------------ */ | 250 /* ------------------------------------------------------------ */ |
252 /** | 251 /** |
253 * @return maximum queue size | 252 * @return maximum queue size |
254 */ | 253 */ |
255 public int getMaxQueued() | 254 public int getMaxQueued() |
256 { | 255 { |
257 return _maxQueued; | 256 return _maxQueued; |
258 } | 257 } |
259 | 258 |
260 /* ------------------------------------------------------------ */ | 259 /* ------------------------------------------------------------ */ |
261 /** | 260 /** |
262 * @param max job queue size | 261 * @param max job queue size |
263 */ | 262 */ |
264 public void setMaxQueued(int max) | 263 public void setMaxQueued(int max) |
265 { | 264 { |
266 if (isRunning()) | 265 if (isRunning()) |
267 throw new IllegalStateException("started"); | 266 throw new IllegalStateException("started"); |
268 _maxQueued=max; | 267 _maxQueued=max; |
269 } | 268 } |
270 | 269 |
271 /* ------------------------------------------------------------ */ | 270 /* ------------------------------------------------------------ */ |
272 /** Get the maximum thread idle time. | 271 /** Get the maximum thread idle time. |
273 * Delegated to the named or anonymous Pool. | 272 * Delegated to the named or anonymous Pool. |
274 * @see #setMaxIdleTimeMs | 273 * @see #setMaxIdleTimeMs |
275 * @return Max idle time in ms. | 274 * @return Max idle time in ms. |
276 */ | 275 */ |
277 public int getMaxIdleTimeMs() | 276 public int getMaxIdleTimeMs() |
278 { | 277 { |
279 return _maxIdleTimeMs; | 278 return _maxIdleTimeMs; |
280 } | 279 } |
281 | 280 |
282 /* ------------------------------------------------------------ */ | 281 /* ------------------------------------------------------------ */ |
283 /** | 282 /** |
284 * @return maximum total time that stop() will wait for threads to die. | 283 * @return maximum total time that stop() will wait for threads to die. |
285 */ | 284 */ |
286 public int getMaxStopTimeMs() | 285 public int getMaxStopTimeMs() |
287 { | 286 { |
288 return _maxStopTime; | 287 return _maxStopTime; |
289 } | 288 } |
290 | 289 |
291 /* ------------------------------------------------------------ */ | 290 /* ------------------------------------------------------------ */ |
292 /** Set the maximum number of threads. | 291 /** Set the maximum number of threads. |
293 * Delegated to the named or anonymous Pool. | 292 * Delegated to the named or anonymous Pool. |
294 * @see #setMaxThreads | 293 * @see #setMaxThreads |
295 * @return maximum number of threads. | 294 * @return maximum number of threads. |
296 */ | 295 */ |
297 public int getMaxThreads() | 296 public int getMaxThreads() |
298 { | 297 { |
299 return _maxThreads; | 298 return _maxThreads; |
300 } | 299 } |
301 | 300 |
302 /* ------------------------------------------------------------ */ | 301 /* ------------------------------------------------------------ */ |
303 /** Get the minimum number of threads. | 302 /** Get the minimum number of threads. |
304 * Delegated to the named or anonymous Pool. | 303 * Delegated to the named or anonymous Pool. |
305 * @see #setMinThreads | 304 * @see #setMinThreads |
306 * @return minimum number of threads. | 305 * @return minimum number of threads. |
307 */ | 306 */ |
308 public int getMinThreads() | 307 public int getMinThreads() |
309 { | 308 { |
310 return _minThreads; | 309 return _minThreads; |
311 } | 310 } |
312 | 311 |
313 /* ------------------------------------------------------------ */ | 312 /* ------------------------------------------------------------ */ |
314 /** | 313 /** |
315 * @return The name of the BoundedThreadPool. | 314 * @return The name of the BoundedThreadPool. |
316 */ | 315 */ |
317 public String getName() | 316 public String getName() |
318 { | 317 { |
319 return _name; | 318 return _name; |
320 } | 319 } |
321 | 320 |
322 /* ------------------------------------------------------------ */ | 321 /* ------------------------------------------------------------ */ |
323 /** Get the priority of the pool threads. | 322 /** Get the priority of the pool threads. |
324 * @return the priority of the pool threads. | 323 * @return the priority of the pool threads. |
325 */ | 324 */ |
326 public int getThreadsPriority() | 325 public int getThreadsPriority() |
327 { | 326 { |
328 return _priority; | 327 return _priority; |
329 } | 328 } |
330 | 329 |
331 /* ------------------------------------------------------------ */ | 330 /* ------------------------------------------------------------ */ |
332 /** | 331 /** |
333 * Delegated to the named or anonymous Pool. | 332 * Delegated to the named or anonymous Pool. |
334 */ | 333 */ |
335 public boolean isDaemon() | 334 public boolean isDaemon() |
336 { | 335 { |
337 return _daemon; | 336 return _daemon; |
338 } | 337 } |
339 | 338 |
340 /* ------------------------------------------------------------ */ | 339 /* ------------------------------------------------------------ */ |
341 public boolean isDetailedDump() | 340 public boolean isDetailedDump() |
342 { | 341 { |
343 return _detailedDump; | 342 return _detailedDump; |
344 } | 343 } |
345 | 344 |
346 /* ------------------------------------------------------------ */ | 345 /* ------------------------------------------------------------ */ |
347 public void setDetailedDump(boolean detailedDump) | 346 public void setDetailedDump(boolean detailedDump) |
348 { | 347 { |
349 _detailedDump = detailedDump; | 348 _detailedDump = detailedDump; |
350 } | 349 } |
351 | 350 |
352 /* ------------------------------------------------------------ */ | 351 /* ------------------------------------------------------------ */ |
353 public boolean dispatch(Runnable job) | 352 public boolean dispatch(Runnable job) |
354 { | 353 { |
355 if (isRunning()) | 354 if (isRunning()) |
356 { | 355 { |
357 final int jobQ = _jobs.size(); | 356 final int jobQ = _jobs.size(); |
358 final int idle = getIdleThreads(); | 357 final int idle = getIdleThreads(); |
359 if(_jobs.offer(job)) | 358 if(_jobs.offer(job)) |
360 { | 359 { |
361 // If we had no idle threads or the jobQ is greater than the idle threads | 360 // If we had no idle threads or the jobQ is greater than the idle threads |
362 if (idle==0 || jobQ>idle) | 361 if (idle==0 || jobQ>idle) |
363 { | 362 { |
364 int threads=_threadsStarted.get(); | 363 int threads=_threadsStarted.get(); |
365 if (threads<_maxThreads) | 364 if (threads<_maxThreads) |
366 startThread(threads); | 365 startThread(threads); |
367 } | 366 } |
368 return true; | 367 return true; |
369 } | 368 } |
370 } | 369 } |
371 LOG.debug("Dispatched {} to stopped {}",job,this); | 370 LOG.debug("Dispatched {} to stopped {}",job,this); |
372 return false; | 371 return false; |
373 } | 372 } |
374 | 373 |
375 /* ------------------------------------------------------------ */ | 374 /* ------------------------------------------------------------ */ |
376 public void execute(Runnable job) | 375 public void execute(Runnable job) |
377 { | 376 { |
378 if (!dispatch(job)) | 377 if (!dispatch(job)) |
379 throw new RejectedExecutionException(); | 378 throw new RejectedExecutionException(); |
380 } | 379 } |
381 | 380 |
382 /* ------------------------------------------------------------ */ | 381 /* ------------------------------------------------------------ */ |
383 /** | 382 /** |
384 * Blocks until the thread pool is {@link LifeCycle#stop stopped}. | 383 * Blocks until the thread pool is {@link LifeCycle#stop stopped}. |
385 */ | 384 */ |
386 public void join() throws InterruptedException | 385 public void join() throws InterruptedException |
387 { | 386 { |
388 synchronized (_joinLock) | 387 synchronized (_joinLock) |
389 { | 388 { |
390 while (isRunning()) | 389 while (isRunning()) |
391 _joinLock.wait(); | 390 _joinLock.wait(); |
392 } | 391 } |
393 | 392 |
394 while (isStopping()) | 393 while (isStopping()) |
395 Thread.sleep(1); | 394 Thread.sleep(1); |
396 } | 395 } |
397 | 396 |
398 /* ------------------------------------------------------------ */ | 397 /* ------------------------------------------------------------ */ |
399 /** | 398 /** |
400 * @return The total number of threads currently in the pool | 399 * @return The total number of threads currently in the pool |
401 */ | 400 */ |
402 public int getThreads() | 401 public int getThreads() |
403 { | 402 { |
404 return _threadsStarted.get(); | 403 return _threadsStarted.get(); |
405 } | 404 } |
406 | 405 |
407 /* ------------------------------------------------------------ */ | 406 /* ------------------------------------------------------------ */ |
408 /** | 407 /** |
409 * @return The number of idle threads in the pool | 408 * @return The number of idle threads in the pool |
410 */ | 409 */ |
411 public int getIdleThreads() | 410 public int getIdleThreads() |
412 { | 411 { |
413 return _threadsIdle.get(); | 412 return _threadsIdle.get(); |
414 } | 413 } |
415 | 414 |
416 /* ------------------------------------------------------------ */ | 415 /* ------------------------------------------------------------ */ |
417 /** | 416 /** |
418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs | 417 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs |
419 */ | 418 */ |
420 public boolean isLowOnThreads() | 419 public boolean isLowOnThreads() |
421 { | 420 { |
422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); | 421 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); |
423 } | 422 } |
424 | 423 |
425 /* ------------------------------------------------------------ */ | 424 /* ------------------------------------------------------------ */ |
426 private boolean startThread(int threads) | 425 private boolean startThread(int threads) |
427 { | 426 { |
428 final int next=threads+1; | 427 final int next=threads+1; |
429 if (!_threadsStarted.compareAndSet(threads,next)) | 428 if (!_threadsStarted.compareAndSet(threads,next)) |
430 return false; | 429 return false; |
431 | 430 |
432 boolean started=false; | 431 boolean started=false; |
433 try | 432 try |
434 { | 433 { |
435 Thread thread=newThread(_runnable); | 434 Thread thread=newThread(_runnable); |
436 thread.setDaemon(_daemon); | 435 thread.setDaemon(_daemon); |
437 thread.setPriority(_priority); | 436 thread.setPriority(_priority); |
438 thread.setName(_name+"-"+thread.getId()); | 437 thread.setName(_name+"-"+thread.getId()); |
439 _threads.add(thread); | 438 _threads.add(thread); |
440 | 439 |
441 thread.start(); | 440 thread.start(); |
442 started=true; | 441 started=true; |
443 } | 442 } |
444 finally | 443 finally |
445 { | 444 { |
446 if (!started) | 445 if (!started) |
447 _threadsStarted.decrementAndGet(); | 446 _threadsStarted.decrementAndGet(); |
448 } | 447 } |
449 return started; | 448 return started; |
450 } | 449 } |
451 | 450 |
452 /* ------------------------------------------------------------ */ | 451 /* ------------------------------------------------------------ */ |
453 protected Thread newThread(Runnable runnable) | 452 protected Thread newThread(Runnable runnable) |
454 { | 453 { |
455 return new Thread(runnable); | 454 return new Thread(runnable); |
456 } | 455 } |
457 | 456 |
458 | 457 |
459 /* ------------------------------------------------------------ */ | 458 /* ------------------------------------------------------------ */ |
460 public String dump() | 459 public String dump() |
461 { | 460 { |
462 return AggregateLifeCycle.dump(this); | 461 return AggregateLifeCycle.dump(this); |
463 } | 462 } |
464 | 463 |
465 /* ------------------------------------------------------------ */ | 464 /* ------------------------------------------------------------ */ |
466 public void dump(Appendable out, String indent) throws IOException | 465 public void dump(Appendable out, String indent) throws IOException |
467 { | 466 { |
468 List<Object> dump = new ArrayList<Object>(getMaxThreads()); | 467 List<Object> dump = new ArrayList<Object>(getMaxThreads()); |
469 for (final Thread thread: _threads) | 468 for (final Thread thread: _threads) |
470 { | 469 { |
471 final StackTraceElement[] trace=thread.getStackTrace(); | 470 final StackTraceElement[] trace=thread.getStackTrace(); |
472 boolean inIdleJobPoll=false; | 471 boolean inIdleJobPoll=false; |
473 // trace can be null on early java 6 jvms | 472 // trace can be null on early java 6 jvms |
474 if (trace != null) | 473 if (trace != null) |
475 { | 474 { |
476 for (StackTraceElement t : trace) | 475 for (StackTraceElement t : trace) |
477 { | 476 { |
478 if ("idleJobPoll".equals(t.getMethodName())) | 477 if ("idleJobPoll".equals(t.getMethodName())) |
479 { | 478 { |
480 inIdleJobPoll = true; | 479 inIdleJobPoll = true; |
481 break; | 480 break; |
482 } | 481 } |
483 } | 482 } |
484 } | 483 } |
485 final boolean idle=inIdleJobPoll; | 484 final boolean idle=inIdleJobPoll; |
486 | 485 |
487 if (_detailedDump) | 486 if (_detailedDump) |
488 { | 487 { |
489 dump.add(new Dumpable() | 488 dump.add(new Dumpable() |
490 { | 489 { |
491 public void dump(Appendable out, String indent) throws IOException | 490 public void dump(Appendable out, String indent) throws IOException |
492 { | 491 { |
493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); | 492 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); |
494 if (!idle) | 493 if (!idle) |
495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); | 494 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); |
496 } | 495 } |
497 | 496 |
498 public String dump() | 497 public String dump() |
499 { | 498 { |
500 return null; | 499 return null; |
501 } | 500 } |
502 }); | 501 }); |
503 } | 502 } |
504 else | 503 else |
505 { | 504 { |
506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); | 505 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); |
507 } | 506 } |
508 } | 507 } |
509 | 508 |
510 AggregateLifeCycle.dumpObject(out,this); | 509 AggregateLifeCycle.dumpObject(out,this); |
511 AggregateLifeCycle.dump(out,indent,dump); | 510 AggregateLifeCycle.dump(out,indent,dump); |
512 | 511 |
513 } | 512 } |
514 | 513 |
515 | 514 |
516 /* ------------------------------------------------------------ */ | 515 /* ------------------------------------------------------------ */ |
517 @Override | 516 @Override |
518 public String toString() | 517 public String toString() |
519 { | 518 { |
520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; | 519 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; |
521 } | 520 } |
522 | 521 |
523 /* ------------------------------------------------------------ */ | 522 /* ------------------------------------------------------------ */ |
524 private Runnable idleJobPoll() throws InterruptedException | 523 private Runnable idleJobPoll() throws InterruptedException |
525 { | 524 { |
526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); | 525 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); |
527 } | 526 } |
528 | 527 |
529 /* ------------------------------------------------------------ */ | 528 /* ------------------------------------------------------------ */ |
530 private Runnable _runnable = new Runnable() | 529 private Runnable _runnable = new Runnable() |
531 { | 530 { |
532 public void run() | 531 public void run() |
533 { | 532 { |
534 boolean shrink=false; | 533 boolean shrink=false; |
535 try | 534 try |
536 { | 535 { |
537 Runnable job=_jobs.poll(); | 536 Runnable job=_jobs.poll(); |
538 while (isRunning()) | 537 while (isRunning()) |
539 { | 538 { |
540 // Job loop | 539 // Job loop |
541 while (job!=null && isRunning()) | 540 while (job!=null && isRunning()) |
542 { | 541 { |
543 runJob(job); | 542 runJob(job); |
544 job=_jobs.poll(); | 543 job=_jobs.poll(); |
545 } | 544 } |
546 | 545 |
547 // Idle loop | 546 // Idle loop |
548 try | 547 try |
549 { | 548 { |
550 _threadsIdle.incrementAndGet(); | 549 _threadsIdle.incrementAndGet(); |
551 | 550 |
552 while (isRunning() && job==null) | 551 while (isRunning() && job==null) |
553 { | 552 { |
554 if (_maxIdleTimeMs<=0) | 553 if (_maxIdleTimeMs<=0) |
555 job=_jobs.take(); | 554 job=_jobs.take(); |
556 else | 555 else |
557 { | 556 { |
558 // maybe we should shrink? | 557 // maybe we should shrink? |
559 final int size=_threadsStarted.get(); | 558 final int size=_threadsStarted.get(); |
560 if (size>_minThreads) | 559 if (size>_minThreads) |
561 { | 560 { |
562 long last=_lastShrink.get(); | 561 long last=_lastShrink.get(); |
563 long now=System.currentTimeMillis(); | 562 long now=System.currentTimeMillis(); |
564 if (last==0 || (now-last)>_maxIdleTimeMs) | 563 if (last==0 || (now-last)>_maxIdleTimeMs) |
565 { | 564 { |
566 shrink=_lastShrink.compareAndSet(last,now) && | 565 shrink=_lastShrink.compareAndSet(last,now) && |
567 _threadsStarted.compareAndSet(size,size-1); | 566 _threadsStarted.compareAndSet(size,size-1); |
568 if (shrink) | 567 if (shrink) |
569 return; | 568 return; |
570 } | 569 } |
571 } | 570 } |
572 job=idleJobPoll(); | 571 job=idleJobPoll(); |
573 } | 572 } |
574 } | 573 } |
575 } | 574 } |
576 finally | 575 finally |
577 { | 576 { |
578 _threadsIdle.decrementAndGet(); | 577 _threadsIdle.decrementAndGet(); |
579 } | 578 } |
580 } | 579 } |
581 } | 580 } |
582 catch(InterruptedException e) | 581 catch(InterruptedException e) |
583 { | 582 { |
584 LOG.trace("",e); | 583 LOG.trace("",e); |
585 } | 584 } |
586 catch(Exception e) | 585 catch(Exception e) |
587 { | 586 { |
588 LOG.warn("",e); | 587 LOG.warn("",e); |
589 } | 588 } |
590 finally | 589 finally |
591 { | 590 { |
592 if (!shrink) | 591 if (!shrink) |
593 _threadsStarted.decrementAndGet(); | 592 _threadsStarted.decrementAndGet(); |
594 _threads.remove(Thread.currentThread()); | 593 _threads.remove(Thread.currentThread()); |
595 } | 594 } |
596 } | 595 } |
597 }; | 596 }; |
598 | 597 |
599 /* ------------------------------------------------------------ */ | 598 /* ------------------------------------------------------------ */ |
600 /** | 599 /** |
601 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> | 600 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> |
602 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> | 601 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> |
603 * | 602 * |
604 * @param job the job to run | 603 * @param job the job to run |
605 */ | 604 */ |
606 protected void runJob(Runnable job) | 605 protected void runJob(Runnable job) |
607 { | 606 { |
608 job.run(); | 607 job.run(); |
609 } | 608 } |
610 | 609 |
611 /* ------------------------------------------------------------ */ | 610 /* ------------------------------------------------------------ */ |
612 /** | 611 /** |
613 * @return the job queue | 612 * @return the job queue |
614 */ | 613 */ |
615 protected BlockingQueue<Runnable> getQueue() | 614 protected BlockingQueue<Runnable> getQueue() |
616 { | 615 { |
617 return _jobs; | 616 return _jobs; |
618 } | 617 } |
619 | 618 |
620 /* ------------------------------------------------------------ */ | 619 /* ------------------------------------------------------------ */ |
621 /** | 620 /** |
622 * @param id The thread ID to stop. | 621 * @param id The thread ID to stop. |
623 * @return true if the thread was found and stopped. | 622 * @return true if the thread was found and stopped. |
624 * @deprecated Use {@link #interruptThread(long)} in preference | 623 * @deprecated Use {@link #interruptThread(long)} in preference |
625 */ | 624 */ |
626 @Deprecated | 625 @Deprecated |
627 public boolean stopThread(long id) | 626 public boolean stopThread(long id) |
628 { | 627 { |
629 for (Thread thread: _threads) | 628 for (Thread thread: _threads) |
630 { | 629 { |
631 if (thread.getId()==id) | 630 if (thread.getId()==id) |
632 { | 631 { |
633 thread.stop(); | 632 thread.stop(); |
634 return true; | 633 return true; |
635 } | 634 } |
636 } | 635 } |
637 return false; | 636 return false; |
638 } | 637 } |
639 | 638 |
640 /* ------------------------------------------------------------ */ | 639 /* ------------------------------------------------------------ */ |
641 /** | 640 /** |
642 * @param id The thread ID to interrupt. | 641 * @param id The thread ID to interrupt. |
643 * @return true if the thread was found and interrupted. | 642 * @return true if the thread was found and interrupted. |
644 */ | 643 */ |
645 public boolean interruptThread(long id) | 644 public boolean interruptThread(long id) |
646 { | 645 { |
647 for (Thread thread: _threads) | 646 for (Thread thread: _threads) |
648 { | 647 { |
649 if (thread.getId()==id) | 648 if (thread.getId()==id) |
650 { | 649 { |
651 thread.interrupt(); | 650 thread.interrupt(); |
652 return true; | 651 return true; |
653 } | 652 } |
654 } | 653 } |
655 return false; | 654 return false; |
656 } | 655 } |
657 | 656 |
658 /* ------------------------------------------------------------ */ | 657 /* ------------------------------------------------------------ */ |
659 /** | 658 /** |
660 * @param id The thread ID to interrupt. | 659 * @param id The thread ID to interrupt. |
661 * @return true if the thread was found and interrupted. | 660 * @return true if the thread was found and interrupted. |
662 */ | 661 */ |
663 public String dumpThread(long id) | 662 public String dumpThread(long id) |
664 { | 663 { |
665 for (Thread thread: _threads) | 664 for (Thread thread: _threads) |
666 { | 665 { |
667 if (thread.getId()==id) | 666 if (thread.getId()==id) |
668 { | 667 { |
669 StringBuilder buf = new StringBuilder(); | 668 StringBuilder buf = new StringBuilder(); |
670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); | 669 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); |
671 for (StackTraceElement element : thread.getStackTrace()) | 670 for (StackTraceElement element : thread.getStackTrace()) |
672 buf.append(" at ").append(element.toString()).append('\n'); | 671 buf.append(" at ").append(element.toString()).append('\n'); |
673 return buf.toString(); | 672 return buf.toString(); |
674 } | 673 } |
675 } | 674 } |
676 return null; | 675 return null; |
677 } | 676 } |
678 } | 677 } |