changeset 840:0f53601ea489

remove ConcurrentHashSet
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 19 Sep 2016 14:20:18 -0600 (2016-09-19)
parents b8d717b228c6
children f4d7e9fd3f67
files src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java src/org/eclipse/jetty/util/ConcurrentHashSet.java
diffstat 2 files changed, 274 insertions(+), 400 deletions(-) [+]
line wrap: on
line diff
--- a/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java	Fri Sep 16 13:32:45 2016 -0600
+++ b/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java	Mon Sep 19 14:20:18 2016 -0600
@@ -26,6 +26,7 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.eclipse.jetty.http.HttpException;
 import org.eclipse.jetty.io.Buffer;
@@ -36,7 +37,6 @@
 import org.eclipse.jetty.io.nio.ChannelEndPoint;
 import org.eclipse.jetty.server.BlockingHttpConnection;
 import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,312 +55,312 @@
  */
 public class BlockingChannelConnector extends AbstractNIOConnector
 {
-    private static final Logger LOG = LoggerFactory.getLogger(BlockingChannelConnector.class);
+	private static final Logger LOG = LoggerFactory.getLogger(BlockingChannelConnector.class);
 
-    private transient ServerSocketChannel _acceptChannel;
-    private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
+	private transient ServerSocketChannel _acceptChannel;
+	private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashMap<BlockingChannelEndPoint,Boolean>().newKeySet();
 
 
-    /* ------------------------------------------------------------ */
-    /** Constructor.
-     *
-     */
-    public BlockingChannelConnector()
-    {
-    }
+	/* ------------------------------------------------------------ */
+	/** Constructor.
+	 *
+	 */
+	public BlockingChannelConnector()
+	{
+	}
 
-    /* ------------------------------------------------------------ */
-    public Object getConnection()
-    {
-        return _acceptChannel;
-    }
+	/* ------------------------------------------------------------ */
+	public Object getConnection()
+	{
+		return _acceptChannel;
+	}
 
-    /* ------------------------------------------------------------ */
-    /**
-     * @see org.eclipse.jetty.server.AbstractConnector#doStart()
-     */
-    @Override
-    protected void doStart() throws Exception
-    {
-        super.doStart();
-        getThreadPool().dispatch(new Runnable()
-        {
+	/* ------------------------------------------------------------ */
+	/**
+	 * @see org.eclipse.jetty.server.AbstractConnector#doStart()
+	 */
+	@Override
+	protected void doStart() throws Exception
+	{
+		super.doStart();
+		getThreadPool().dispatch(new Runnable()
+		{
 
-            public void run()
-            {
-                while (isRunning())
-                {
-                    try
-                    {
-                        Thread.sleep(400);
-                        long now=System.currentTimeMillis();
-                        for (BlockingChannelEndPoint endp : _endpoints)
-                        {
-                            endp.checkIdleTimestamp(now);
-                        }
-                    }
-                    catch(InterruptedException e)
-                    {
-                        LOG.trace("",e);
-                    }
-                    catch(Exception e)
-                    {
-                        LOG.warn("",e);
-                    }
-                }
-            }
+			public void run()
+			{
+				while (isRunning())
+				{
+					try
+					{
+						Thread.sleep(400);
+						long now=System.currentTimeMillis();
+						for (BlockingChannelEndPoint endp : _endpoints)
+						{
+							endp.checkIdleTimestamp(now);
+						}
+					}
+					catch(InterruptedException e)
+					{
+						LOG.trace("",e);
+					}
+					catch(Exception e)
+					{
+						LOG.warn("",e);
+					}
+				}
+			}
 
-        });
+		});
 
-    }
+	}
 
 
-    /* ------------------------------------------------------------ */
-    public void open() throws IOException
-    {
-        // Create a new server socket and set to non blocking mode
-        _acceptChannel= ServerSocketChannel.open();
-        _acceptChannel.configureBlocking(true);
+	/* ------------------------------------------------------------ */
+	public void open() throws IOException
+	{
+		// Create a new server socket and set to non blocking mode
+		_acceptChannel= ServerSocketChannel.open();
+		_acceptChannel.configureBlocking(true);
 
-        // Bind the server socket to the local host and port
-        InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
-        _acceptChannel.socket().bind(addr,getAcceptQueueSize());
-    }
+		// Bind the server socket to the local host and port
+		InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
+		_acceptChannel.socket().bind(addr,getAcceptQueueSize());
+	}
 
-    /* ------------------------------------------------------------ */
-    public void close() throws IOException
-    {
-        if (_acceptChannel != null)
-            _acceptChannel.close();
-        _acceptChannel=null;
-    }
+	/* ------------------------------------------------------------ */
+	public void close() throws IOException
+	{
+		if (_acceptChannel != null)
+			_acceptChannel.close();
+		_acceptChannel=null;
+	}
 
-    /* ------------------------------------------------------------ */
-    @Override
-    public void accept(int acceptorID)
-    	throws IOException, InterruptedException
-    {
-        SocketChannel channel = _acceptChannel.accept();
-        channel.configureBlocking(true);
-        Socket socket=channel.socket();
-        configure(socket);
+	/* ------------------------------------------------------------ */
+	@Override
+	public void accept(int acceptorID)
+		throws IOException, InterruptedException
+	{
+		SocketChannel channel = _acceptChannel.accept();
+		channel.configureBlocking(true);
+		Socket socket=channel.socket();
+		configure(socket);
 
-        BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
-        connection.dispatch();
-    }
+		BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
+		connection.dispatch();
+	}
 
-    /* ------------------------------------------------------------------------------- */
-    @Override
-    public void customize(EndPoint endpoint, Request request)
-        throws IOException
-    {
-        super.customize(endpoint, request);
-        endpoint.setMaxIdleTime(_maxIdleTime);
-        configure(((SocketChannel)endpoint.getTransport()).socket());
-    }
+	/* ------------------------------------------------------------------------------- */
+	@Override
+	public void customize(EndPoint endpoint, Request request)
+		throws IOException
+	{
+		super.customize(endpoint, request);
+		endpoint.setMaxIdleTime(_maxIdleTime);
+		configure(((SocketChannel)endpoint.getTransport()).socket());
+	}
 
 
-    /* ------------------------------------------------------------------------------- */
-    public int getLocalPort()
-    {
-        if (_acceptChannel==null || !_acceptChannel.isOpen())
-            return -1;
-        return _acceptChannel.socket().getLocalPort();
-    }
+	/* ------------------------------------------------------------------------------- */
+	public int getLocalPort()
+	{
+		if (_acceptChannel==null || !_acceptChannel.isOpen())
+			return -1;
+		return _acceptChannel.socket().getLocalPort();
+	}
 
-    /* ------------------------------------------------------------------------------- */
-    /* ------------------------------------------------------------------------------- */
-    /* ------------------------------------------------------------------------------- */
-    private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
-    {
-        private Connection _connection;
-        private int _timeout;
-        private volatile long _idleTimestamp;
+	/* ------------------------------------------------------------------------------- */
+	/* ------------------------------------------------------------------------------- */
+	/* ------------------------------------------------------------------------------- */
+	private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
+	{
+		private Connection _connection;
+		private int _timeout;
+		private volatile long _idleTimestamp;
 
-        BlockingChannelEndPoint(ByteChannel channel)
-            throws IOException
-        {
-            super(channel,BlockingChannelConnector.this._maxIdleTime);
-            _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
-        }
+		BlockingChannelEndPoint(ByteChannel channel)
+			throws IOException
+		{
+			super(channel,BlockingChannelConnector.this._maxIdleTime);
+			_connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
+		}
 
-        /* ------------------------------------------------------------ */
-        /** Get the connection.
-         * @return the connection
-         */
-        public Connection getConnection()
-        {
-            return _connection;
-        }
+		/* ------------------------------------------------------------ */
+		/** Get the connection.
+		 * @return the connection
+		 */
+		public Connection getConnection()
+		{
+			return _connection;
+		}
 
-        /* ------------------------------------------------------------ */
-        public void setConnection(Connection connection)
-        {
-            _connection=connection;
-        }
+		/* ------------------------------------------------------------ */
+		public void setConnection(Connection connection)
+		{
+			_connection=connection;
+		}
 
-        /* ------------------------------------------------------------ */
-        public void checkIdleTimestamp(long now)
-        {
-            if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
-            {
-                idleExpired();
-            }
-        }
+		/* ------------------------------------------------------------ */
+		public void checkIdleTimestamp(long now)
+		{
+			if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
+			{
+				idleExpired();
+			}
+		}
 
-        /* ------------------------------------------------------------ */
-        protected void idleExpired()
-        {
-            try
-            {
-                super.close();
-            }
-            catch (IOException e)
-            {
-                LOG.trace("",e);
-            }
-        }
+		/* ------------------------------------------------------------ */
+		protected void idleExpired()
+		{
+			try
+			{
+				super.close();
+			}
+			catch (IOException e)
+			{
+				LOG.trace("",e);
+			}
+		}
 
-        /* ------------------------------------------------------------ */
-        void dispatch() throws IOException
-        {
-            if (!getThreadPool().dispatch(this))
-            {
-                LOG.warn("dispatch failed for  {}",_connection);
-                super.close();
-            }
-        }
+		/* ------------------------------------------------------------ */
+		void dispatch() throws IOException
+		{
+			if (!getThreadPool().dispatch(this))
+			{
+				LOG.warn("dispatch failed for  {}",_connection);
+				super.close();
+			}
+		}
 
-        /* ------------------------------------------------------------ */
-        /**
-         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
-         */
-        @Override
-        public int fill(Buffer buffer) throws IOException
-        {
-            _idleTimestamp=System.currentTimeMillis();
-            return super.fill(buffer);
-        }
+		/* ------------------------------------------------------------ */
+		/**
+		 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
+		 */
+		@Override
+		public int fill(Buffer buffer) throws IOException
+		{
+			_idleTimestamp=System.currentTimeMillis();
+			return super.fill(buffer);
+		}
 
-        /* ------------------------------------------------------------ */
-        /**
-         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
-         */
-        @Override
-        public int flush(Buffer buffer) throws IOException
-        {
-            _idleTimestamp=System.currentTimeMillis();
-            return super.flush(buffer);
-        }
+		/* ------------------------------------------------------------ */
+		/**
+		 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
+		 */
+		@Override
+		public int flush(Buffer buffer) throws IOException
+		{
+			_idleTimestamp=System.currentTimeMillis();
+			return super.flush(buffer);
+		}
 
-        /* ------------------------------------------------------------ */
-        /**
-         * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
-         */
-        @Override
-        public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
-        {
-            _idleTimestamp=System.currentTimeMillis();
-            return super.flush(header,buffer,trailer);
-        }
+		/* ------------------------------------------------------------ */
+		/**
+		 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
+		 */
+		@Override
+		public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
+		{
+			_idleTimestamp=System.currentTimeMillis();
+			return super.flush(header,buffer,trailer);
+		}
 
-        /* ------------------------------------------------------------ */
-        public void run()
-        {
-            try
-            {
-                _timeout=getMaxIdleTime();
-                connectionOpened(_connection);
-                _endpoints.add(this);
+		/* ------------------------------------------------------------ */
+		public void run()
+		{
+			try
+			{
+				_timeout=getMaxIdleTime();
+				connectionOpened(_connection);
+				_endpoints.add(this);
 
-                while (isOpen())
-                {
-                    _idleTimestamp=System.currentTimeMillis();
-                    if (_connection.isIdle())
-                    {
-                        if (getServer().getThreadPool().isLowOnThreads())
-                        {
-                            int lrmit = getLowResourcesMaxIdleTime();
-                            if (lrmit>=0 && _timeout!= lrmit)
-                            {
-                                _timeout=lrmit;
-                            }
-                        }
-                    }
-                    else
-                    {
-                        if (_timeout!=getMaxIdleTime())
-                        {
-                            _timeout=getMaxIdleTime();
-                        }
-                    }
+				while (isOpen())
+				{
+					_idleTimestamp=System.currentTimeMillis();
+					if (_connection.isIdle())
+					{
+						if (getServer().getThreadPool().isLowOnThreads())
+						{
+							int lrmit = getLowResourcesMaxIdleTime();
+							if (lrmit>=0 && _timeout!= lrmit)
+							{
+								_timeout=lrmit;
+							}
+						}
+					}
+					else
+					{
+						if (_timeout!=getMaxIdleTime())
+						{
+							_timeout=getMaxIdleTime();
+						}
+					}
 
-                    _connection = _connection.handle();
+					_connection = _connection.handle();
 
-                }
-            }
-            catch (EofException e)
-            {
-                LOG.debug("EOF", e);
-                try{BlockingChannelEndPoint.this.close();}
-                catch(IOException e2){LOG.trace("",e2);}
-            }
-            catch (HttpException e)
-            {
-                LOG.debug("BAD", e);
-                try{super.close();}
-                catch(IOException e2){LOG.trace("",e2);}
-            }
-            catch(Throwable e)
-            {
-                LOG.warn("handle failed",e);
-                try{super.close();}
-                catch(IOException e2){LOG.trace("",e2);}
-            }
-            finally
-            {
-                connectionClosed(_connection);
-                _endpoints.remove(this);
+				}
+			}
+			catch (EofException e)
+			{
+				LOG.debug("EOF", e);
+				try{BlockingChannelEndPoint.this.close();}
+				catch(IOException e2){LOG.trace("",e2);}
+			}
+			catch (HttpException e)
+			{
+				LOG.debug("BAD", e);
+				try{super.close();}
+				catch(IOException e2){LOG.trace("",e2);}
+			}
+			catch(Throwable e)
+			{
+				LOG.warn("handle failed",e);
+				try{super.close();}
+				catch(IOException e2){LOG.trace("",e2);}
+			}
+			finally
+			{
+				connectionClosed(_connection);
+				_endpoints.remove(this);
 
-                // wait for client to close, but if not, close ourselves.
-                try
-                {
-                    if (!_socket.isClosed())
-                    {
-                        long timestamp=System.currentTimeMillis();
-                        int max_idle=getMaxIdleTime();
+				// wait for client to close, but if not, close ourselves.
+				try
+				{
+					if (!_socket.isClosed())
+					{
+						long timestamp=System.currentTimeMillis();
+						int max_idle=getMaxIdleTime();
 
-                        _socket.setSoTimeout(getMaxIdleTime());
-                        int c=0;
-                        do
-                        {
-                            c = _socket.getInputStream().read();
-                        }
-                        while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
-                        if (!_socket.isClosed())
-                            _socket.close();
-                    }
-                }
-                catch(IOException e)
-                {
-                    LOG.trace("",e);
-                }
-            }
-        }
+						_socket.setSoTimeout(getMaxIdleTime());
+						int c=0;
+						do
+						{
+							c = _socket.getInputStream().read();
+						}
+						while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
+						if (!_socket.isClosed())
+							_socket.close();
+					}
+				}
+				catch(IOException e)
+				{
+					LOG.trace("",e);
+				}
+			}
+		}
 
-        /* ------------------------------------------------------------ */
-        @Override
-        public String toString()
-        {
-            return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
-                    hashCode(),
-                    _socket.getRemoteSocketAddress(),
-                    _socket.getLocalSocketAddress(),
-                    isOpen(),
-                    isInputShutdown(),
-                    isOutputShutdown(),
-                    _connection);
-        }
+		/* ------------------------------------------------------------ */
+		@Override
+		public String toString()
+		{
+			return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
+					hashCode(),
+					_socket.getRemoteSocketAddress(),
+					_socket.getLocalSocketAddress(),
+					isOpen(),
+					isInputShutdown(),
+					isOutputShutdown(),
+					_connection);
+		}
 
-    }
+	}
 }
