Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 944:1d24b6e422fa
simplify SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 11 Oct 2016 20:16:03 -0600 |
parents | 96f60ce98949 |
children | f5aefdc4a81a |
comparison
equal
deleted
inserted
replaced
943:96f60ce98949 | 944:1d24b6e422fa |
---|---|
27 import java.nio.channels.Selector; | 27 import java.nio.channels.Selector; |
28 import java.nio.channels.ServerSocketChannel; | 28 import java.nio.channels.ServerSocketChannel; |
29 import java.nio.channels.SocketChannel; | 29 import java.nio.channels.SocketChannel; |
30 import java.util.ArrayList; | 30 import java.util.ArrayList; |
31 import java.util.List; | 31 import java.util.List; |
32 import java.util.Collections; | |
32 import java.util.Set; | 33 import java.util.Set; |
33 import java.util.concurrent.ConcurrentHashMap; | 34 import java.util.concurrent.ConcurrentHashMap; |
34 import java.util.concurrent.ConcurrentLinkedQueue; | 35 import java.util.concurrent.ConcurrentLinkedQueue; |
35 import java.util.concurrent.ConcurrentMap; | 36 import java.util.concurrent.ConcurrentMap; |
36 import java.util.concurrent.CountDownLatch; | 37 import java.util.concurrent.CountDownLatch; |
115 */ | 116 */ |
116 public SelectSet getSelectSet(int i) | 117 public SelectSet getSelectSet(int i) |
117 { | 118 { |
118 return _selectSet[i]; | 119 return _selectSet[i]; |
119 } | 120 } |
120 | |
121 /* ------------------------------------------------------------ */ | |
122 /** Register a channel | |
123 * @param channel | |
124 * @param att Attached Object | |
125 */ | |
126 public void register(SocketChannel channel, Object att) | |
127 { | |
128 // The ++ increment here is not atomic, but it does not matter. | |
129 // so long as the value changes sometimes, then connections will | |
130 // be distributed over the available sets. | |
131 | |
132 int s=_set++; | |
133 if (s<0) | |
134 s=-s; | |
135 s=s%_selectSets; | |
136 SelectSet[] sets=_selectSet; | |
137 if (sets!=null) | |
138 { | |
139 SelectSet set=sets[s]; | |
140 set.addChange(channel,att); | |
141 set.wakeup(); | |
142 } | |
143 } | |
144 | |
145 | 121 |
146 /* ------------------------------------------------------------ */ | 122 /* ------------------------------------------------------------ */ |
147 /** Register a channel | 123 /** Register a channel |
148 * @param channel | 124 * @param channel |
149 */ | 125 */ |
341 private boolean _pausing; | 317 private boolean _pausing; |
342 private boolean _paused; | 318 private boolean _paused; |
343 private volatile long _idleTick; | 319 private volatile long _idleTick; |
344 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | 320 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); |
345 | 321 |
346 /* ------------------------------------------------------------ */ | |
347 SelectSet(int acceptorID) throws Exception | 322 SelectSet(int acceptorID) throws Exception |
348 { | 323 { |
349 _setID=acceptorID; | 324 _setID=acceptorID; |
350 | 325 |
351 _idleTick = System.currentTimeMillis(); | 326 _idleTick = System.currentTimeMillis(); |
353 // create a selector; | 328 // create a selector; |
354 _selector = Selector.open(); | 329 _selector = Selector.open(); |
355 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | 330 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; |
356 } | 331 } |
357 | 332 |
358 /* ------------------------------------------------------------ */ | |
359 public void addChange(Object change) | 333 public void addChange(Object change) |
360 { | 334 { |
361 _changes.add(change); | 335 _changes.add(change); |
362 } | |
363 | |
364 /* ------------------------------------------------------------ */ | |
365 public void addChange(SelectableChannel channel, Object att) | |
366 { | |
367 if (att==null) | |
368 addChange(channel); | |
369 else if (att instanceof EndPoint) | |
370 addChange(att); | |
371 else | |
372 addChange(new ChannelAndAttachment(channel,att)); | |
373 } | 336 } |
374 | 337 |
375 /* ------------------------------------------------------------ */ | 338 /* ------------------------------------------------------------ */ |
376 /** | 339 /** |
377 * Select and dispatch tasks found from changes and the selector. | 340 * Select and dispatch tasks found from changes and the selector. |
403 // Update the operations for a key. | 366 // Update the operations for a key. |
404 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; | 367 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; |
405 ch=endpoint.getChannel(); | 368 ch=endpoint.getChannel(); |
406 endpoint.doUpdateKey(); | 369 endpoint.doUpdateKey(); |
407 } | 370 } |
408 else if (change instanceof ChannelAndAttachment) | |
409 { | |
410 // finish accepting/connecting this connection | |
411 final ChannelAndAttachment asc = (ChannelAndAttachment)change; | |
412 final SelectableChannel channel=asc._channel; | |
413 ch=channel; | |
414 final Object att = asc._attachment; | |
415 | |
416 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) | |
417 { | |
418 key = channel.register(selector,SelectionKey.OP_READ,att); | |
419 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); | |
420 key.attach(endpoint); | |
421 endpoint.schedule(); | |
422 } | |
423 else if (channel.isOpen()) | |
424 { | |
425 key = channel.register(selector,SelectionKey.OP_CONNECT,att); | |
426 } | |
427 } | |
428 else if (change instanceof SocketChannel) | 371 else if (change instanceof SocketChannel) |
429 { | 372 { |
430 // Newly registered channel | 373 // Newly registered channel |
431 final SocketChannel channel=(SocketChannel)change; | 374 final SocketChannel channel=(SocketChannel)change; |
432 ch=channel; | 375 ch=channel; |
433 key = channel.register(selector,SelectionKey.OP_READ,null); | 376 key = channel.register(selector,SelectionKey.OP_READ,null); |
434 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | 377 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
435 key.attach(endpoint); | 378 key.attach(endpoint); |
436 endpoint.schedule(); | 379 endpoint.schedule(); |
437 } | |
438 else if (change instanceof ChangeTask) | |
439 { | |
440 ((Runnable)change).run(); | |
441 } | 380 } |
442 else if (change instanceof Runnable) | 381 else if (change instanceof Runnable) |
443 { | 382 { |
444 execute((Runnable)change); | 383 execute((Runnable)change); |
445 } | 384 } |
668 { | 607 { |
669 _selecting=null; | 608 _selecting=null; |
670 } | 609 } |
671 } | 610 } |
672 | 611 |
673 | |
674 private void renewSelector() | |
675 { | |
676 try | |
677 { | |
678 synchronized (this) | |
679 { | |
680 Selector selector=_selector; | |
681 if (selector==null) | |
682 return; | |
683 final Selector new_selector = Selector.open(); | |
684 for (SelectionKey k: selector.keys()) | |
685 { | |
686 if (!k.isValid() || k.interestOps()==0) | |
687 continue; | |
688 | |
689 final SelectableChannel channel = k.channel(); | |
690 final Object attachment = k.attachment(); | |
691 | |
692 if (attachment==null) | |
693 addChange(channel); | |
694 else | |
695 addChange(channel,attachment); | |
696 } | |
697 _selector.close(); | |
698 _selector=new_selector; | |
699 } | |
700 } | |
701 catch(IOException e) | |
702 { | |
703 throw new RuntimeException("recreating selector",e); | |
704 } | |
705 } | |
706 | |
707 public SelectorManager getManager() | 612 public SelectorManager getManager() |
708 { | 613 { |
709 return SelectorManager.this; | 614 return SelectorManager.this; |
710 } | 615 } |
711 | 616 |
714 return _now; | 619 return _now; |
715 } | 620 } |
716 | 621 |
717 public void wakeup() | 622 public void wakeup() |
718 { | 623 { |
719 try | 624 Selector selector = _selector; |
720 { | 625 if (selector!=null) |
721 Selector selector = _selector; | 626 selector.wakeup(); |
722 if (selector!=null) | |
723 selector.wakeup(); | |
724 } | |
725 catch(Exception e) | |
726 { | |
727 addChange(new ChangeTask() | |
728 { | |
729 public void run() | |
730 { | |
731 renewSelector(); | |
732 } | |
733 }); | |
734 | |
735 renewSelector(); | |
736 } | |
737 } | 627 } |
738 | 628 |
739 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException | 629 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException |
740 { | 630 { |
741 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); | 631 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); |
768 Thread.sleep(10); | 658 Thread.sleep(10); |
769 } | 659 } |
770 } | 660 } |
771 catch(Exception e) | 661 catch(Exception e) |
772 { | 662 { |
773 LOG.trace("",e); | 663 LOG.warn("",e); |
774 } | 664 } |
775 | 665 |
776 // close endpoints and selector | 666 // close endpoints and selector |
777 synchronized (this) | 667 synchronized (this) |
778 { | 668 { |
804 } | 694 } |
805 catch (IOException e) | 695 catch (IOException e) |
806 { | 696 { |
807 LOG.trace("",e); | 697 LOG.trace("",e); |
808 } | 698 } |
809 _selector=null; | 699 _selector = null; |
810 } | 700 } |
811 } | 701 } |
812 | 702 |
813 public String dump() | 703 public String dump() |
814 { | 704 { |
816 } | 706 } |
817 | 707 |
818 public void dump(Appendable out, String indent) throws IOException | 708 public void dump(Appendable out, String indent) throws IOException |
819 { | 709 { |
820 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); | 710 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); |
821 | 711 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); |
822 Thread selecting = _selecting; | |
823 | |
824 Object where = "not selecting"; | |
825 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); | |
826 if (trace!=null) | |
827 { | |
828 for (StackTraceElement t:trace) | |
829 if (t.getClassName().startsWith("org.eclipse.jetty.")) | |
830 { | |
831 where=t; | |
832 break; | |
833 } | |
834 } | |
835 | |
836 Selector selector=_selector; | |
837 if (selector!=null) | |
838 { | |
839 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); | |
840 dump.add(where); | |
841 | |
842 final CountDownLatch latch = new CountDownLatch(1); | |
843 | |
844 addChange(new ChangeTask() | |
845 { | |
846 public void run() | |
847 { | |
848 dumpKeyState(dump); | |
849 latch.countDown(); | |
850 } | |
851 }); | |
852 | |
853 try | |
854 { | |
855 latch.await(5,TimeUnit.SECONDS); | |
856 } | |
857 catch(InterruptedException e) | |
858 { | |
859 LOG.trace("",e); | |
860 } | |
861 | |
862 AggregateLifeCycle.dump(out,indent,dump); | |
863 } | |
864 } | |
865 | |
866 public void dumpKeyState(List<Object> dumpto) | |
867 { | |
868 Selector selector=_selector; | |
869 Set<SelectionKey> keys = selector.keys(); | |
870 dumpto.add(selector + " keys=" + keys.size()); | |
871 for (SelectionKey key: keys) | |
872 { | |
873 if (key.isValid()) | |
874 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); | |
875 else | |
876 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); | |
877 } | |
878 } | 712 } |
879 | 713 |
880 public String toString() | 714 public String toString() |
881 { | 715 { |
882 Selector selector=_selector; | 716 Selector selector=_selector; |
885 selector != null && selector.isOpen() ? selector.keys().size() : -1, | 719 selector != null && selector.isOpen() ? selector.keys().size() : -1, |
886 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); | 720 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); |
887 } | 721 } |
888 } | 722 } |
889 | 723 |
890 private static class ChannelAndAttachment | |
891 { | |
892 final SelectableChannel _channel; | |
893 final Object _attachment; | |
894 | |
895 public ChannelAndAttachment(SelectableChannel channel, Object attachment) | |
896 { | |
897 super(); | |
898 _channel = channel; | |
899 _attachment = attachment; | |
900 } | |
901 } | |
902 | |
903 | |
904 private interface ChangeTask extends Runnable | |
905 {} | |
906 | |
907 } | 724 } |