Mercurial Hosting > luan
view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 958:fc521d2f098e
simplify SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 13 Oct 2016 18:48:20 -0600 |
parents | 1094975d013b |
children | 7b94f5b33c64 |
line wrap: on
line source
// // ======================================================================== // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.io.nio; import java.io.IOException; import java.io.InterruptedIOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Locale; import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* ------------------------------------------------------------ */ /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint { public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; private SelectionKey _key; private final Runnable _handler = new Runnable() { public void run() { handle(); } }; /** The desired value for {@link SelectionKey#interestOps()} */ private int _interestOps; /** * The connection instance is the handler for any IO activity on the endpoint. * There is a different type of connection for HTTP, AJP, WebSocket and * ProxyConnect. The connection may change for an SCEP as it is upgraded * from HTTP to proxy connect or websocket. */ private volatile AsyncConnection _connection; private static final int STATE_NEEDS_DISPATCH = -1; private static final int STATE_UNDISPATCHED = 0; private static final int STATE_DISPATCHED = 1; private int _state; /** true if the last write operation succeed and wrote all offered bytes */ private volatile boolean _writable = true; /** True if a thread has is blocked in {@link #blockReadable(long)} */ private boolean _readBlocked; /** True if a thread has is blocked in {@link #blockWritable(long)} */ private boolean _writeBlocked; /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ private boolean _open; private boolean _ishut; public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException { super(channel, maxIdleTime); _manager = selectSet.getManager(); _selectSet = selectSet; _state = STATE_UNDISPATCHED; _open = true; _key = key; } @Override public Connection getConnection() { return _connection; } @Override public void setConnection(Connection connection) { _connection = (AsyncConnection)connection; } /* ------------------------------------------------------------ */ /** Called by selectSet to schedule handling * */ public synchronized void schedule() { // If there is no key, then do nothing if (_key == null || !_key.isValid()) { _readBlocked = false; _writeBlocked = false; this.notifyAll(); return; } // If there are threads dispatched reading and writing if (_readBlocked || _writeBlocked) { // assert _dispatched; if (_readBlocked && _key.isReadable()) _readBlocked=false; if (_writeBlocked && _key.isWritable()) _writeBlocked = false; // wake them up is as good as a dispatched. this.notifyAll(); // we are not interested in further selecting _key.interestOps(0); if (_state<STATE_DISPATCHED) updateKey(); return; } // Remove writeable op if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { // Remove writeable op _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; _key.interestOps(_interestOps); _writable = true; // Once writable is in ops, only removed with dispatch. } // If dispatched, then deregister interest if (_state>=STATE_DISPATCHED) _key.interestOps(0); else { // other wise do the dispatch dispatch(); } } @Override public synchronized void dispatch() { if (_state<=STATE_UNDISPATCHED) { _state = STATE_DISPATCHED; try { _manager.execute(_handler); } catch(RejectedExecutionException e) { _state = STATE_NEEDS_DISPATCH; LOG.warn("Dispatched Failed! "+this+" to "+_manager); updateKey(); } } } /* ------------------------------------------------------------ */ /** * Called when a dispatched thread is no longer handling the endpoint. * The selection key operations are updated. * @return If false is returned, the endpoint has been redispatched and * thread must keep handling the endpoint. */ private synchronized void undispatch() { _state = STATE_UNDISPATCHED; updateKey(); } @Override public int fill(Buffer buffer) throws IOException { int fill=super.fill(buffer); return fill; } @Override public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { int l = super.flush(header, buffer, trailer); // If there was something to write and it wasn't written, then we are not writable. if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) { synchronized (this) { _writable = false; if (_state<STATE_DISPATCHED) updateKey(); } } else if (l>0) { _writable = true; } return l; } @Override public int flush(Buffer buffer) throws IOException { int l = super.flush(buffer); // If there was something to write and it wasn't written, then we are not writable. if (l==0 && buffer!=null && buffer.hasContent()) { synchronized (this) { _writable = false; if (_state<STATE_DISPATCHED) updateKey(); } } else if (l>0) { _writable = true; } return l; } /* ------------------------------------------------------------ */ /* * Allows thread to block waiting for further events. */ @Override public boolean blockReadable(long timeoutMs) throws IOException { synchronized (this) { if (isInputShutdown()) throw new EofException(); long now = _selectSet.getNow(); long end = now+timeoutMs; try { _readBlocked = true; while (!isInputShutdown() && _readBlocked) { try { updateKey(); this.wait(timeoutMs>0?(end-now):10000); } catch (final InterruptedException e) { LOG.warn("",e); } finally { now=_selectSet.getNow(); } if (_readBlocked && timeoutMs>0 && now>=end) return false; } } finally { _readBlocked = false; } } return true; } /* ------------------------------------------------------------ */ /* * Allows thread to block waiting for further events. */ @Override public boolean blockWritable(long timeoutMs) throws IOException { synchronized (this) { if (isOutputShutdown()) throw new EofException(); long now=_selectSet.getNow(); long end=now+timeoutMs; try { _writeBlocked = true; while (_writeBlocked && !isOutputShutdown()) { try { updateKey(); this.wait(timeoutMs>0?(end-now):10000); } catch (final InterruptedException e) { LOG.warn("",e); } finally { now=_selectSet.getNow(); } if (_writeBlocked && timeoutMs>0 && now>=end) return false; } } finally { _writeBlocked = false; } } return true; } @Override public boolean hasProgressed() { return false; } /* ------------------------------------------------------------ */ /** * Updates selection key. Adds operations types to the selection key as needed. No operations * are removed as this is only done during dispatch. This method records the new key and * schedules a call to doUpdateKey to do the keyChange */ private void updateKey() { final boolean changed; synchronized (this) { int current_ops=-1; if (getChannel().isOpen()) { boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); _interestOps = ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); try { current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); } catch(Exception e) { _key = null; LOG.trace("",e); } } changed = _interestOps!=current_ops; } if(changed) { doUpdateKey(); _selectSet.getSelector().update(); } } /* ------------------------------------------------------------ */ /** * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey */ synchronized void doUpdateKey() { if (getChannel().isOpen()) { if (_interestOps>0) { if (_key==null || !_key.isValid()) { SelectableChannel sc = (SelectableChannel)getChannel(); if (sc.isRegistered()) { updateKey(); } else { try { _key = _selectSet.getSelector().register(sc,_interestOps,this); } catch (Exception e) { LOG.trace("",e); if (_key!=null && _key.isValid()) { _key.cancel(); } if (_open) { _selectSet.destroyEndPoint(this); } _open = false; _key = null; } } } else { _key.interestOps(_interestOps); } } else { if (_key!=null && _key.isValid()) _key.interestOps(0); else _key = null; } } else { if (_key!=null && _key.isValid()) _key.cancel(); if (_open) { _open = false; _selectSet.destroyEndPoint(this); } _key = null; } } private void handle() { boolean dispatched = true; try { try { while(true) { final AsyncConnection next = (AsyncConnection)_connection.handle(); if (next!=_connection) { LOG.debug("{} replaced {}",next,_connection); _connection=next; continue; } break; } } catch (ClosedChannelException e) { LOG.trace("",e); } catch (EofException e) { LOG.debug("EOF", e); try{close();} catch(IOException e2){LOG.trace("",e2);} } catch (IOException e) { LOG.warn(e.toString()); try{close();} catch(IOException e2){LOG.trace("",e2);} } catch (Throwable e) { LOG.warn("handle failed", e); try{close();} catch(IOException e2){LOG.trace("",e2);} } finally { if (!_ishut && isInputShutdown() && isOpen()) { _ishut = true; try { _connection.onInputShutdown(); } catch(Throwable x) { LOG.warn("onInputShutdown failed", x); try{close();} catch(IOException e2){LOG.trace("",e2);} } finally { updateKey(); } } undispatch(); dispatched = false; } } finally { if (dispatched) { undispatch(); } } } /* ------------------------------------------------------------ */ /* * @see org.eclipse.io.nio.ChannelEndPoint#close() */ @Override public void close() throws IOException { try { super.close(); } catch (IOException e) { LOG.trace("",e); } finally { updateKey(); } } @Override public String toString() { // Do NOT use synchronized (this) // because it's very easy to deadlock when debugging is enabled. // We do a best effort to print the right toString() and that's it. SelectionKey key = _key; String keyString = ""; if (key != null) { if (key.isValid()) { if (key.isReadable()) keyString += "r"; if (key.isWritable()) keyString += "w"; } else { keyString += "!"; } } else { keyString += "-"; } return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", hashCode(), _socket.getRemoteSocketAddress(), _socket.getLocalSocketAddress(), _state, isOpen(), isInputShutdown(), isOutputShutdown(), _readBlocked, _writeBlocked, _writable, _interestOps, keyString, _connection); } /* ------------------------------------------------------------ */ /** * Don't set the SoTimeout * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) */ @Override public void setMaxIdleTime(int timeMs) throws IOException { _maxIdleTime = timeMs; } }