Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 944:1d24b6e422fa
simplify SelectorManager
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Tue, 11 Oct 2016 20:16:03 -0600 |
| parents | 96f60ce98949 |
| children | f5aefdc4a81a |
comparison
equal
deleted
inserted
replaced
| 943:96f60ce98949 | 944:1d24b6e422fa |
|---|---|
| 27 import java.nio.channels.Selector; | 27 import java.nio.channels.Selector; |
| 28 import java.nio.channels.ServerSocketChannel; | 28 import java.nio.channels.ServerSocketChannel; |
| 29 import java.nio.channels.SocketChannel; | 29 import java.nio.channels.SocketChannel; |
| 30 import java.util.ArrayList; | 30 import java.util.ArrayList; |
| 31 import java.util.List; | 31 import java.util.List; |
| | 32 import java.util.Collections; |
| 32 import java.util.Set; | 33 import java.util.Set; |
| 33 import java.util.concurrent.ConcurrentHashMap; | 34 import java.util.concurrent.ConcurrentHashMap; |
| 34 import java.util.concurrent.ConcurrentLinkedQueue; | 35 import java.util.concurrent.ConcurrentLinkedQueue; |
| 35 import java.util.concurrent.ConcurrentMap; | 36 import java.util.concurrent.ConcurrentMap; |
| 36 import java.util.concurrent.CountDownLatch; | 37 import java.util.concurrent.CountDownLatch; |
| 115 */ | 116 */ |
| 116 public SelectSet getSelectSet(int i) | 117 public SelectSet getSelectSet(int i) |
| 117 { | 118 { |
| 118 return _selectSet[i]; | 119 return _selectSet[i]; |
| 119 } | 120 } |
| 120 | |
| 121 /* ------------------------------------------------------------ */ | |
| 122 /** Register a channel | |
| 123 * @param channel | |
| 124 * @param att Attached Object | |
| 125 */ | |
| 126 public void register(SocketChannel channel, Object att) | |
| 127 { | |
| 128 // The ++ increment here is not atomic, but it does not matter. | |
| 129 // so long as the value changes sometimes, then connections will | |
| 130 // be distributed over the available sets. | |
| 131 | |
| 132 int s=_set++; | |
| 133 if (s<0) | |
| 134 s=-s; | |
| 135 s=s%_selectSets; | |
| 136 SelectSet[] sets=_selectSet; | |
| 137 if (sets!=null) | |
| 138 { | |
| 139 SelectSet set=sets[s]; | |
| 140 set.addChange(channel,att); | |
| 141 set.wakeup(); | |
| 142 } | |
| 143 } | |
| 144 | |
| 145 | 121 |
| 146 /* ------------------------------------------------------------ */ | 122 /* ------------------------------------------------------------ */ |
| 147 /** Register a channel | 123 /** Register a channel |
| 148 * @param channel | 124 * @param channel |
| 149 */ | 125 */ |
| 341 private boolean _pausing; | 317 private boolean _pausing; |
| 342 private boolean _paused; | 318 private boolean _paused; |
| 343 private volatile long _idleTick; | 319 private volatile long _idleTick; |
| 344 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | 320 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); |
| 345 | 321 |
| 346 /* ------------------------------------------------------------ */ | |
| 347 SelectSet(int acceptorID) throws Exception | 322 SelectSet(int acceptorID) throws Exception |
| 348 { | 323 { |
| 349 _setID=acceptorID; | 324 _setID=acceptorID; |
| 350 | 325 |
| 351 _idleTick = System.currentTimeMillis(); | 326 _idleTick = System.currentTimeMillis(); |
| 353 // create a selector; | 328 // create a selector; |
| 354 _selector = Selector.open(); | 329 _selector = Selector.open(); |
| 355 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | 330 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; |
| 356 } | 331 } |
| 357 | 332 |
| 358 /* ------------------------------------------------------------ */ | |
| 359 public void addChange(Object change) | 333 public void addChange(Object change) |
| 360 { | 334 { |
| 361 _changes.add(change); | 335 _changes.add(change); |
| 362 } | |
| 363 | |
| 364 /* ------------------------------------------------------------ */ | |
| 365 public void addChange(SelectableChannel channel, Object att) | |
| 366 { | |
| 367 if (att==null) | |
| 368 addChange(channel); | |
| 369 else if (att instanceof EndPoint) | |
| 370 addChange(att); | |
| 371 else | |
| 372 addChange(new ChannelAndAttachment(channel,att)); | |
| 373 } | 336 } |
| 374 | 337 |
| 375 /* ------------------------------------------------------------ */ | 338 /* ------------------------------------------------------------ */ |
| 376 /** | 339 /** |
| 377 * Select and dispatch tasks found from changes and the selector. | 340 * Select and dispatch tasks found from changes and the selector. |
| 403 // Update the operations for a key. | 366 // Update the operations for a key. |
| 404 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; | 367 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; |
| 405 ch=endpoint.getChannel(); | 368 ch=endpoint.getChannel(); |
| 406 endpoint.doUpdateKey(); | 369 endpoint.doUpdateKey(); |
| 407 } | 370 } |
| 408 else if (change instanceof ChannelAndAttachment) | |
| 409 { | |
| 410 // finish accepting/connecting this connection | |
| 411 final ChannelAndAttachment asc = (ChannelAndAttachment)change; | |
| 412 final SelectableChannel channel=asc._channel; | |
| 413 ch=channel; | |
| 414 final Object att = asc._attachment; | |
| 415 | |
| 416 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) | |
| 417 { | |
| 418 key = channel.register(selector,SelectionKey.OP_READ,att); | |
| 419 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); | |
| 420 key.attach(endpoint); | |
| 421 endpoint.schedule(); | |
| 422 } | |
| 423 else if (channel.isOpen()) | |
| 424 { | |
| 425 key = channel.register(selector,SelectionKey.OP_CONNECT,att); | |
| 426 } | |
| 427 } | |
| 428 else if (change instanceof SocketChannel) | 371 else if (change instanceof SocketChannel) |
| 429 { | 372 { |
| 430 // Newly registered channel | 373 // Newly registered channel |
| 431 final SocketChannel channel=(SocketChannel)change; | 374 final SocketChannel channel=(SocketChannel)change; |
| 432 ch=channel; | 375 ch=channel; |
| 433 key = channel.register(selector,SelectionKey.OP_READ,null); | 376 key = channel.register(selector,SelectionKey.OP_READ,null); |
| 434 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | 377 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
| 435 key.attach(endpoint); | 378 key.attach(endpoint); |
| 436 endpoint.schedule(); | 379 endpoint.schedule(); |
| 437 } | |
| 438 else if (change instanceof ChangeTask) | |
| 439 { | |
| 440 ((Runnable)change).run(); | |
| 441 } | 380 } |
| 442 else if (change instanceof Runnable) | 381 else if (change instanceof Runnable) |
| 443 { | 382 { |
| 444 execute((Runnable)change); | 383 execute((Runnable)change); |
| 445 } | 384 } |
| 668 { | 607 { |
| 669 _selecting=null; | 608 _selecting=null; |
| 670 } | 609 } |
| 671 } | 610 } |
| 672 | 611 |
| 673 | |
| 674 private void renewSelector() | |
| 675 { | |
| 676 try | |
| 677 { | |
| 678 synchronized (this) | |
| 679 { | |
| 680 Selector selector=_selector; | |
| 681 if (selector==null) | |
| 682 return; | |
| 683 final Selector new_selector = Selector.open(); | |
| 684 for (SelectionKey k: selector.keys()) | |
| 685 { | |
| 686 if (!k.isValid() || k.interestOps()==0) | |
| 687 continue; | |
| 688 | |
| 689 final SelectableChannel channel = k.channel(); | |
| 690 final Object attachment = k.attachment(); | |
| 691 | |
| 692 if (attachment==null) | |
| 693 addChange(channel); | |
| 694 else | |
| 695 addChange(channel,attachment); | |
| 696 } | |
| 697 _selector.close(); | |
| 698 _selector=new_selector; | |
| 699 } | |
| 700 } | |
| 701 catch(IOException e) | |
| 702 { | |
| 703 throw new RuntimeException("recreating selector",e); | |
| 704 } | |
| 705 } | |
| 706 | |
| 707 public SelectorManager getManager() | 612 public SelectorManager getManager() |
| 708 { | 613 { |
| 709 return SelectorManager.this; | 614 return SelectorManager.this; |
| 710 } | 615 } |
| 711 | 616 |
| 714 return _now; | 619 return _now; |
| 715 } | 620 } |
| 716 | 621 |
| 717 public void wakeup() | 622 public void wakeup() |
| 718 { | 623 { |
| 719 try | 624 Selector selector = _selector; |
| 720 { | 625 if (selector!=null) |
| 721 Selector selector = _selector; | 626 selector.wakeup(); |
| 722 if (selector!=null) | |
| 723 selector.wakeup(); | |
| 724 } | |
| 725 catch(Exception e) | |
| 726 { | |
| 727 addChange(new ChangeTask() | |
| 728 { | |
| 729 public void run() | |
| 730 { | |
| 731 renewSelector(); | |
| 732 } | |
| 733 }); | |
| 734 | |
| 735 renewSelector(); | |
| 736 } | |
| 737 } | 627 } |
| 738 | 628 |
| 739 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException | 629 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException |
| 740 { | 630 { |
| 741 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); | 631 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); |
| 768 Thread.sleep(10); | 658 Thread.sleep(10); |
| 769 } | 659 } |
| 770 } | 660 } |
| 771 catch(Exception e) | 661 catch(Exception e) |
| 772 { | 662 { |
| 773 LOG.trace("",e); | 663 LOG.warn("",e); |
| 774 } | 664 } |
| 775 | 665 |
| 776 // close endpoints and selector | 666 // close endpoints and selector |
| 777 synchronized (this) | 667 synchronized (this) |
| 778 { | 668 { |
| 804 } | 694 } |
| 805 catch (IOException e) | 695 catch (IOException e) |
| 806 { | 696 { |
| 807 LOG.trace("",e); | 697 LOG.trace("",e); |
| 808 } | 698 } |
| 809 _selector=null; | 699 _selector = null; |
| 810 } | 700 } |
| 811 } | 701 } |
| 812 | 702 |
| 813 public String dump() | 703 public String dump() |
| 814 { | 704 { |
| 816 } | 706 } |
| 817 | 707 |
| 818 public void dump(Appendable out, String indent) throws IOException | 708 public void dump(Appendable out, String indent) throws IOException |
| 819 { | 709 { |
| 820 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); | 710 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); |
| 821 | 711 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); |
| 822 Thread selecting = _selecting; | |
| 823 | |
| 824 Object where = "not selecting"; | |
| 825 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); | |
| 826 if (trace!=null) | |
| 827 { | |
| 828 for (StackTraceElement t:trace) | |
| 829 if (t.getClassName().startsWith("org.eclipse.jetty.")) | |
| 830 { | |
| 831 where=t; | |
| 832 break; | |
| 833 } | |
| 834 } | |
| 835 | |
| 836 Selector selector=_selector; | |
| 837 if (selector!=null) | |
| 838 { | |
| 839 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); | |
| 840 dump.add(where); | |
| 841 | |
| 842 final CountDownLatch latch = new CountDownLatch(1); | |
| 843 | |
| 844 addChange(new ChangeTask() | |
| 845 { | |
| 846 public void run() | |
| 847 { | |
| 848 dumpKeyState(dump); | |
| 849 latch.countDown(); | |
| 850 } | |
| 851 }); | |
| 852 | |
| 853 try | |
| 854 { | |
| 855 latch.await(5,TimeUnit.SECONDS); | |
| 856 } | |
| 857 catch(InterruptedException e) | |
| 858 { | |
| 859 LOG.trace("",e); | |
| 860 } | |
| 861 | |
| 862 AggregateLifeCycle.dump(out,indent,dump); | |
| 863 } | |
| 864 } | |
| 865 | |
| 866 public void dumpKeyState(List<Object> dumpto) | |
| 867 { | |
| 868 Selector selector=_selector; | |
| 869 Set<SelectionKey> keys = selector.keys(); | |
| 870 dumpto.add(selector + " keys=" + keys.size()); | |
| 871 for (SelectionKey key: keys) | |
| 872 { | |
| 873 if (key.isValid()) | |
| 874 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); | |
| 875 else | |
| 876 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); | |
| 877 } | |
| 878 } | 712 } |
| 879 | 713 |
| 880 public String toString() | 714 public String toString() |
| 881 { | 715 { |
| 882 Selector selector=_selector; | 716 Selector selector=_selector; |
| 885 selector != null && selector.isOpen() ? selector.keys().size() : -1, | 719 selector != null && selector.isOpen() ? selector.keys().size() : -1, |
| 886 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); | 720 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); |
| 887 } | 721 } |
| 888 } | 722 } |
| 889 | 723 |
| 890 private static class ChannelAndAttachment | |
| 891 { | |
| 892 final SelectableChannel _channel; | |
| 893 final Object _attachment; | |
| 894 | |
| 895 public ChannelAndAttachment(SelectableChannel channel, Object attachment) | |
| 896 { | |
| 897 super(); | |
| 898 _channel = channel; | |
| 899 _attachment = attachment; | |
| 900 } | |
| 901 } | |
| 902 | |
| 903 | |
| 904 private interface ChangeTask extends Runnable | |
| 905 {} | |
| 906 | |
| 907 } | 724 } |
