Mercurial Hosting > luan
changeset 950:a778413aefc0
add SaneSelector
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 12 Oct 2016 14:37:56 -0600 |
parents | e9088af3787f |
children | e542a9cc75ef |
files | src/org/eclipse/jetty/io/nio/SaneSelector.java src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/io/nio/SelectorManager.java src/org/eclipse/jetty/server/nio/SelectChannelConnector.java |
diffstat | 4 files changed, 99 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
diff -r e9088af3787f -r a778413aefc0 src/org/eclipse/jetty/io/nio/SaneSelector.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/eclipse/jetty/io/nio/SaneSelector.java Wed Oct 12 14:37:56 2016 -0600 @@ -0,0 +1,72 @@ +/* +Thread synchronization in java.nio.channels.Selector is completely fucked up, unsurprisingly since NIO was developed in this demented century. This class works around the modern insanity. +*/ + +package org.eclipse.jetty.io.nio; + +import java.io.IOException; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.channels.SelectableChannel; +import java.nio.channels.ClosedChannelException; +import java.util.Set; + + +public final class SaneSelector { + private final Selector selector; + private boolean inSelect = false; + private boolean inUpdate = false; + + public SaneSelector() throws IOException { + selector = Selector.open(); + } + + public void close() throws IOException { + selector.close(); + } + + public boolean isOpen() { + return selector.isOpen(); + } + + public int select() throws IOException { + synchronized(this) { + inSelect = true; + } + try { + while(true) { + int n = selector.select(); + synchronized(this) { + boolean wasInUpdate = inUpdate; + inUpdate = false; + if( n > 0 || !wasInUpdate ) + return n; + } + } + } finally { + synchronized(this) { + inSelect = false; + } + } + } + + public Set<SelectionKey> selectedKeys() { + return selector.selectedKeys(); + } + + public Set<SelectionKey> keys() { + return selector.keys(); + } + + public synchronized SelectionKey register(SelectableChannel channel,int ops,Object att) throws ClosedChannelException { + update(); + return channel.register(selector,ops,att); + } + + public synchronized void update() { + if( inSelect ) { + inUpdate = true; + selector.wakeup(); + } + } +} \ No newline at end of file
diff -r e9088af3787f -r a778413aefc0 src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java --- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Tue Oct 11 23:18:13 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java Wed Oct 12 14:37:56 2016 -0600 @@ -502,7 +502,7 @@ if(changed) { doUpdateKey(); - _selectSet.wakeup(); + _selectSet.getSelector().update(); } } @@ -528,7 +528,7 @@ { try { - _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); + _key = _selectSet.getSelector().register((SelectableChannel)getChannel(),_interestOps,this); } catch (Exception e) {
diff -r e9088af3787f -r a778413aefc0 src/org/eclipse/jetty/io/nio/SelectorManager.java --- a/src/org/eclipse/jetty/io/nio/SelectorManager.java Tue Oct 11 23:18:13 2016 -0600 +++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java Wed Oct 12 14:37:56 2016 -0600 @@ -24,7 +24,6 @@ import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -110,7 +109,6 @@ { SelectSet set=sets[s]; set.addChange(channel); - set.wakeup(); } } @@ -234,7 +232,7 @@ private final int _setID; private volatile long _now = System.currentTimeMillis(); - private volatile Selector _selector; + private volatile SaneSelector _selector; private volatile Thread _selecting; private int _busySelects; @@ -251,14 +249,14 @@ _idleTick = System.currentTimeMillis(); // create a selector; - _selector = Selector.open(); + _selector = new SaneSelector(); _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; } private void addChange(SocketChannel channel) { try { - SelectionKey key = channel.register(_selector,SelectionKey.OP_READ,null); + SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null); SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); endpoint.schedule(); @@ -282,19 +280,21 @@ try { _selecting=Thread.currentThread(); - final Selector selector=_selector; + final SaneSelector selector = _selector; // Stopped concurrently ? if (selector == null) return; // Do and instant select to see if any connections can be handled. - int selected=selector.selectNow(); +// int selected = selector.selectNow(); + int selected = selector.select(); _now = System.currentTimeMillis(); - +/* // if no immediate things to do if (selected==0 && selector.selectedKeys().isEmpty()) { + // If we are in pausing mode if (_pausing) { @@ -317,6 +317,7 @@ { long before = _now; selector.select(wait); +// selector.select(10000L); _now = System.currentTimeMillis(); // If we are monitoring for busy selector @@ -340,7 +341,7 @@ } } } - +*/ // have we been destroyed while sleeping if (_selector==null || !selector.isOpen()) return; @@ -438,7 +439,7 @@ selector.selectedKeys().clear(); _now = System.currentTimeMillis(); - +/* // Idle tick if (_now-_idleTick>__IDLE_TICK) { @@ -461,7 +462,7 @@ }); } - +*/ // Reset busy select monitor counts if (__MONITOR_PERIOD>0 && _now>_monitorNext) { @@ -497,14 +498,14 @@ { return _now; } - +/* public void wakeup() { - Selector selector = _selector; + SaneSelector selector = _selector; if (selector!=null) selector.wakeup(); } - +*/ private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException { SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime); @@ -521,7 +522,7 @@ endp.getConnection().onClose(); } - Selector getSelector() + SaneSelector getSelector() { return _selector; } @@ -530,11 +531,12 @@ { // Spin for a while waiting for selector to complete // to avoid unneccessary closed channel exceptions +/* try { for (int i=0;i<100 && _selecting!=null;i++) { - wakeup(); + _selector.wakeup(); Thread.sleep(10); } } @@ -542,11 +544,11 @@ { LOG.warn("",e); } - +*/ // close endpoints and selector synchronized (this) { - Selector selector=_selector; + SaneSelector selector=_selector; for (SelectionKey key:selector.keys()) { if (key==null) @@ -593,7 +595,7 @@ public String toString() { - Selector selector=_selector; + SaneSelector selector=_selector; return String.format("%s keys=%d selected=%d", super.toString(), selector != null && selector.isOpen() ? selector.keys().size() : -1,
diff -r e9088af3787f -r a778413aefc0 src/org/eclipse/jetty/server/nio/SelectChannelConnector.java --- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Tue Oct 11 23:18:13 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java Wed Oct 12 14:37:56 2016 -0600 @@ -72,11 +72,13 @@ if (server!=null && server.isOpen() && _manager.isStarted()) { - SocketChannel channel = server.accept(); + final SocketChannel channel = server.accept(); channel.configureBlocking(false); Socket socket = channel.socket(); configure(socket); - _manager.register(channel); + this.server.threadPool.execute(new Runnable(){public void run(){ + _manager.register(channel); + }}); } }