Mercurial Hosting > luan
comparison src/org/eclipse/jetty/util/thread/QueuedThreadPool.java @ 802:3428c60d7cfc
replace jetty jars with source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 07 Sep 2016 21:15:48 -0600 |
parents | |
children | 8e9db0bbf4f9 |
comparison
equal
deleted
inserted
replaced
801:6a21393191c1 | 802:3428c60d7cfc |
---|---|
1 // | |
2 // ======================================================================== | |
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. | |
4 // ------------------------------------------------------------------------ | |
5 // All rights reserved. This program and the accompanying materials | |
6 // are made available under the terms of the Eclipse Public License v1.0 | |
7 // and Apache License v2.0 which accompanies this distribution. | |
8 // | |
9 // The Eclipse Public License is available at | |
10 // http://www.eclipse.org/legal/epl-v10.html | |
11 // | |
12 // The Apache License v2.0 is available at | |
13 // http://www.opensource.org/licenses/apache2.0.php | |
14 // | |
15 // You may elect to redistribute this code under either of these licenses. | |
16 // ======================================================================== | |
17 // | |
18 | |
19 | |
20 package org.eclipse.jetty.util.thread; | |
21 | |
22 import java.io.IOException; | |
23 import java.util.ArrayList; | |
24 import java.util.Arrays; | |
25 import java.util.List; | |
26 import java.util.concurrent.ArrayBlockingQueue; | |
27 import java.util.concurrent.BlockingQueue; | |
28 import java.util.concurrent.ConcurrentLinkedQueue; | |
29 import java.util.concurrent.Executor; | |
30 import java.util.concurrent.RejectedExecutionException; | |
31 import java.util.concurrent.TimeUnit; | |
32 import java.util.concurrent.atomic.AtomicInteger; | |
33 import java.util.concurrent.atomic.AtomicLong; | |
34 | |
35 import org.eclipse.jetty.util.BlockingArrayQueue; | |
36 import org.eclipse.jetty.util.component.AbstractLifeCycle; | |
37 import org.eclipse.jetty.util.component.AggregateLifeCycle; | |
38 import org.eclipse.jetty.util.component.Dumpable; | |
39 import org.eclipse.jetty.util.component.LifeCycle; | |
40 import org.eclipse.jetty.util.log.Log; | |
41 import org.eclipse.jetty.util.log.Logger; | |
42 import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; | |
43 | |
44 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable | |
45 { | |
46 private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); | |
47 | |
48 private final AtomicInteger _threadsStarted = new AtomicInteger(); | |
49 private final AtomicInteger _threadsIdle = new AtomicInteger(); | |
50 private final AtomicLong _lastShrink = new AtomicLong(); | |
51 private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>(); | |
52 private final Object _joinLock = new Object(); | |
53 private BlockingQueue<Runnable> _jobs; | |
54 private String _name; | |
55 private int _maxIdleTimeMs=60000; | |
56 private int _maxThreads=254; | |
57 private int _minThreads=8; | |
58 private int _maxQueued=-1; | |
59 private int _priority=Thread.NORM_PRIORITY; | |
60 private boolean _daemon=false; | |
61 private int _maxStopTime=100; | |
62 private boolean _detailedDump=false; | |
63 | |
64 /* ------------------------------------------------------------------- */ | |
65 /** Construct | |
66 */ | |
67 public QueuedThreadPool() | |
68 { | |
69 _name="qtp"+super.hashCode(); | |
70 } | |
71 | |
72 /* ------------------------------------------------------------------- */ | |
73 /** Construct | |
74 */ | |
75 public QueuedThreadPool(int maxThreads) | |
76 { | |
77 this(); | |
78 setMaxThreads(maxThreads); | |
79 } | |
80 | |
81 /* ------------------------------------------------------------------- */ | |
82 /** Construct | |
83 */ | |
84 public QueuedThreadPool(BlockingQueue<Runnable> jobQ) | |
85 { | |
86 this(); | |
87 _jobs=jobQ; | |
88 _jobs.clear(); | |
89 } | |
90 | |
91 | |
92 /* ------------------------------------------------------------ */ | |
93 @Override | |
94 protected void doStart() throws Exception | |
95 { | |
96 super.doStart(); | |
97 _threadsStarted.set(0); | |
98 | |
99 if (_jobs==null) | |
100 { | |
101 _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) | |
102 :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); | |
103 } | |
104 | |
105 int threads=_threadsStarted.get(); | |
106 while (isRunning() && threads<_minThreads) | |
107 { | |
108 startThread(threads); | |
109 threads=_threadsStarted.get(); | |
110 } | |
111 } | |
112 | |
113 /* ------------------------------------------------------------ */ | |
114 @Override | |
115 protected void doStop() throws Exception | |
116 { | |
117 super.doStop(); | |
118 long start=System.currentTimeMillis(); | |
119 | |
120 // let jobs complete naturally for a while | |
121 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2)) | |
122 Thread.sleep(1); | |
123 | |
124 // kill queued jobs and flush out idle jobs | |
125 _jobs.clear(); | |
126 Runnable noop = new Runnable(){public void run(){}}; | |
127 for (int i=_threadsIdle.get();i-->0;) | |
128 _jobs.offer(noop); | |
129 Thread.yield(); | |
130 | |
131 // interrupt remaining threads | |
132 if (_threadsStarted.get()>0) | |
133 for (Thread thread : _threads) | |
134 thread.interrupt(); | |
135 | |
136 // wait for remaining threads to die | |
137 while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime) | |
138 { | |
139 Thread.sleep(1); | |
140 } | |
141 Thread.yield(); | |
142 int size=_threads.size(); | |
143 if (size>0) | |
144 { | |
145 LOG.warn(size+" threads could not be stopped"); | |
146 | |
147 if (size==1 || LOG.isDebugEnabled()) | |
148 { | |
149 for (Thread unstopped : _threads) | |
150 { | |
151 LOG.info("Couldn't stop "+unstopped); | |
152 for (StackTraceElement element : unstopped.getStackTrace()) | |
153 { | |
154 LOG.info(" at "+element); | |
155 } | |
156 } | |
157 } | |
158 } | |
159 | |
160 synchronized (_joinLock) | |
161 { | |
162 _joinLock.notifyAll(); | |
163 } | |
164 } | |
165 | |
166 /* ------------------------------------------------------------ */ | |
167 /** | |
168 * Delegated to the named or anonymous Pool. | |
169 */ | |
170 public void setDaemon(boolean daemon) | |
171 { | |
172 _daemon=daemon; | |
173 } | |
174 | |
175 /* ------------------------------------------------------------ */ | |
176 /** Set the maximum thread idle time. | |
177 * Threads that are idle for longer than this period may be | |
178 * stopped. | |
179 * Delegated to the named or anonymous Pool. | |
180 * @see #getMaxIdleTimeMs | |
181 * @param maxIdleTimeMs Max idle time in ms. | |
182 */ | |
183 public void setMaxIdleTimeMs(int maxIdleTimeMs) | |
184 { | |
185 _maxIdleTimeMs=maxIdleTimeMs; | |
186 } | |
187 | |
188 /* ------------------------------------------------------------ */ | |
189 /** | |
190 * @param stopTimeMs maximum total time that stop() will wait for threads to die. | |
191 */ | |
192 public void setMaxStopTimeMs(int stopTimeMs) | |
193 { | |
194 _maxStopTime = stopTimeMs; | |
195 } | |
196 | |
197 /* ------------------------------------------------------------ */ | |
198 /** Set the maximum number of threads. | |
199 * Delegated to the named or anonymous Pool. | |
200 * @see #getMaxThreads | |
201 * @param maxThreads maximum number of threads. | |
202 */ | |
203 public void setMaxThreads(int maxThreads) | |
204 { | |
205 _maxThreads=maxThreads; | |
206 if (_minThreads>_maxThreads) | |
207 _minThreads=_maxThreads; | |
208 } | |
209 | |
210 /* ------------------------------------------------------------ */ | |
211 /** Set the minimum number of threads. | |
212 * Delegated to the named or anonymous Pool. | |
213 * @see #getMinThreads | |
214 * @param minThreads minimum number of threads | |
215 */ | |
216 public void setMinThreads(int minThreads) | |
217 { | |
218 _minThreads=minThreads; | |
219 | |
220 if (_minThreads>_maxThreads) | |
221 _maxThreads=_minThreads; | |
222 | |
223 int threads=_threadsStarted.get(); | |
224 while (isStarted() && threads<_minThreads) | |
225 { | |
226 startThread(threads); | |
227 threads=_threadsStarted.get(); | |
228 } | |
229 } | |
230 | |
231 /* ------------------------------------------------------------ */ | |
232 /** | |
233 * @param name Name of the BoundedThreadPool to use when naming Threads. | |
234 */ | |
235 public void setName(String name) | |
236 { | |
237 if (isRunning()) | |
238 throw new IllegalStateException("started"); | |
239 _name= name; | |
240 } | |
241 | |
242 /* ------------------------------------------------------------ */ | |
243 /** Set the priority of the pool threads. | |
244 * @param priority the new thread priority. | |
245 */ | |
246 public void setThreadsPriority(int priority) | |
247 { | |
248 _priority=priority; | |
249 } | |
250 | |
251 /* ------------------------------------------------------------ */ | |
252 /** | |
253 * @return maximum queue size | |
254 */ | |
255 public int getMaxQueued() | |
256 { | |
257 return _maxQueued; | |
258 } | |
259 | |
260 /* ------------------------------------------------------------ */ | |
261 /** | |
262 * @param max job queue size | |
263 */ | |
264 public void setMaxQueued(int max) | |
265 { | |
266 if (isRunning()) | |
267 throw new IllegalStateException("started"); | |
268 _maxQueued=max; | |
269 } | |
270 | |
271 /* ------------------------------------------------------------ */ | |
272 /** Get the maximum thread idle time. | |
273 * Delegated to the named or anonymous Pool. | |
274 * @see #setMaxIdleTimeMs | |
275 * @return Max idle time in ms. | |
276 */ | |
277 public int getMaxIdleTimeMs() | |
278 { | |
279 return _maxIdleTimeMs; | |
280 } | |
281 | |
282 /* ------------------------------------------------------------ */ | |
283 /** | |
284 * @return maximum total time that stop() will wait for threads to die. | |
285 */ | |
286 public int getMaxStopTimeMs() | |
287 { | |
288 return _maxStopTime; | |
289 } | |
290 | |
291 /* ------------------------------------------------------------ */ | |
292 /** Set the maximum number of threads. | |
293 * Delegated to the named or anonymous Pool. | |
294 * @see #setMaxThreads | |
295 * @return maximum number of threads. | |
296 */ | |
297 public int getMaxThreads() | |
298 { | |
299 return _maxThreads; | |
300 } | |
301 | |
302 /* ------------------------------------------------------------ */ | |
303 /** Get the minimum number of threads. | |
304 * Delegated to the named or anonymous Pool. | |
305 * @see #setMinThreads | |
306 * @return minimum number of threads. | |
307 */ | |
308 public int getMinThreads() | |
309 { | |
310 return _minThreads; | |
311 } | |
312 | |
313 /* ------------------------------------------------------------ */ | |
314 /** | |
315 * @return The name of the BoundedThreadPool. | |
316 */ | |
317 public String getName() | |
318 { | |
319 return _name; | |
320 } | |
321 | |
322 /* ------------------------------------------------------------ */ | |
323 /** Get the priority of the pool threads. | |
324 * @return the priority of the pool threads. | |
325 */ | |
326 public int getThreadsPriority() | |
327 { | |
328 return _priority; | |
329 } | |
330 | |
331 /* ------------------------------------------------------------ */ | |
332 /** | |
333 * Delegated to the named or anonymous Pool. | |
334 */ | |
335 public boolean isDaemon() | |
336 { | |
337 return _daemon; | |
338 } | |
339 | |
340 /* ------------------------------------------------------------ */ | |
341 public boolean isDetailedDump() | |
342 { | |
343 return _detailedDump; | |
344 } | |
345 | |
346 /* ------------------------------------------------------------ */ | |
347 public void setDetailedDump(boolean detailedDump) | |
348 { | |
349 _detailedDump = detailedDump; | |
350 } | |
351 | |
352 /* ------------------------------------------------------------ */ | |
353 public boolean dispatch(Runnable job) | |
354 { | |
355 if (isRunning()) | |
356 { | |
357 final int jobQ = _jobs.size(); | |
358 final int idle = getIdleThreads(); | |
359 if(_jobs.offer(job)) | |
360 { | |
361 // If we had no idle threads or the jobQ is greater than the idle threads | |
362 if (idle==0 || jobQ>idle) | |
363 { | |
364 int threads=_threadsStarted.get(); | |
365 if (threads<_maxThreads) | |
366 startThread(threads); | |
367 } | |
368 return true; | |
369 } | |
370 } | |
371 LOG.debug("Dispatched {} to stopped {}",job,this); | |
372 return false; | |
373 } | |
374 | |
375 /* ------------------------------------------------------------ */ | |
376 public void execute(Runnable job) | |
377 { | |
378 if (!dispatch(job)) | |
379 throw new RejectedExecutionException(); | |
380 } | |
381 | |
382 /* ------------------------------------------------------------ */ | |
383 /** | |
384 * Blocks until the thread pool is {@link LifeCycle#stop stopped}. | |
385 */ | |
386 public void join() throws InterruptedException | |
387 { | |
388 synchronized (_joinLock) | |
389 { | |
390 while (isRunning()) | |
391 _joinLock.wait(); | |
392 } | |
393 | |
394 while (isStopping()) | |
395 Thread.sleep(1); | |
396 } | |
397 | |
398 /* ------------------------------------------------------------ */ | |
399 /** | |
400 * @return The total number of threads currently in the pool | |
401 */ | |
402 public int getThreads() | |
403 { | |
404 return _threadsStarted.get(); | |
405 } | |
406 | |
407 /* ------------------------------------------------------------ */ | |
408 /** | |
409 * @return The number of idle threads in the pool | |
410 */ | |
411 public int getIdleThreads() | |
412 { | |
413 return _threadsIdle.get(); | |
414 } | |
415 | |
416 /* ------------------------------------------------------------ */ | |
417 /** | |
418 * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs | |
419 */ | |
420 public boolean isLowOnThreads() | |
421 { | |
422 return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); | |
423 } | |
424 | |
425 /* ------------------------------------------------------------ */ | |
426 private boolean startThread(int threads) | |
427 { | |
428 final int next=threads+1; | |
429 if (!_threadsStarted.compareAndSet(threads,next)) | |
430 return false; | |
431 | |
432 boolean started=false; | |
433 try | |
434 { | |
435 Thread thread=newThread(_runnable); | |
436 thread.setDaemon(_daemon); | |
437 thread.setPriority(_priority); | |
438 thread.setName(_name+"-"+thread.getId()); | |
439 _threads.add(thread); | |
440 | |
441 thread.start(); | |
442 started=true; | |
443 } | |
444 finally | |
445 { | |
446 if (!started) | |
447 _threadsStarted.decrementAndGet(); | |
448 } | |
449 return started; | |
450 } | |
451 | |
452 /* ------------------------------------------------------------ */ | |
453 protected Thread newThread(Runnable runnable) | |
454 { | |
455 return new Thread(runnable); | |
456 } | |
457 | |
458 | |
459 /* ------------------------------------------------------------ */ | |
460 public String dump() | |
461 { | |
462 return AggregateLifeCycle.dump(this); | |
463 } | |
464 | |
465 /* ------------------------------------------------------------ */ | |
466 public void dump(Appendable out, String indent) throws IOException | |
467 { | |
468 List<Object> dump = new ArrayList<Object>(getMaxThreads()); | |
469 for (final Thread thread: _threads) | |
470 { | |
471 final StackTraceElement[] trace=thread.getStackTrace(); | |
472 boolean inIdleJobPoll=false; | |
473 // trace can be null on early java 6 jvms | |
474 if (trace != null) | |
475 { | |
476 for (StackTraceElement t : trace) | |
477 { | |
478 if ("idleJobPoll".equals(t.getMethodName())) | |
479 { | |
480 inIdleJobPoll = true; | |
481 break; | |
482 } | |
483 } | |
484 } | |
485 final boolean idle=inIdleJobPoll; | |
486 | |
487 if (_detailedDump) | |
488 { | |
489 dump.add(new Dumpable() | |
490 { | |
491 public void dump(Appendable out, String indent) throws IOException | |
492 { | |
493 out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); | |
494 if (!idle) | |
495 AggregateLifeCycle.dump(out,indent,Arrays.asList(trace)); | |
496 } | |
497 | |
498 public String dump() | |
499 { | |
500 return null; | |
501 } | |
502 }); | |
503 } | |
504 else | |
505 { | |
506 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); | |
507 } | |
508 } | |
509 | |
510 AggregateLifeCycle.dumpObject(out,this); | |
511 AggregateLifeCycle.dump(out,indent,dump); | |
512 | |
513 } | |
514 | |
515 | |
516 /* ------------------------------------------------------------ */ | |
517 @Override | |
518 public String toString() | |
519 { | |
520 return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}"; | |
521 } | |
522 | |
523 /* ------------------------------------------------------------ */ | |
524 private Runnable idleJobPoll() throws InterruptedException | |
525 { | |
526 return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); | |
527 } | |
528 | |
529 /* ------------------------------------------------------------ */ | |
530 private Runnable _runnable = new Runnable() | |
531 { | |
532 public void run() | |
533 { | |
534 boolean shrink=false; | |
535 try | |
536 { | |
537 Runnable job=_jobs.poll(); | |
538 while (isRunning()) | |
539 { | |
540 // Job loop | |
541 while (job!=null && isRunning()) | |
542 { | |
543 runJob(job); | |
544 job=_jobs.poll(); | |
545 } | |
546 | |
547 // Idle loop | |
548 try | |
549 { | |
550 _threadsIdle.incrementAndGet(); | |
551 | |
552 while (isRunning() && job==null) | |
553 { | |
554 if (_maxIdleTimeMs<=0) | |
555 job=_jobs.take(); | |
556 else | |
557 { | |
558 // maybe we should shrink? | |
559 final int size=_threadsStarted.get(); | |
560 if (size>_minThreads) | |
561 { | |
562 long last=_lastShrink.get(); | |
563 long now=System.currentTimeMillis(); | |
564 if (last==0 || (now-last)>_maxIdleTimeMs) | |
565 { | |
566 shrink=_lastShrink.compareAndSet(last,now) && | |
567 _threadsStarted.compareAndSet(size,size-1); | |
568 if (shrink) | |
569 return; | |
570 } | |
571 } | |
572 job=idleJobPoll(); | |
573 } | |
574 } | |
575 } | |
576 finally | |
577 { | |
578 _threadsIdle.decrementAndGet(); | |
579 } | |
580 } | |
581 } | |
582 catch(InterruptedException e) | |
583 { | |
584 LOG.ignore(e); | |
585 } | |
586 catch(Exception e) | |
587 { | |
588 LOG.warn(e); | |
589 } | |
590 finally | |
591 { | |
592 if (!shrink) | |
593 _threadsStarted.decrementAndGet(); | |
594 _threads.remove(Thread.currentThread()); | |
595 } | |
596 } | |
597 }; | |
598 | |
599 /* ------------------------------------------------------------ */ | |
600 /** | |
601 * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p> | |
602 * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p> | |
603 * | |
604 * @param job the job to run | |
605 */ | |
606 protected void runJob(Runnable job) | |
607 { | |
608 job.run(); | |
609 } | |
610 | |
611 /* ------------------------------------------------------------ */ | |
612 /** | |
613 * @return the job queue | |
614 */ | |
615 protected BlockingQueue<Runnable> getQueue() | |
616 { | |
617 return _jobs; | |
618 } | |
619 | |
620 /* ------------------------------------------------------------ */ | |
621 /** | |
622 * @param id The thread ID to stop. | |
623 * @return true if the thread was found and stopped. | |
624 * @deprecated Use {@link #interruptThread(long)} in preference | |
625 */ | |
626 @Deprecated | |
627 public boolean stopThread(long id) | |
628 { | |
629 for (Thread thread: _threads) | |
630 { | |
631 if (thread.getId()==id) | |
632 { | |
633 thread.stop(); | |
634 return true; | |
635 } | |
636 } | |
637 return false; | |
638 } | |
639 | |
640 /* ------------------------------------------------------------ */ | |
641 /** | |
642 * @param id The thread ID to interrupt. | |
643 * @return true if the thread was found and interrupted. | |
644 */ | |
645 public boolean interruptThread(long id) | |
646 { | |
647 for (Thread thread: _threads) | |
648 { | |
649 if (thread.getId()==id) | |
650 { | |
651 thread.interrupt(); | |
652 return true; | |
653 } | |
654 } | |
655 return false; | |
656 } | |
657 | |
658 /* ------------------------------------------------------------ */ | |
659 /** | |
660 * @param id The thread ID to interrupt. | |
661 * @return true if the thread was found and interrupted. | |
662 */ | |
663 public String dumpThread(long id) | |
664 { | |
665 for (Thread thread: _threads) | |
666 { | |
667 if (thread.getId()==id) | |
668 { | |
669 StringBuilder buf = new StringBuilder(); | |
670 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); | |
671 for (StackTraceElement element : thread.getStackTrace()) | |
672 buf.append(" at ").append(element.toString()).append('\n'); | |
673 return buf.toString(); | |
674 } | |
675 } | |
676 return null; | |
677 } | |
678 } |