changeset 950:a778413aefc0

add SaneSelector
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 12 Oct 2016 14:37:56 -0600 (2016-10-12)
parents e9088af3787f
children e542a9cc75ef
files src/org/eclipse/jetty/io/nio/SaneSelector.java src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java src/org/eclipse/jetty/io/nio/SelectorManager.java src/org/eclipse/jetty/server/nio/SelectChannelConnector.java
diffstat 4 files changed, 99 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/org/eclipse/jetty/io/nio/SaneSelector.java	Wed Oct 12 14:37:56 2016 -0600
@@ -0,0 +1,72 @@
+/*
+Thread synchronization in java.nio.channels.Selector is completely fucked up, unsurprisingly since NIO was developed in this demented century.  This class works around the modern insanity.
+*/
+
+package org.eclipse.jetty.io.nio;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.ClosedChannelException;
+import java.util.Set;
+
+
+public final class SaneSelector {
+	private final Selector selector;
+	private boolean inSelect = false;
+	private boolean inUpdate = false;
+
+	public SaneSelector() throws IOException {
+		selector = Selector.open();
+	}
+
+	public void close() throws IOException {
+		selector.close();
+	}
+
+	public boolean isOpen() {
+		return selector.isOpen();
+	}
+
+	public int select() throws IOException {
+		synchronized(this) {
+			inSelect = true;
+		}
+		try {
+			while(true) {
+				int n = selector.select();
+				synchronized(this) {
+					boolean wasInUpdate = inUpdate;
+					inUpdate = false;
+					if( n > 0 || !wasInUpdate )
+						return n;
+				}
+			}
+		} finally {
+			synchronized(this) {
+				inSelect = false;
+			}
+		}
+	}
+
+	public Set<SelectionKey> selectedKeys() {
+		return selector.selectedKeys();
+	}
+
+	public Set<SelectionKey> keys() {
+		return selector.keys();
+	}
+
+	public synchronized SelectionKey register(SelectableChannel channel,int ops,Object att) throws ClosedChannelException {
+		update();
+		return channel.register(selector,ops,att);
+	}
+
+	public synchronized void update() {
+		if( inSelect ) {
+			inUpdate = true;
+			selector.wakeup();
+		}
+	}
+}
\ No newline at end of file
--- a/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Tue Oct 11 23:18:13 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java	Wed Oct 12 14:37:56 2016 -0600
@@ -502,7 +502,7 @@
 		if(changed)
 		{
 			doUpdateKey();
-			_selectSet.wakeup();
+			_selectSet.getSelector().update();
 		}
 	}
 
