Mercurial Hosting > luan
changeset 840:0f53601ea489
remove ConcurrentHashSet
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 19 Sep 2016 14:20:18 -0600 |
parents | b8d717b228c6 |
children | f4d7e9fd3f67 |
files | src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java src/org/eclipse/jetty/util/ConcurrentHashSet.java |
diffstat | 2 files changed, 274 insertions(+), 400 deletions(-) [+] |
line wrap: on
line diff
--- a/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java Fri Sep 16 13:32:45 2016 -0600 +++ b/src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java Mon Sep 19 14:20:18 2016 -0600 @@ -26,6 +26,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.Buffer; @@ -36,7 +37,6 @@ import org.eclipse.jetty.io.nio.ChannelEndPoint; import org.eclipse.jetty.server.BlockingHttpConnection; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,312 +55,312 @@ */ public class BlockingChannelConnector extends AbstractNIOConnector { - private static final Logger LOG = LoggerFactory.getLogger(BlockingChannelConnector.class); + private static final Logger LOG = LoggerFactory.getLogger(BlockingChannelConnector.class); - private transient ServerSocketChannel _acceptChannel; - private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>(); + private transient ServerSocketChannel _acceptChannel; + private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashMap<BlockingChannelEndPoint,Boolean>().newKeySet(); - /* ------------------------------------------------------------ */ - /** Constructor. - * - */ - public BlockingChannelConnector() - { - } + /* ------------------------------------------------------------ */ + /** Constructor. + * + */ + public BlockingChannelConnector() + { + } - /* ------------------------------------------------------------ */ - public Object getConnection() - { - return _acceptChannel; - } + /* ------------------------------------------------------------ */ + public Object getConnection() + { + return _acceptChannel; + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.server.AbstractConnector#doStart() - */ - @Override - protected void doStart() throws Exception - { - super.doStart(); - getThreadPool().dispatch(new Runnable() - { + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.server.AbstractConnector#doStart() + */ + @Override + protected void doStart() throws Exception + { + super.doStart(); + getThreadPool().dispatch(new Runnable() + { - public void run() - { - while (isRunning()) - { - try - { - Thread.sleep(400); - long now=System.currentTimeMillis(); - for (BlockingChannelEndPoint endp : _endpoints) - { - endp.checkIdleTimestamp(now); - } - } - catch(InterruptedException e) - { - LOG.trace("",e); - } - catch(Exception e) - { - LOG.warn("",e); - } - } - } + public void run() + { + while (isRunning()) + { + try + { + Thread.sleep(400); + long now=System.currentTimeMillis(); + for (BlockingChannelEndPoint endp : _endpoints) + { + endp.checkIdleTimestamp(now); + } + } + catch(InterruptedException e) + { + LOG.trace("",e); + } + catch(Exception e) + { + LOG.warn("",e); + } + } + } - }); + }); - } + } - /* ------------------------------------------------------------ */ - public void open() throws IOException - { - // Create a new server socket and set to non blocking mode - _acceptChannel= ServerSocketChannel.open(); - _acceptChannel.configureBlocking(true); + /* ------------------------------------------------------------ */ + public void open() throws IOException + { + // Create a new server socket and set to non blocking mode + _acceptChannel= ServerSocketChannel.open(); + _acceptChannel.configureBlocking(true); - // Bind the server socket to the local host and port - InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); - _acceptChannel.socket().bind(addr,getAcceptQueueSize()); - } + // Bind the server socket to the local host and port + InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); + _acceptChannel.socket().bind(addr,getAcceptQueueSize()); + } - /* ------------------------------------------------------------ */ - public void close() throws IOException - { - if (_acceptChannel != null) - _acceptChannel.close(); - _acceptChannel=null; - } + /* ------------------------------------------------------------ */ + public void close() throws IOException + { + if (_acceptChannel != null) + _acceptChannel.close(); + _acceptChannel=null; + } - /* ------------------------------------------------------------ */ - @Override - public void accept(int acceptorID) - throws IOException, InterruptedException - { - SocketChannel channel = _acceptChannel.accept(); - channel.configureBlocking(true); - Socket socket=channel.socket(); - configure(socket); + /* ------------------------------------------------------------ */ + @Override + public void accept(int acceptorID) + throws IOException, InterruptedException + { + SocketChannel channel = _acceptChannel.accept(); + channel.configureBlocking(true); + Socket socket=channel.socket(); + configure(socket); - BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel); - connection.dispatch(); - } + BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel); + connection.dispatch(); + } - /* ------------------------------------------------------------------------------- */ - @Override - public void customize(EndPoint endpoint, Request request) - throws IOException - { - super.customize(endpoint, request); - endpoint.setMaxIdleTime(_maxIdleTime); - configure(((SocketChannel)endpoint.getTransport()).socket()); - } + /* ------------------------------------------------------------------------------- */ + @Override + public void customize(EndPoint endpoint, Request request) + throws IOException + { + super.customize(endpoint, request); + endpoint.setMaxIdleTime(_maxIdleTime); + configure(((SocketChannel)endpoint.getTransport()).socket()); + } - /* ------------------------------------------------------------------------------- */ - public int getLocalPort() - { - if (_acceptChannel==null || !_acceptChannel.isOpen()) - return -1; - return _acceptChannel.socket().getLocalPort(); - } + /* ------------------------------------------------------------------------------- */ + public int getLocalPort() + { + if (_acceptChannel==null || !_acceptChannel.isOpen()) + return -1; + return _acceptChannel.socket().getLocalPort(); + } - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - /* ------------------------------------------------------------------------------- */ - private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint - { - private Connection _connection; - private int _timeout; - private volatile long _idleTimestamp; + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + /* ------------------------------------------------------------------------------- */ + private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint + { + private Connection _connection; + private int _timeout; + private volatile long _idleTimestamp; - BlockingChannelEndPoint(ByteChannel channel) - throws IOException - { - super(channel,BlockingChannelConnector.this._maxIdleTime); - _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer()); - } + BlockingChannelEndPoint(ByteChannel channel) + throws IOException + { + super(channel,BlockingChannelConnector.this._maxIdleTime); + _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer()); + } - /* ------------------------------------------------------------ */ - /** Get the connection. - * @return the connection - */ - public Connection getConnection() - { - return _connection; - } + /* ------------------------------------------------------------ */ + /** Get the connection. + * @return the connection + */ + public Connection getConnection() + { + return _connection; + } - /* ------------------------------------------------------------ */ - public void setConnection(Connection connection) - { - _connection=connection; - } + /* ------------------------------------------------------------ */ + public void setConnection(Connection connection) + { + _connection=connection; + } - /* ------------------------------------------------------------ */ - public void checkIdleTimestamp(long now) - { - if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout)) - { - idleExpired(); - } - } + /* ------------------------------------------------------------ */ + public void checkIdleTimestamp(long now) + { + if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout)) + { + idleExpired(); + } + } - /* ------------------------------------------------------------ */ - protected void idleExpired() - { - try - { - super.close(); - } - catch (IOException e) - { - LOG.trace("",e); - } - } + /* ------------------------------------------------------------ */ + protected void idleExpired() + { + try + { + super.close(); + } + catch (IOException e) + { + LOG.trace("",e); + } + } - /* ------------------------------------------------------------ */ - void dispatch() throws IOException - { - if (!getThreadPool().dispatch(this)) - { - LOG.warn("dispatch failed for {}",_connection); - super.close(); - } - } + /* ------------------------------------------------------------ */ + void dispatch() throws IOException + { + if (!getThreadPool().dispatch(this)) + { + LOG.warn("dispatch failed for {}",_connection); + super.close(); + } + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer) - */ - @Override - public int fill(Buffer buffer) throws IOException - { - _idleTimestamp=System.currentTimeMillis(); - return super.fill(buffer); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer) + */ + @Override + public int fill(Buffer buffer) throws IOException + { + _idleTimestamp=System.currentTimeMillis(); + return super.fill(buffer); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer) - */ - @Override - public int flush(Buffer buffer) throws IOException - { - _idleTimestamp=System.currentTimeMillis(); - return super.flush(buffer); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer) + */ + @Override + public int flush(Buffer buffer) throws IOException + { + _idleTimestamp=System.currentTimeMillis(); + return super.flush(buffer); + } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer) - */ - @Override - public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException - { - _idleTimestamp=System.currentTimeMillis(); - return super.flush(header,buffer,trailer); - } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer) + */ + @Override + public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException + { + _idleTimestamp=System.currentTimeMillis(); + return super.flush(header,buffer,trailer); + } - /* ------------------------------------------------------------ */ - public void run() - { - try - { - _timeout=getMaxIdleTime(); - connectionOpened(_connection); - _endpoints.add(this); + /* ------------------------------------------------------------ */ + public void run() + { + try + { + _timeout=getMaxIdleTime(); + connectionOpened(_connection); + _endpoints.add(this); - while (isOpen()) - { - _idleTimestamp=System.currentTimeMillis(); - if (_connection.isIdle()) - { - if (getServer().getThreadPool().isLowOnThreads()) - { - int lrmit = getLowResourcesMaxIdleTime(); - if (lrmit>=0 && _timeout!= lrmit) - { - _timeout=lrmit; - } - } - } - else - { - if (_timeout!=getMaxIdleTime()) - { - _timeout=getMaxIdleTime(); - } - } + while (isOpen()) + { + _idleTimestamp=System.currentTimeMillis(); + if (_connection.isIdle()) + { + if (getServer().getThreadPool().isLowOnThreads()) + { + int lrmit = getLowResourcesMaxIdleTime(); + if (lrmit>=0 && _timeout!= lrmit) + { + _timeout=lrmit; + } + } + } + else + { + if (_timeout!=getMaxIdleTime()) + { + _timeout=getMaxIdleTime(); + } + } - _connection = _connection.handle(); + _connection = _connection.handle(); - } - } - catch (EofException e) - { - LOG.debug("EOF", e); - try{BlockingChannelEndPoint.this.close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch (HttpException e) - { - LOG.debug("BAD", e); - try{super.close();} - catch(IOException e2){LOG.trace("",e2);} - } - catch(Throwable e) - { - LOG.warn("handle failed",e); - try{super.close();} - catch(IOException e2){LOG.trace("",e2);} - } - finally - { - connectionClosed(_connection); - _endpoints.remove(this); + } + } + catch (EofException e) + { + LOG.debug("EOF", e); + try{BlockingChannelEndPoint.this.close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch (HttpException e) + { + LOG.debug("BAD", e); + try{super.close();} + catch(IOException e2){LOG.trace("",e2);} + } + catch(Throwable e) + { + LOG.warn("handle failed",e); + try{super.close();} + catch(IOException e2){LOG.trace("",e2);} + } + finally + { + connectionClosed(_connection); + _endpoints.remove(this); - // wait for client to close, but if not, close ourselves. - try - { - if (!_socket.isClosed()) - { - long timestamp=System.currentTimeMillis(); - int max_idle=getMaxIdleTime(); + // wait for client to close, but if not, close ourselves. + try + { + if (!_socket.isClosed()) + { + long timestamp=System.currentTimeMillis(); + int max_idle=getMaxIdleTime(); - _socket.setSoTimeout(getMaxIdleTime()); - int c=0; - do - { - c = _socket.getInputStream().read(); - } - while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); - if (!_socket.isClosed()) - _socket.close(); - } - } - catch(IOException e) - { - LOG.trace("",e); - } - } - } + _socket.setSoTimeout(getMaxIdleTime()); + int c=0; + do + { + c = _socket.getInputStream().read(); + } + while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); + if (!_socket.isClosed()) + _socket.close(); + } + } + catch(IOException e) + { + LOG.trace("",e); + } + } + } - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}", - hashCode(), - _socket.getRemoteSocketAddress(), - _socket.getLocalSocketAddress(), - isOpen(), - isInputShutdown(), - isOutputShutdown(), - _connection); - } + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}", + hashCode(), + _socket.getRemoteSocketAddress(), + _socket.getLocalSocketAddress(), + isOpen(), + isInputShutdown(), + isOutputShutdown(), + _connection); + } - } + } }
--- a/src/org/eclipse/jetty/util/ConcurrentHashSet.java Fri Sep 16 13:32:45 2016 -0600 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,126 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.AbstractSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E> -{ - private final Map<E, Boolean> _map = new ConcurrentHashMap<E, Boolean>(); - private transient Set<E> _keys = _map.keySet(); - - public ConcurrentHashSet() - { - } - - @Override - public boolean add(E e) - { - return _map.put(e,Boolean.TRUE) == null; - } - - @Override - public void clear() - { - _map.clear(); - } - - @Override - public boolean contains(Object o) - { - return _map.containsKey(o); - } - - @Override - public boolean containsAll(Collection<?> c) - { - return _keys.containsAll(c); - } - - @Override - public boolean equals(Object o) - { - return o == this || _keys.equals(o); - } - - @Override - public int hashCode() - { - return _keys.hashCode(); - } - - @Override - public boolean isEmpty() - { - return _map.isEmpty(); - } - - @Override - public Iterator<E> iterator() - { - return _keys.iterator(); - } - - @Override - public boolean remove(Object o) - { - return _map.remove(o) != null; - } - - @Override - public boolean removeAll(Collection<?> c) - { - return _keys.removeAll(c); - } - - @Override - public boolean retainAll(Collection<?> c) - { - return _keys.retainAll(c); - } - - @Override - public int size() - { - return _map.size(); - } - - @Override - public Object[] toArray() - { - return _keys.toArray(); - } - - @Override - public <T> T[] toArray(T[] a) - { - return _keys.toArray(a); - } - - @Override - public String toString() - { - return _keys.toString(); - } -}