Mercurial Hosting > luan
view src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 975:53b3f7d9714c
simplify BlockingChannelConnector
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 16 Oct 2016 01:10:02 -0600 |
parents | 5ee36654b383 |
children | 35d04ac3fd0b |
line wrap: on
line source
// // ======================================================================== // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.io.nio; import java.io.IOException; import java.io.InterruptedIOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Locale; import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.EofException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /* ------------------------------------------------------------ */ /** * An Endpoint that can be scheduled by {@link SelectorManager}. */ public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint { public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); private final SelectorManager _manager; private final SelectionKey _key; private final Runnable _handler = new Runnable() { public void run() { handle(); } }; /** * The connection instance is the handler for any IO activity on the endpoint. * There is a different type of connection for HTTP, AJP, WebSocket and * ProxyConnect. The connection may change for an SCEP as it is upgraded * from HTTP to proxy connect or websocket. */ private final AsyncConnection _connection; /* private static final int STATE_NEEDS_DISPATCH = -1; private static final int STATE_UNDISPATCHED = 0; private static final int STATE_DISPATCHED = 1; private int _state; */ private boolean isDispatched = false; /** true if the last write operation succeed and wrote all offered bytes */ private volatile boolean _writable = true; /** True if a thread has is blocked in {@link #blockReadable(long)} */ private boolean _readBlocked; /** True if a thread has is blocked in {@link #blockWritable(long)} */ private boolean _writeBlocked; private boolean _ishut = false; public SelectChannelEndPoint(SocketChannel channel, SelectorManager manager, SelectionKey key, int maxIdleTime) throws IOException { super(channel, maxIdleTime); _manager = manager; _key = key; _connection = manager.newConnection(channel,this); } /* ------------------------------------------------------------ */ /** Called by selectSet to schedule handling * */ public synchronized void schedule() { if (!_key.isValid()) { /* _readBlocked = false; _writeBlocked = false; this.notifyAll(); */ _key.cancel(); return; } // If there are threads dispatched reading and writing if (_readBlocked || _writeBlocked) { // assert _dispatched; if (_readBlocked && _key.isReadable()) _readBlocked = false; if (_writeBlocked && _key.isWritable()) _writeBlocked = false; // wake them up is as good as a dispatched. this.notifyAll(); // we are not interested in further selecting _key.interestOps(0); if( !isDispatched ) updateKey(); return; } // Remove writeable op if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { // Remove writeable op int interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; _key.interestOps(interestOps); _writable = true; // Once writable is in ops, only removed with dispatch. } // If dispatched, then deregister interest if (isDispatched) _key.interestOps(0); else { // other wise do the dispatch dispatch(); } } @Override public synchronized void dispatch() { if( !isDispatched ) { isDispatched = true; try { _manager.execute(_handler); } catch(RejectedExecutionException e) { isDispatched = false; LOG.warn("Dispatched Failed! "+this+" to "+_manager); // updateKey(); } } } @Override public int fill(Buffer buffer) throws IOException { int fill=super.fill(buffer); return fill; } @Override public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { int l = super.flush(header, buffer, trailer); // If there was something to write and it wasn't written, then we are not writable. if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) { synchronized (this) { _writable = false; if( !isDispatched ) updateKey(); } } else if (l>0) { _writable = true; } return l; } @Override public int flush(Buffer buffer) throws IOException { int l = super.flush(buffer); // If there was something to write and it wasn't written, then we are not writable. if (l==0 && buffer!=null && buffer.hasContent()) { synchronized (this) { _writable = false; if( !isDispatched ) updateKey(); } } else if (l>0) { _writable = true; } return l; } /* ------------------------------------------------------------ */ /* * Allows thread to block waiting for further events. */ @Override public synchronized boolean blockReadable(long timeoutMs) throws IOException { if (isInputShutdown()) throw new EofException(); long now = _manager.getNow(); long end = now+timeoutMs; try { _readBlocked = true; while (!isInputShutdown() && _readBlocked) { try { updateKey(); this.wait(timeoutMs>0?(end-now):10000); } catch (final InterruptedException e) { LOG.warn("",e); } finally { now = _manager.getNow(); } if (_readBlocked && timeoutMs>0 && now>=end) return false; } } finally { _readBlocked = false; } return true; } /* ------------------------------------------------------------ */ /* * Allows thread to block waiting for further events. */ @Override public synchronized boolean blockWritable(long timeoutMs) throws IOException { if (isOutputShutdown()) throw new EofException(); long now=_manager.getNow(); long end=now+timeoutMs; try { _writeBlocked = true; while (_writeBlocked && !isOutputShutdown()) { try { updateKey(); this.wait(timeoutMs>0?(end-now):10000); } catch (final InterruptedException e) { LOG.warn("",e); } finally { now = _manager.getNow(); } if (_writeBlocked && timeoutMs>0 && now>=end) return false; } } finally { _writeBlocked = false; } return true; } @Override public boolean hasProgressed() { return false; } /* ------------------------------------------------------------ */ /** * Updates selection key. Adds operations types to the selection key as needed. No operations * are removed as this is only done during dispatch. This method records the new key and * schedules a call to doUpdateKey to do the keyChange */ private synchronized void updateKey() { if( getChannel().isOpen() && _key.isValid()) { boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended()); boolean write_interest = _writeBlocked || (!isDispatched && !_writable); // boolean write_interest = _writeBlocked || !isDispatched; // boolean write_interest = true; int interestOps = ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); if( _key.interestOps() != interestOps ) { _key.interestOps(interestOps); _manager.getSelector().update(); } } else { _key.cancel(); // update needed? } } private void handle() { try { try { _connection.handle(); } catch (ClosedChannelException e) { LOG.trace("",e); } catch (EofException e) { LOG.debug("EOF", e); try{close();} catch(IOException e2){LOG.trace("",e2);} } catch (IOException e) { LOG.warn(e.toString()); try{close();} catch(IOException e2){LOG.trace("",e2);} } catch (Throwable e) { LOG.warn("handle failed", e); try{close();} catch(IOException e2){LOG.trace("",e2);} } finally { if (!_ishut && isInputShutdown() && isOpen()) { _ishut = true; try { _connection.onInputShutdown(); } catch(Throwable x) { LOG.warn("onInputShutdown failed", x); try{close();} catch(IOException e2){LOG.trace("",e2);} } } } } finally { isDispatched = false; updateKey(); } } @Override public void close() throws IOException { try { super.close(); } catch (IOException e) { LOG.trace("",e); } finally { updateKey(); } } @Override public String toString() { // Do NOT use synchronized (this) // because it's very easy to deadlock when debugging is enabled. // We do a best effort to print the right toString() and that's it. SelectionKey key = _key; String keyString = ""; if (key.isValid()) { if (key.isReadable()) keyString += "r"; if (key.isWritable()) keyString += "w"; } else { keyString += "!"; } return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", hashCode(), _socket.getRemoteSocketAddress(), _socket.getLocalSocketAddress(), isDispatched, isOpen(), isInputShutdown(), isOutputShutdown(), _readBlocked, _writeBlocked, _writable, keyString, _connection); } /* ------------------------------------------------------------ */ /** * Don't set the SoTimeout * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) */ @Override public void setMaxIdleTime(int timeMs) throws IOException { _maxIdleTime = timeMs; } }