--- a/src/org/eclipse/jetty/util/ConcurrentHashSet.java	Fri Sep 16 13:32:45 2016 -0600
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,126 +0,0 @@
-//
-//  ========================================================================
-//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
-//  ------------------------------------------------------------------------
-//  All rights reserved. This program and the accompanying materials
-//  are made available under the terms of the Eclipse Public License v1.0
-//  and Apache License v2.0 which accompanies this distribution.
-//
-//      The Eclipse Public License is available at
-//      http://www.eclipse.org/legal/epl-v10.html
-//
-//      The Apache License v2.0 is available at
-//      http://www.opensource.org/licenses/apache2.0.php
-//
-//  You may elect to redistribute this code under either of these licenses.
-//  ========================================================================
-//
-
-package org.eclipse.jetty.util;
-
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E>
-{
-    private final Map<E, Boolean> _map = new ConcurrentHashMap<E, Boolean>();
-    private transient Set<E> _keys = _map.keySet();
-
-    public ConcurrentHashSet()
-    {
-    }
-
-    @Override
-    public boolean add(E e)
-    {
-        return _map.put(e,Boolean.TRUE) == null;
-    }
-
-    @Override
-    public void clear()
-    {
-        _map.clear();
-    }
-
-    @Override
-    public boolean contains(Object o)
-    {
-        return _map.containsKey(o);
-    }
-
-    @Override
-    public boolean containsAll(Collection<?> c)
-    {
-        return _keys.containsAll(c);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        return o == this || _keys.equals(o);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return _keys.hashCode();
-    }
-
-    @Override
-    public boolean isEmpty()
-    {
-        return _map.isEmpty();
-    }
-
-    @Override
-    public Iterator<E> iterator()
-    {
-        return _keys.iterator();
-    }
-
-    @Override
-    public boolean remove(Object o)
-    {
-        return _map.remove(o) != null;
-    }
-
-    @Override
-    public boolean removeAll(Collection<?> c)
-    {
-        return _keys.removeAll(c);
-    }
-
-    @Override
-    public boolean retainAll(Collection<?> c)
-    {
-        return _keys.retainAll(c);
-    }
-
-    @Override
-    public int size()
-    {
-        return _map.size();
-    }
-
-    @Override
-    public Object[] toArray()
-    {
-        return _keys.toArray();
-    }
-
-    @Override
-    public <T> T[] toArray(T[] a)
-    {
-        return _keys.toArray(a);
-    }
-
-    @Override
-    public String toString()
-    {
-        return _keys.toString();
-    }
-}