view src/fschmidt/util/executor/AbstractThreadPool.java @ 68:00520880ad02

add fschmidt source
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 05 Oct 2025 17:24:15 -0600
parents
children
line wrap: on
line source

package fschmidt.util.executor;

import java.util.List;
import java.util.Queue;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


abstract class AbstractThreadPool implements RunnableWrapper {
	private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class);

	private final Thread[] threads;
	private boolean isRunning = true;
	private final AtomicInteger activeCount = new AtomicInteger(0);
	private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>();

	private final Runnable worker = new Runnable() {

		public void run() {
			Runnable command;
			while( (command = getCommand()) != null ) {
				activeCount.incrementAndGet();
				try {
					command.run();
				} catch(RuntimeException e) {
					logger.error("",e);
				} catch(Error e) {
					logger.error("",e);
				} finally {
					activeCount.decrementAndGet();
				}
			}
		}

	};

	public AbstractThreadPool(int size) {
		threads = new Thread[size];
	}

	final void start() {
		for( int i=0; i<threads.length; i++ ) {
			Thread thread = new Thread(worker);
			thread.start();
			threads[i] = thread;
		}
	}

	public final void addRunnableWrapper(RunnableWrapper wrapper) {
		runnableWrappers.add(wrapper);
	}

	@Override final public Runnable wrap(Runnable command) {
		for( RunnableWrapper wrapper : runnableWrappers ) {
			command = wrapper.wrap(command);
		}
		return command;
	}

	boolean isRunning() {
		return isRunning;
	}

	abstract Runnable getCommand();

	public final synchronized void shutdown() {
		isRunning = false;
		notifyAll();
	}

	public final boolean isTerminated() {
		for( Thread thread : threads ) {
			if( thread.isAlive() )
				return false;
		}
		return true;
	}

	public final void join() throws InterruptedException {
		shutdown();
		for( Thread thread : threads ) {
			thread.join();
		}
	}

	public final void join(long timeoutMillis) throws InterruptedException {
		long until = System.currentTimeMillis() + timeoutMillis;
		long now;
		int i = 0;
		while( i < threads.length && (now = System.currentTimeMillis()) < until ) {
			threads[i].join(until-now);
			i++;
		}
	}

	public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
		long timeoutMillis = unit.toMillis(timeout);
		join(timeoutMillis);
		return isTerminated();
	}

	public final int getPoolSize() {
		return threads.length;
	}

	public final int getActiveCount() {
		return activeCount.get();
	}

	public abstract int getQueueSize();

	public final Thread[] getThreads() {
		return threads;
	}
}