comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 914:54308d65265a

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 09 Oct 2016 01:22:55 -0600
parents 6b210bb66c63
children b77d631b9e28
comparison
equal deleted inserted replaced
913:17f4fe8271de 914:54308d65265a
64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); 64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); 65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); 66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
67 67
68 private int _maxIdleTime; 68 private int _maxIdleTime;
69 private int _lowResourcesMaxIdleTime;
70 private long _lowResourcesConnections; 69 private long _lowResourcesConnections;
71 private SelectSet[] _selectSet; 70 private SelectSet[] _selectSet;
72 private int _selectSets=1; 71 private int _selectSets=1;
73 private volatile int _set=0; 72 private volatile int _set=0;
74 private boolean _deferringInterestedOps0=true;
75 private int _selectorPriorityDelta=0;
76 73
77 /* ------------------------------------------------------------ */ 74 /* ------------------------------------------------------------ */
78 /** 75 /**
79 * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed. 76 * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed.
80 * @see #setLowResourcesMaxIdleTime(long) 77 * @see #setLowResourcesMaxIdleTime(long)
184 SelectSet set=_selectSet[s]; 181 SelectSet set=_selectSet[s];
185 set.addChange(acceptChannel); 182 set.addChange(acceptChannel);
186 set.wakeup(); 183 set.wakeup();
187 } 184 }
188 185
189 /* ------------------------------------------------------------ */
190 /**
191 * @return delta The value to add to the selector thread priority.
192 */
193 public int getSelectorPriorityDelta()
194 {
195 return _selectorPriorityDelta;
196 }
197
198 /* ------------------------------------------------------------ */
199 /** Set the selector thread priority delta.
200 * @param delta The value to add to the selector thread priority.
201 */
202 public void setSelectorPriorityDelta(int delta)
203 {
204 _selectorPriorityDelta=delta;
205 }
206
207 186
208 /* ------------------------------------------------------------ */ 187 /* ------------------------------------------------------------ */
209 /** 188 /**
210 * @return the lowResourcesConnections 189 * @return the lowResourcesConnections
211 */ 190 */
224 public void setLowResourcesConnections(long lowResourcesConnections) 203 public void setLowResourcesConnections(long lowResourcesConnections)
225 { 204 {
226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; 205 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
227 } 206 }
228 207
229 /* ------------------------------------------------------------ */ 208
230 /**
231 * @return the lowResourcesMaxIdleTime
232 */
233 public long getLowResourcesMaxIdleTime()
234 {
235 return _lowResourcesMaxIdleTime;
236 }
237
238 /* ------------------------------------------------------------ */
239 /**
240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
241 * @see #setMaxIdleTime(long)
242 */
243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
244 {
245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
246 }
247
248
249 /* ------------------------------------------------------------------------------- */
250 public abstract void execute(Runnable task); 209 public abstract void execute(Runnable task);
251 210
252 /* ------------------------------------------------------------ */ 211 /* ------------------------------------------------------------ */
253 /* (non-Javadoc) 212 /* (non-Javadoc)
254 * @see org.eclipse.component.AbstractLifeCycle#doStart() 213 * @see org.eclipse.component.AbstractLifeCycle#doStart()
269 execute(new Runnable() 228 execute(new Runnable()
270 { 229 {
271 public void run() 230 public void run()
272 { 231 {
273 String name=Thread.currentThread().getName(); 232 String name=Thread.currentThread().getName();
274 int priority=Thread.currentThread().getPriority();
275 try 233 try
276 { 234 {
277 SelectSet[] sets=_selectSet; 235 SelectSet[] sets=_selectSet;
278 if (sets==null) 236 if (sets==null)
279 return; 237 return;
280 SelectSet set=sets[id]; 238 SelectSet set=sets[id];
281 239
282 Thread.currentThread().setName(name+" Selector"+id); 240 Thread.currentThread().setName(name+" Selector"+id);
283 if (getSelectorPriorityDelta()!=0)
284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
285 LOG.debug("Starting {} on {}",Thread.currentThread(),this); 241 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
286 while (isRunning()) 242 while (isRunning())
287 { 243 {
288 try 244 try
289 { 245 {
301 } 257 }
302 finally 258 finally
303 { 259 {
304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); 260 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
305 Thread.currentThread().setName(name); 261 Thread.currentThread().setName(name);
306 if (getSelectorPriorityDelta()!=0)
307 Thread.currentThread().setPriority(priority);
308 } 262 }
309 } 263 }
310 264
311 }); 265 });
312 } 266 }
333 /* ------------------------------------------------------------ */ 287 /* ------------------------------------------------------------ */
334 /** 288 /**
335 * @param endpoint 289 * @param endpoint
336 */ 290 */
337 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); 291 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
338
339 /* ------------------------------------------------------------ */
340 /**
341 * @param endpoint
342 */
343 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
344
345 /* ------------------------------------------------------------ */
346 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
347 292
348 /* ------------------------------------------------------------------------------- */ 293 /* ------------------------------------------------------------------------------- */
349 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); 294 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
350 295
351 /* ------------------------------------------------------------ */ 296 /* ------------------------------------------------------------ */
698 if (now-_idleTick>__IDLE_TICK) 643 if (now-_idleTick>__IDLE_TICK)
699 { 644 {
700 _idleTick=now; 645 _idleTick=now;
701 646
702 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) 647 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
703 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) 648 ?(now+_maxIdleTime)
704 :now; 649 :now;
705 650
706 execute(new Runnable() 651 execute(new Runnable()
707 { 652 {
708 public void run() 653 public void run()
742 _selecting=null; 687 _selecting=null;
743 } 688 }
744 } 689 }
745 690
746 691
747 /* ------------------------------------------------------------ */
748 private void renewSelector() 692 private void renewSelector()
749 { 693 {
750 try 694 try
751 { 695 {
752 synchronized (this) 696 synchronized (this)
776 { 720 {
777 throw new RuntimeException("recreating selector",e); 721 throw new RuntimeException("recreating selector",e);
778 } 722 }
779 } 723 }
780 724
781 /* ------------------------------------------------------------ */
782 public SelectorManager getManager() 725 public SelectorManager getManager()
783 { 726 {
784 return SelectorManager.this; 727 return SelectorManager.this;
785 } 728 }
786 729
787 /* ------------------------------------------------------------ */
788 public long getNow() 730 public long getNow()
789 { 731 {
790 return _timeout.getNow(); 732 return _timeout.getNow();
791 } 733 }
792 734
802 if (!(task instanceof Runnable)) 744 if (!(task instanceof Runnable))
803 throw new IllegalArgumentException("!Runnable"); 745 throw new IllegalArgumentException("!Runnable");
804 _timeout.schedule(task, timeoutMs); 746 _timeout.schedule(task, timeoutMs);
805 } 747 }
806 748
807 /* ------------------------------------------------------------ */
808 public void cancelTimeout(Timeout.Task task) 749 public void cancelTimeout(Timeout.Task task)
809 { 750 {
810 task.cancel(); 751 task.cancel();
811 } 752 }
812 753
813 /* ------------------------------------------------------------ */
814 public void wakeup() 754 public void wakeup()
815 { 755 {
816 try 756 try
817 { 757 {
818 Selector selector = _selector; 758 Selector selector = _selector;
831 771
832 renewSelector(); 772 renewSelector();
833 } 773 }
834 } 774 }
835 775
836 /* ------------------------------------------------------------ */
837 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException 776 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
838 { 777 {
839 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); 778 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
840 LOG.debug("created {}",endp); 779 LOG.debug("created {}",endp);
841 endPointOpened(endp);
842 _endPoints.put(endp,this); 780 _endPoints.put(endp,this);
843 return endp; 781 return endp;
844 } 782 }
845 783
846 /* ------------------------------------------------------------ */
847 public void destroyEndPoint(SelectChannelEndPoint endp) 784 public void destroyEndPoint(SelectChannelEndPoint endp)
848 { 785 {
849 LOG.debug("destroyEndPoint {}",endp); 786 LOG.debug("destroyEndPoint {}",endp);
850 _endPoints.remove(endp); 787 _endPoints.remove(endp);
851 endPointClosed(endp); 788 endPointClosed(endp);
852 } 789 }
853 790
854 /* ------------------------------------------------------------ */
855 Selector getSelector() 791 Selector getSelector()
856 { 792 {
857 return _selector; 793 return _selector;
858 } 794 }
859 795
860 /* ------------------------------------------------------------ */
861 void stop() throws Exception 796 void stop() throws Exception
862 { 797 {
863 // Spin for a while waiting for selector to complete 798 // Spin for a while waiting for selector to complete
864 // to avoid unnecessary closed channel exceptions 799 // to avoid unnecessary closed channel exceptions
865 try 800 try
912 } 847 }
913 _selector=null; 848 _selector=null;
914 } 849 }
915 } 850 }
916 851
917 /* ------------------------------------------------------------ */
918 public String dump() 852 public String dump()
919 { 853 {
920 return AggregateLifeCycle.dump(this); 854 return AggregateLifeCycle.dump(this);
921 } 855 }
922 856
923 /* ------------------------------------------------------------ */
924 public void dump(Appendable out, String indent) throws IOException 857 public void dump(Appendable out, String indent) throws IOException
925 { 858 {
926 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); 859 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
927 860
928 Thread selecting = _selecting; 861 Thread selecting = _selecting;
967 900
968 AggregateLifeCycle.dump(out,indent,dump); 901 AggregateLifeCycle.dump(out,indent,dump);
969 } 902 }
970 } 903 }
971 904
972 /* ------------------------------------------------------------ */
973 public void dumpKeyState(List<Object> dumpto) 905 public void dumpKeyState(List<Object> dumpto)
974 { 906 {
975 Selector selector=_selector; 907 Selector selector=_selector;
976 Set<SelectionKey> keys = selector.keys(); 908 Set<SelectionKey> keys = selector.keys();
977 dumpto.add(selector + " keys=" + keys.size()); 909 dumpto.add(selector + " keys=" + keys.size());
982 else 914 else
983 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); 915 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
984 } 916 }
985 } 917 }
986 918
987 /* ------------------------------------------------------------ */
988 public String toString() 919 public String toString()
989 { 920 {
990 Selector selector=_selector; 921 Selector selector=_selector;
991 return String.format("%s keys=%d selected=%d", 922 return String.format("%s keys=%d selected=%d",
992 super.toString(), 923 super.toString(),
993 selector != null && selector.isOpen() ? selector.keys().size() : -1, 924 selector != null && selector.isOpen() ? selector.keys().size() : -1,
994 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); 925 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
995 } 926 }
996 } 927 }
997 928
998 /* ------------------------------------------------------------ */
999 private static class ChannelAndAttachment 929 private static class ChannelAndAttachment
1000 { 930 {
1001 final SelectableChannel _channel; 931 final SelectableChannel _channel;
1002 final Object _attachment; 932 final Object _attachment;
1003 933
1007 _channel = channel; 937 _channel = channel;
1008 _attachment = attachment; 938 _attachment = attachment;
1009 } 939 }
1010 } 940 }
1011 941
1012 /* ------------------------------------------------------------ */ 942
1013 public boolean isDeferringInterestedOps0()
1014 {
1015 return _deferringInterestedOps0;
1016 }
1017
1018 /* ------------------------------------------------------------ */
1019 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1020 {
1021 _deferringInterestedOps0 = deferringInterestedOps0;
1022 }
1023
1024
1025 /* ------------------------------------------------------------ */
1026 /* ------------------------------------------------------------ */
1027 /* ------------------------------------------------------------ */
1028 private interface ChangeTask extends Runnable 943 private interface ChangeTask extends Runnable
1029 {} 944 {}
1030 945
1031 } 946 }