comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 944:1d24b6e422fa

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 11 Oct 2016 20:16:03 -0600
parents 96f60ce98949
children f5aefdc4a81a
comparison
equal deleted inserted replaced
943:96f60ce98949 944:1d24b6e422fa
27 import java.nio.channels.Selector; 27 import java.nio.channels.Selector;
28 import java.nio.channels.ServerSocketChannel; 28 import java.nio.channels.ServerSocketChannel;
29 import java.nio.channels.SocketChannel; 29 import java.nio.channels.SocketChannel;
30 import java.util.ArrayList; 30 import java.util.ArrayList;
31 import java.util.List; 31 import java.util.List;
32 import java.util.Collections;
32 import java.util.Set; 33 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue; 35 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.ConcurrentMap; 36 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.CountDownLatch; 37 import java.util.concurrent.CountDownLatch;
115 */ 116 */
116 public SelectSet getSelectSet(int i) 117 public SelectSet getSelectSet(int i)
117 { 118 {
118 return _selectSet[i]; 119 return _selectSet[i];
119 } 120 }
120
121 /* ------------------------------------------------------------ */
122 /** Register a channel
123 * @param channel
124 * @param att Attached Object
125 */
126 public void register(SocketChannel channel, Object att)
127 {
128 // The ++ increment here is not atomic, but it does not matter.
129 // so long as the value changes sometimes, then connections will
130 // be distributed over the available sets.
131
132 int s=_set++;
133 if (s<0)
134 s=-s;
135 s=s%_selectSets;
136 SelectSet[] sets=_selectSet;
137 if (sets!=null)
138 {
139 SelectSet set=sets[s];
140 set.addChange(channel,att);
141 set.wakeup();
142 }
143 }
144
145 121
146 /* ------------------------------------------------------------ */ 122 /* ------------------------------------------------------------ */
147 /** Register a channel 123 /** Register a channel
148 * @param channel 124 * @param channel
149 */ 125 */
341 private boolean _pausing; 317 private boolean _pausing;
342 private boolean _paused; 318 private boolean _paused;
343 private volatile long _idleTick; 319 private volatile long _idleTick;
344 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); 320 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
345 321
346 /* ------------------------------------------------------------ */
347 SelectSet(int acceptorID) throws Exception 322 SelectSet(int acceptorID) throws Exception
348 { 323 {
349 _setID=acceptorID; 324 _setID=acceptorID;
350 325
351 _idleTick = System.currentTimeMillis(); 326 _idleTick = System.currentTimeMillis();
353 // create a selector; 328 // create a selector;
354 _selector = Selector.open(); 329 _selector = Selector.open();
355 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; 330 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
356 } 331 }
357 332
358 /* ------------------------------------------------------------ */
359 public void addChange(Object change) 333 public void addChange(Object change)
360 { 334 {
361 _changes.add(change); 335 _changes.add(change);
362 }
363
364 /* ------------------------------------------------------------ */
365 public void addChange(SelectableChannel channel, Object att)
366 {
367 if (att==null)
368 addChange(channel);
369 else if (att instanceof EndPoint)
370 addChange(att);
371 else
372 addChange(new ChannelAndAttachment(channel,att));
373 } 336 }
374 337
375 /* ------------------------------------------------------------ */ 338 /* ------------------------------------------------------------ */
376 /** 339 /**
377 * Select and dispatch tasks found from changes and the selector. 340 * Select and dispatch tasks found from changes and the selector.
403 // Update the operations for a key. 366 // Update the operations for a key.
404 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; 367 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
405 ch=endpoint.getChannel(); 368 ch=endpoint.getChannel();
406 endpoint.doUpdateKey(); 369 endpoint.doUpdateKey();
407 } 370 }
408 else if (change instanceof ChannelAndAttachment)
409 {
410 // finish accepting/connecting this connection
411 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
412 final SelectableChannel channel=asc._channel;
413 ch=channel;
414 final Object att = asc._attachment;
415
416 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
417 {
418 key = channel.register(selector,SelectionKey.OP_READ,att);
419 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
420 key.attach(endpoint);
421 endpoint.schedule();
422 }
423 else if (channel.isOpen())
424 {
425 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
426 }
427 }
428 else if (change instanceof SocketChannel) 371 else if (change instanceof SocketChannel)
429 { 372 {
430 // Newly registered channel 373 // Newly registered channel
431 final SocketChannel channel=(SocketChannel)change; 374 final SocketChannel channel=(SocketChannel)change;
432 ch=channel; 375 ch=channel;
433 key = channel.register(selector,SelectionKey.OP_READ,null); 376 key = channel.register(selector,SelectionKey.OP_READ,null);
434 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 377 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
435 key.attach(endpoint); 378 key.attach(endpoint);
436 endpoint.schedule(); 379 endpoint.schedule();
437 }
438 else if (change instanceof ChangeTask)
439 {
440 ((Runnable)change).run();
441 } 380 }
442 else if (change instanceof Runnable) 381 else if (change instanceof Runnable)
443 { 382 {
444 execute((Runnable)change); 383 execute((Runnable)change);
445 } 384 }
668 { 607 {
669 _selecting=null; 608 _selecting=null;
670 } 609 }
671 } 610 }
672 611
673
674 private void renewSelector()
675 {
676 try
677 {
678 synchronized (this)
679 {
680 Selector selector=_selector;
681 if (selector==null)
682 return;
683 final Selector new_selector = Selector.open();
684 for (SelectionKey k: selector.keys())
685 {
686 if (!k.isValid() || k.interestOps()==0)
687 continue;
688
689 final SelectableChannel channel = k.channel();
690 final Object attachment = k.attachment();
691
692 if (attachment==null)
693 addChange(channel);
694 else
695 addChange(channel,attachment);
696 }
697 _selector.close();
698 _selector=new_selector;
699 }
700 }
701 catch(IOException e)
702 {
703 throw new RuntimeException("recreating selector",e);
704 }
705 }
706
707 public SelectorManager getManager() 612 public SelectorManager getManager()
708 { 613 {
709 return SelectorManager.this; 614 return SelectorManager.this;
710 } 615 }
711 616
714 return _now; 619 return _now;
715 } 620 }
716 621
717 public void wakeup() 622 public void wakeup()
718 { 623 {
719 try 624 Selector selector = _selector;
720 { 625 if (selector!=null)
721 Selector selector = _selector; 626 selector.wakeup();
722 if (selector!=null)
723 selector.wakeup();
724 }
725 catch(Exception e)
726 {
727 addChange(new ChangeTask()
728 {
729 public void run()
730 {
731 renewSelector();
732 }
733 });
734
735 renewSelector();
736 }
737 } 627 }
738 628
739 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException 629 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
740 { 630 {
741 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); 631 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
768 Thread.sleep(10); 658 Thread.sleep(10);
769 } 659 }
770 } 660 }
771 catch(Exception e) 661 catch(Exception e)
772 { 662 {
773 LOG.trace("",e); 663 LOG.warn("",e);
774 } 664 }
775 665
776 // close endpoints and selector 666 // close endpoints and selector
777 synchronized (this) 667 synchronized (this)
778 { 668 {
804 } 694 }
805 catch (IOException e) 695 catch (IOException e)
806 { 696 {
807 LOG.trace("",e); 697 LOG.trace("",e);
808 } 698 }
809 _selector=null; 699 _selector = null;
810 } 700 }
811 } 701 }
812 702
813 public String dump() 703 public String dump()
814 { 704 {
816 } 706 }
817 707
818 public void dump(Appendable out, String indent) throws IOException 708 public void dump(Appendable out, String indent) throws IOException
819 { 709 {
820 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); 710 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
821 711 AggregateLifeCycle.dump(out,indent,Collections.emptyList());
822 Thread selecting = _selecting;
823
824 Object where = "not selecting";
825 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
826 if (trace!=null)
827 {
828 for (StackTraceElement t:trace)
829 if (t.getClassName().startsWith("org.eclipse.jetty."))
830 {
831 where=t;
832 break;
833 }
834 }
835
836 Selector selector=_selector;
837 if (selector!=null)
838 {
839 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
840 dump.add(where);
841
842 final CountDownLatch latch = new CountDownLatch(1);
843
844 addChange(new ChangeTask()
845 {
846 public void run()
847 {
848 dumpKeyState(dump);
849 latch.countDown();
850 }
851 });
852
853 try
854 {
855 latch.await(5,TimeUnit.SECONDS);
856 }
857 catch(InterruptedException e)
858 {
859 LOG.trace("",e);
860 }
861
862 AggregateLifeCycle.dump(out,indent,dump);
863 }
864 }
865
866 public void dumpKeyState(List<Object> dumpto)
867 {
868 Selector selector=_selector;
869 Set<SelectionKey> keys = selector.keys();
870 dumpto.add(selector + " keys=" + keys.size());
871 for (SelectionKey key: keys)
872 {
873 if (key.isValid())
874 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
875 else
876 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
877 }
878 } 712 }
879 713
880 public String toString() 714 public String toString()
881 { 715 {
882 Selector selector=_selector; 716 Selector selector=_selector;
885 selector != null && selector.isOpen() ? selector.keys().size() : -1, 719 selector != null && selector.isOpen() ? selector.keys().size() : -1,
886 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); 720 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
887 } 721 }
888 } 722 }
889 723
890 private static class ChannelAndAttachment
891 {
892 final SelectableChannel _channel;
893 final Object _attachment;
894
895 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
896 {
897 super();
898 _channel = channel;
899 _attachment = attachment;
900 }
901 }
902
903
904 private interface ChangeTask extends Runnable
905 {}
906
907 } 724 }