| 
68
 | 
     1 package fschmidt.util.executor;
 | 
| 
 | 
     2 
 | 
| 
 | 
     3 import java.util.List;
 | 
| 
 | 
     4 import java.util.Queue;
 | 
| 
 | 
     5 import java.util.LinkedList;
 | 
| 
 | 
     6 import java.util.concurrent.Executor;
 | 
| 
 | 
     7 import java.util.concurrent.TimeUnit;
 | 
| 
 | 
     8 import java.util.concurrent.CopyOnWriteArrayList;
 | 
| 
 | 
     9 import java.util.concurrent.atomic.AtomicInteger;
 | 
| 
 | 
    10 import org.slf4j.Logger;
 | 
| 
 | 
    11 import org.slf4j.LoggerFactory;
 | 
| 
 | 
    12 
 | 
| 
 | 
    13 
 | 
| 
 | 
    14 abstract class AbstractThreadPool implements RunnableWrapper {
 | 
| 
 | 
    15 	private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class);
 | 
| 
 | 
    16 
 | 
| 
 | 
    17 	private final Thread[] threads;
 | 
| 
 | 
    18 	private boolean isRunning = true;
 | 
| 
 | 
    19 	private final AtomicInteger activeCount = new AtomicInteger(0);
 | 
| 
 | 
    20 	private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>();
 | 
| 
 | 
    21 
 | 
| 
 | 
    22 	private final Runnable worker = new Runnable() {
 | 
| 
 | 
    23 
 | 
| 
 | 
    24 		public void run() {
 | 
| 
 | 
    25 			Runnable command;
 | 
| 
 | 
    26 			while( (command = getCommand()) != null ) {
 | 
| 
 | 
    27 				activeCount.incrementAndGet();
 | 
| 
 | 
    28 				try {
 | 
| 
 | 
    29 					command.run();
 | 
| 
 | 
    30 				} catch(RuntimeException e) {
 | 
| 
 | 
    31 					logger.error("",e);
 | 
| 
 | 
    32 				} catch(Error e) {
 | 
| 
 | 
    33 					logger.error("",e);
 | 
| 
 | 
    34 				} finally {
 | 
| 
 | 
    35 					activeCount.decrementAndGet();
 | 
| 
 | 
    36 				}
 | 
| 
 | 
    37 			}
 | 
| 
 | 
    38 		}
 | 
| 
 | 
    39 
 | 
| 
 | 
    40 	};
 | 
| 
 | 
    41 
 | 
| 
 | 
    42 	public AbstractThreadPool(int size) {
 | 
| 
 | 
    43 		threads = new Thread[size];
 | 
| 
 | 
    44 	}
 | 
| 
 | 
    45 
 | 
| 
 | 
    46 	final void start() {
 | 
| 
 | 
    47 		for( int i=0; i<threads.length; i++ ) {
 | 
| 
 | 
    48 			Thread thread = new Thread(worker);
 | 
| 
 | 
    49 			thread.start();
 | 
| 
 | 
    50 			threads[i] = thread;
 | 
| 
 | 
    51 		}
 | 
| 
 | 
    52 	}
 | 
| 
 | 
    53 
 | 
| 
 | 
    54 	public final void addRunnableWrapper(RunnableWrapper wrapper) {
 | 
| 
 | 
    55 		runnableWrappers.add(wrapper);
 | 
| 
 | 
    56 	}
 | 
| 
 | 
    57 
 | 
| 
 | 
    58 	@Override final public Runnable wrap(Runnable command) {
 | 
| 
 | 
    59 		for( RunnableWrapper wrapper : runnableWrappers ) {
 | 
| 
 | 
    60 			command = wrapper.wrap(command);
 | 
| 
 | 
    61 		}
 | 
| 
 | 
    62 		return command;
 | 
| 
 | 
    63 	}
 | 
| 
 | 
    64 
 | 
| 
 | 
    65 	boolean isRunning() {
 | 
| 
 | 
    66 		return isRunning;
 | 
| 
 | 
    67 	}
 | 
| 
 | 
    68 
 | 
| 
 | 
    69 	abstract Runnable getCommand();
 | 
| 
 | 
    70 
 | 
| 
 | 
    71 	public final synchronized void shutdown() {
 | 
| 
 | 
    72 		isRunning = false;
 | 
| 
 | 
    73 		notifyAll();
 | 
| 
 | 
    74 	}
 | 
| 
 | 
    75 
 | 
| 
 | 
    76 	public final boolean isTerminated() {
 | 
| 
 | 
    77 		for( Thread thread : threads ) {
 | 
| 
 | 
    78 			if( thread.isAlive() )
 | 
| 
 | 
    79 				return false;
 | 
| 
 | 
    80 		}
 | 
| 
 | 
    81 		return true;
 | 
| 
 | 
    82 	}
 | 
| 
 | 
    83 
 | 
| 
 | 
    84 	public final void join() throws InterruptedException {
 | 
| 
 | 
    85 		shutdown();
 | 
| 
 | 
    86 		for( Thread thread : threads ) {
 | 
| 
 | 
    87 			thread.join();
 | 
| 
 | 
    88 		}
 | 
| 
 | 
    89 	}
 | 
| 
 | 
    90 
 | 
| 
 | 
    91 	public final void join(long timeoutMillis) throws InterruptedException {
 | 
| 
 | 
    92 		long until = System.currentTimeMillis() + timeoutMillis;
 | 
| 
 | 
    93 		long now;
 | 
| 
 | 
    94 		int i = 0;
 | 
| 
 | 
    95 		while( i < threads.length && (now = System.currentTimeMillis()) < until ) {
 | 
| 
 | 
    96 			threads[i].join(until-now);
 | 
| 
 | 
    97 			i++;
 | 
| 
 | 
    98 		}
 | 
| 
 | 
    99 	}
 | 
| 
 | 
   100 
 | 
| 
 | 
   101 	public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
 | 
| 
 | 
   102 		long timeoutMillis = unit.toMillis(timeout);
 | 
| 
 | 
   103 		join(timeoutMillis);
 | 
| 
 | 
   104 		return isTerminated();
 | 
| 
 | 
   105 	}
 | 
| 
 | 
   106 
 | 
| 
 | 
   107 	public final int getPoolSize() {
 | 
| 
 | 
   108 		return threads.length;
 | 
| 
 | 
   109 	}
 | 
| 
 | 
   110 
 | 
| 
 | 
   111 	public final int getActiveCount() {
 | 
| 
 | 
   112 		return activeCount.get();
 | 
| 
 | 
   113 	}
 | 
| 
 | 
   114 
 | 
| 
 | 
   115 	public abstract int getQueueSize();
 | 
| 
 | 
   116 
 | 
| 
 | 
   117 	public final Thread[] getThreads() {
 | 
| 
 | 
   118 		return threads;
 | 
| 
 | 
   119 	}
 | 
| 
 | 
   120 }
 |