68
|
1 package fschmidt.util.executor;
|
|
2
|
|
3 import java.util.SortedMap;
|
|
4 import java.util.TreeMap;
|
|
5 import java.util.concurrent.Executor;
|
|
6 import java.util.concurrent.TimeUnit;
|
|
7 import java.util.concurrent.atomic.AtomicInteger;
|
|
8 import org.slf4j.Logger;
|
|
9 import org.slf4j.LoggerFactory;
|
|
10
|
|
11
|
|
12 public final class ScheduledThreadPool extends AbstractThreadPool {
|
|
13 private static final Logger logger = LoggerFactory.getLogger(ScheduledThreadPool.class);
|
|
14
|
|
15 private final SortedMap<Long,Runnable> queue = new TreeMap<Long,Runnable>();
|
|
16
|
|
17 public ScheduledThreadPool(int size) {
|
|
18 super(size);
|
|
19 start();
|
|
20 }
|
|
21
|
|
22 public final synchronized void schedule(Runnable command,long delay,TimeUnit unit) {
|
|
23 if( !isRunning() )
|
|
24 return;
|
|
25 long when = System.currentTimeMillis() + unit.toMillis(delay);
|
|
26 command = wrap(command);
|
|
27 while(true) {
|
|
28 command = queue.put(when,command);
|
|
29 if( command == null )
|
|
30 break;
|
|
31 when++;
|
|
32 }
|
|
33 notify();
|
|
34 }
|
|
35
|
|
36 @Override synchronized Runnable getCommand() {
|
|
37 while(true) {
|
|
38 try {
|
|
39 while( queue.isEmpty() ) {
|
|
40 if( !isRunning() )
|
|
41 return null;
|
|
42 wait();
|
|
43 }
|
|
44 long first = queue.firstKey();
|
|
45 long now = System.currentTimeMillis();
|
|
46 if( first <= now )
|
|
47 return queue.remove(first);
|
|
48 if( !isRunning() )
|
|
49 return null;
|
|
50 wait( first - now );
|
|
51 } catch(InterruptedException e) {
|
|
52 logger.error("",e);
|
|
53 }
|
|
54 }
|
|
55 }
|
|
56
|
|
57 @Override public final synchronized int getQueueSize() {
|
|
58 return queue.size();
|
|
59 }
|
|
60
|
|
61 }
|