Mercurial Hosting > nabble
view src/fschmidt/util/executor/AbstractThreadPool.java @ 68:00520880ad02
add fschmidt source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 05 Oct 2025 17:24:15 -0600 |
parents | |
children |
line wrap: on
line source
package fschmidt.util.executor; import java.util.List; import java.util.Queue; import java.util.LinkedList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractThreadPool implements RunnableWrapper { private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class); private final Thread[] threads; private boolean isRunning = true; private final AtomicInteger activeCount = new AtomicInteger(0); private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>(); private final Runnable worker = new Runnable() { public void run() { Runnable command; while( (command = getCommand()) != null ) { activeCount.incrementAndGet(); try { command.run(); } catch(RuntimeException e) { logger.error("",e); } catch(Error e) { logger.error("",e); } finally { activeCount.decrementAndGet(); } } } }; public AbstractThreadPool(int size) { threads = new Thread[size]; } final void start() { for( int i=0; i<threads.length; i++ ) { Thread thread = new Thread(worker); thread.start(); threads[i] = thread; } } public final void addRunnableWrapper(RunnableWrapper wrapper) { runnableWrappers.add(wrapper); } @Override final public Runnable wrap(Runnable command) { for( RunnableWrapper wrapper : runnableWrappers ) { command = wrapper.wrap(command); } return command; } boolean isRunning() { return isRunning; } abstract Runnable getCommand(); public final synchronized void shutdown() { isRunning = false; notifyAll(); } public final boolean isTerminated() { for( Thread thread : threads ) { if( thread.isAlive() ) return false; } return true; } public final void join() throws InterruptedException { shutdown(); for( Thread thread : threads ) { thread.join(); } } public final void join(long timeoutMillis) throws InterruptedException { long until = System.currentTimeMillis() + timeoutMillis; long now; int i = 0; while( i < threads.length && (now = System.currentTimeMillis()) < until ) { threads[i].join(until-now); i++; } } public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long timeoutMillis = unit.toMillis(timeout); join(timeoutMillis); return isTerminated(); } public final int getPoolSize() { return threads.length; } public final int getActiveCount() { return activeCount.get(); } public abstract int getQueueSize(); public final Thread[] getThreads() { return threads; } }