@@ -528,7 +528,7 @@
 					{
 						try
 						{
-							_key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
+							_key = _selectSet.getSelector().register((SelectableChannel)getChannel(),_interestOps,this);
 						}
 						catch (Exception e)
 						{
--- a/src/org/eclipse/jetty/io/nio/SelectorManager.java	Tue Oct 11 23:18:13 2016 -0600
+++ b/src/org/eclipse/jetty/io/nio/SelectorManager.java	Wed Oct 12 14:37:56 2016 -0600
@@ -24,7 +24,6 @@
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
@@ -110,7 +109,6 @@
 		{
 			SelectSet set=sets[s];
 			set.addChange(channel);
-			set.wakeup();
 		}
 	}
 
@@ -234,7 +232,7 @@
 		private final int _setID;
 		private volatile long _now = System.currentTimeMillis();
 
-		private volatile Selector _selector;
+		private volatile SaneSelector _selector;
 
 		private volatile Thread _selecting;
 		private int _busySelects;
@@ -251,14 +249,14 @@
 			_idleTick = System.currentTimeMillis();
 
 			// create a selector;
-			_selector = Selector.open();
+			_selector = new SaneSelector();
 			_monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
 		}
 
 		private void addChange(SocketChannel channel)
 		{
 			try {
-				SelectionKey key = channel.register(_selector,SelectionKey.OP_READ,null);
+				SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null);
 				SelectChannelEndPoint endpoint = createEndPoint(channel,key);
 				key.attach(endpoint);
 				endpoint.schedule();
@@ -282,19 +280,21 @@
 			try
 			{
 				_selecting=Thread.currentThread();
-				final Selector selector=_selector;
+				final SaneSelector selector = _selector;
 				// Stopped concurrently ?
 				if (selector == null)
 					return;
 
 				// Do and instant select to see if any connections can be handled.
-				int selected=selector.selectNow();
+//				int selected = selector.selectNow();
+				int selected = selector.select();
 
 				_now = System.currentTimeMillis();
-
+/*
 				// if no immediate things to do
 				if (selected==0 && selector.selectedKeys().isEmpty())
 				{
+
 					// If we are in pausing mode
 					if (_pausing)
 					{
@@ -317,6 +317,7 @@
 					{
 						long before = _now;
 						selector.select(wait);
+//						selector.select(10000L);
 						_now = System.currentTimeMillis();
 
 						// If we are monitoring for busy selector
@@ -340,7 +341,7 @@
 						}
 					}
 				}
-
+*/
 				// have we been destroyed while sleeping
 				if (_selector==null || !selector.isOpen())
 					return;
@@ -438,7 +439,7 @@
 				selector.selectedKeys().clear();
 
 				_now = System.currentTimeMillis();
-
+/*
 				// Idle tick
 				if (_now-_idleTick>__IDLE_TICK)
 				{
@@ -461,7 +462,7 @@
 					});
 
 				}
-
+*/
 				// Reset busy select monitor counts
 				if (__MONITOR_PERIOD>0 && _now>_monitorNext)
 				{
@@ -497,14 +498,14 @@
 		{
 			return _now;
 		}
-
+/*
 		public void wakeup()
 		{
-			Selector selector = _selector;
+			SaneSelector selector = _selector;
 			if (selector!=null)
 				selector.wakeup();
 		}
-
+*/
 		private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
 		{
 			SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime);
@@ -521,7 +522,7 @@
 			endp.getConnection().onClose();
 		}
 
-		Selector getSelector()
+		SaneSelector getSelector()
 		{
 			return _selector;
 		}
@@ -530,11 +531,12 @@
 		{
 			// Spin for a while waiting for selector to complete
 			// to avoid unneccessary closed channel exceptions
+/*
 			try
 			{
 				for (int i=0;i<100 && _selecting!=null;i++)
 				{
-					wakeup();
+					_selector.wakeup();
 					Thread.sleep(10);
 				}
 			}
@@ -542,11 +544,11 @@
 			{
 				LOG.warn("",e);
 			}
-
+*/
 			// close endpoints and selector
 			synchronized (this)
 			{
-				Selector selector=_selector;
+				SaneSelector selector=_selector;
 				for (SelectionKey key:selector.keys())
 				{
 					if (key==null)
@@ -593,7 +595,7 @@
 
 		public String toString()
 		{
-			Selector selector=_selector;
+			SaneSelector selector=_selector;
 			return String.format("%s keys=%d selected=%d",
 					super.toString(),
 					selector != null && selector.isOpen() ? selector.keys().size() : -1,
--- a/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java	Tue Oct 11 23:18:13 2016 -0600
+++ b/src/org/eclipse/jetty/server/nio/SelectChannelConnector.java	Wed Oct 12 14:37:56 2016 -0600
@@ -72,11 +72,13 @@
 
 		if (server!=null && server.isOpen() && _manager.isStarted())
 		{
-			SocketChannel channel = server.accept();
+			final SocketChannel channel = server.accept();
 			channel.configureBlocking(false);
 			Socket socket = channel.socket();
 			configure(socket);
-			_manager.register(channel);
+			this.server.threadPool.execute(new Runnable(){public void run(){
+				_manager.register(channel);
+			}});
 		}
 	}