Mercurial Hosting > nabble
diff src/fschmidt/util/executor/AbstractThreadPool.java @ 68:00520880ad02
add fschmidt source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 05 Oct 2025 17:24:15 -0600 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/fschmidt/util/executor/AbstractThreadPool.java Sun Oct 05 17:24:15 2025 -0600 @@ -0,0 +1,120 @@ +package fschmidt.util.executor; + +import java.util.List; +import java.util.Queue; +import java.util.LinkedList; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +abstract class AbstractThreadPool implements RunnableWrapper { + private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class); + + private final Thread[] threads; + private boolean isRunning = true; + private final AtomicInteger activeCount = new AtomicInteger(0); + private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>(); + + private final Runnable worker = new Runnable() { + + public void run() { + Runnable command; + while( (command = getCommand()) != null ) { + activeCount.incrementAndGet(); + try { + command.run(); + } catch(RuntimeException e) { + logger.error("",e); + } catch(Error e) { + logger.error("",e); + } finally { + activeCount.decrementAndGet(); + } + } + } + + }; + + public AbstractThreadPool(int size) { + threads = new Thread[size]; + } + + final void start() { + for( int i=0; i<threads.length; i++ ) { + Thread thread = new Thread(worker); + thread.start(); + threads[i] = thread; + } + } + + public final void addRunnableWrapper(RunnableWrapper wrapper) { + runnableWrappers.add(wrapper); + } + + @Override final public Runnable wrap(Runnable command) { + for( RunnableWrapper wrapper : runnableWrappers ) { + command = wrapper.wrap(command); + } + return command; + } + + boolean isRunning() { + return isRunning; + } + + abstract Runnable getCommand(); + + public final synchronized void shutdown() { + isRunning = false; + notifyAll(); + } + + public final boolean isTerminated() { + for( Thread thread : threads ) { + if( thread.isAlive() ) + return false; + } + return true; + } + + public final void join() throws InterruptedException { + shutdown(); + for( Thread thread : threads ) { + thread.join(); + } + } + + public final void join(long timeoutMillis) throws InterruptedException { + long until = System.currentTimeMillis() + timeoutMillis; + long now; + int i = 0; + while( i < threads.length && (now = System.currentTimeMillis()) < until ) { + threads[i].join(until-now); + i++; + } + } + + public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long timeoutMillis = unit.toMillis(timeout); + join(timeoutMillis); + return isTerminated(); + } + + public final int getPoolSize() { + return threads.length; + } + + public final int getActiveCount() { + return activeCount.get(); + } + + public abstract int getQueueSize(); + + public final Thread[] getThreads() { + return threads; + } +}