comparison src/fschmidt/util/executor/AbstractThreadPool.java @ 68:00520880ad02

add fschmidt source
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 05 Oct 2025 17:24:15 -0600
parents
children
comparison
equal deleted inserted replaced
67:9d0fefce6985 68:00520880ad02
1 package fschmidt.util.executor;
2
3 import java.util.List;
4 import java.util.Queue;
5 import java.util.LinkedList;
6 import java.util.concurrent.Executor;
7 import java.util.concurrent.TimeUnit;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13
14 abstract class AbstractThreadPool implements RunnableWrapper {
15 private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class);
16
17 private final Thread[] threads;
18 private boolean isRunning = true;
19 private final AtomicInteger activeCount = new AtomicInteger(0);
20 private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>();
21
22 private final Runnable worker = new Runnable() {
23
24 public void run() {
25 Runnable command;
26 while( (command = getCommand()) != null ) {
27 activeCount.incrementAndGet();
28 try {
29 command.run();
30 } catch(RuntimeException e) {
31 logger.error("",e);
32 } catch(Error e) {
33 logger.error("",e);
34 } finally {
35 activeCount.decrementAndGet();
36 }
37 }
38 }
39
40 };
41
42 public AbstractThreadPool(int size) {
43 threads = new Thread[size];
44 }
45
46 final void start() {
47 for( int i=0; i<threads.length; i++ ) {
48 Thread thread = new Thread(worker);
49 thread.start();
50 threads[i] = thread;
51 }
52 }
53
54 public final void addRunnableWrapper(RunnableWrapper wrapper) {
55 runnableWrappers.add(wrapper);
56 }
57
58 @Override final public Runnable wrap(Runnable command) {
59 for( RunnableWrapper wrapper : runnableWrappers ) {
60 command = wrapper.wrap(command);
61 }
62 return command;
63 }
64
65 boolean isRunning() {
66 return isRunning;
67 }
68
69 abstract Runnable getCommand();
70
71 public final synchronized void shutdown() {
72 isRunning = false;
73 notifyAll();
74 }
75
76 public final boolean isTerminated() {
77 for( Thread thread : threads ) {
78 if( thread.isAlive() )
79 return false;
80 }
81 return true;
82 }
83
84 public final void join() throws InterruptedException {
85 shutdown();
86 for( Thread thread : threads ) {
87 thread.join();
88 }
89 }
90
91 public final void join(long timeoutMillis) throws InterruptedException {
92 long until = System.currentTimeMillis() + timeoutMillis;
93 long now;
94 int i = 0;
95 while( i < threads.length && (now = System.currentTimeMillis()) < until ) {
96 threads[i].join(until-now);
97 i++;
98 }
99 }
100
101 public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
102 long timeoutMillis = unit.toMillis(timeout);
103 join(timeoutMillis);
104 return isTerminated();
105 }
106
107 public final int getPoolSize() {
108 return threads.length;
109 }
110
111 public final int getActiveCount() {
112 return activeCount.get();
113 }
114
115 public abstract int getQueueSize();
116
117 public final Thread[] getThreads() {
118 return threads;
119 }
120 }