diff src/fschmidt/util/executor/AbstractThreadPool.java @ 68:00520880ad02

add fschmidt source
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 05 Oct 2025 17:24:15 -0600
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fschmidt/util/executor/AbstractThreadPool.java	Sun Oct 05 17:24:15 2025 -0600
@@ -0,0 +1,120 @@
+package fschmidt.util.executor;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+abstract class AbstractThreadPool implements RunnableWrapper {
+	private static final Logger logger = LoggerFactory.getLogger(AbstractThreadPool.class);
+
+	private final Thread[] threads;
+	private boolean isRunning = true;
+	private final AtomicInteger activeCount = new AtomicInteger(0);
+	private final List<RunnableWrapper> runnableWrappers = new CopyOnWriteArrayList<RunnableWrapper>();
+
+	private final Runnable worker = new Runnable() {
+
+		public void run() {
+			Runnable command;
+			while( (command = getCommand()) != null ) {
+				activeCount.incrementAndGet();
+				try {
+					command.run();
+				} catch(RuntimeException e) {
+					logger.error("",e);
+				} catch(Error e) {
+					logger.error("",e);
+				} finally {
+					activeCount.decrementAndGet();
+				}
+			}
+		}
+
+	};
+
+	public AbstractThreadPool(int size) {
+		threads = new Thread[size];
+	}
+
+	final void start() {
+		for( int i=0; i<threads.length; i++ ) {
+			Thread thread = new Thread(worker);
+			thread.start();
+			threads[i] = thread;
+		}
+	}
+
+	public final void addRunnableWrapper(RunnableWrapper wrapper) {
+		runnableWrappers.add(wrapper);
+	}
+
+	@Override final public Runnable wrap(Runnable command) {
+		for( RunnableWrapper wrapper : runnableWrappers ) {
+			command = wrapper.wrap(command);
+		}
+		return command;
+	}
+
+	boolean isRunning() {
+		return isRunning;
+	}
+
+	abstract Runnable getCommand();
+
+	public final synchronized void shutdown() {
+		isRunning = false;
+		notifyAll();
+	}
+
+	public final boolean isTerminated() {
+		for( Thread thread : threads ) {
+			if( thread.isAlive() )
+				return false;
+		}
+		return true;
+	}
+
+	public final void join() throws InterruptedException {
+		shutdown();
+		for( Thread thread : threads ) {
+			thread.join();
+		}
+	}
+
+	public final void join(long timeoutMillis) throws InterruptedException {
+		long until = System.currentTimeMillis() + timeoutMillis;
+		long now;
+		int i = 0;
+		while( i < threads.length && (now = System.currentTimeMillis()) < until ) {
+			threads[i].join(until-now);
+			i++;
+		}
+	}
+
+	public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+		long timeoutMillis = unit.toMillis(timeout);
+		join(timeoutMillis);
+		return isTerminated();
+	}
+
+	public final int getPoolSize() {
+		return threads.length;
+	}
+
+	public final int getActiveCount() {
+		return activeCount.get();
+	}
+
+	public abstract int getQueueSize();
+
+	public final Thread[] getThreads() {
+		return threads;
+	}
+}