comparison src/org/eclipse/jetty/server/nio/BlockingChannelConnector.java @ 802:3428c60d7cfc

replace jetty jars with source
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 07 Sep 2016 21:15:48 -0600
parents
children 8e9db0bbf4f9
comparison
equal deleted inserted replaced
801:6a21393191c1 802:3428c60d7cfc
1 //
2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.server.nio;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.Socket;
24 import java.nio.channels.ByteChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.Set;
29
30 import org.eclipse.jetty.http.HttpException;
31 import org.eclipse.jetty.io.Buffer;
32 import org.eclipse.jetty.io.ConnectedEndPoint;
33 import org.eclipse.jetty.io.Connection;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.io.EofException;
36 import org.eclipse.jetty.io.nio.ChannelEndPoint;
37 import org.eclipse.jetty.server.BlockingHttpConnection;
38 import org.eclipse.jetty.server.Request;
39 import org.eclipse.jetty.util.ConcurrentHashSet;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.log.Logger;
42
43
44 /* ------------------------------------------------------------------------------- */
45 /** Blocking NIO connector.
46 * This connector uses efficient NIO buffers with a traditional blocking thread model.
47 * Direct NIO buffers are used and a thread is allocated per connections.
48 *
49 * This connector is best used when there are a few very active connections.
50 *
51 * @org.apache.xbean.XBean element="blockingNioConnector" description="Creates a blocking NIO based socket connector"
52 *
53 *
54 *
55 */
56 public class BlockingChannelConnector extends AbstractNIOConnector
57 {
58 private static final Logger LOG = Log.getLogger(BlockingChannelConnector.class);
59
60 private transient ServerSocketChannel _acceptChannel;
61 private final Set<BlockingChannelEndPoint> _endpoints = new ConcurrentHashSet<BlockingChannelEndPoint>();
62
63
64 /* ------------------------------------------------------------ */
65 /** Constructor.
66 *
67 */
68 public BlockingChannelConnector()
69 {
70 }
71
72 /* ------------------------------------------------------------ */
73 public Object getConnection()
74 {
75 return _acceptChannel;
76 }
77
78 /* ------------------------------------------------------------ */
79 /**
80 * @see org.eclipse.jetty.server.AbstractConnector#doStart()
81 */
82 @Override
83 protected void doStart() throws Exception
84 {
85 super.doStart();
86 getThreadPool().dispatch(new Runnable()
87 {
88
89 public void run()
90 {
91 while (isRunning())
92 {
93 try
94 {
95 Thread.sleep(400);
96 long now=System.currentTimeMillis();
97 for (BlockingChannelEndPoint endp : _endpoints)
98 {
99 endp.checkIdleTimestamp(now);
100 }
101 }
102 catch(InterruptedException e)
103 {
104 LOG.ignore(e);
105 }
106 catch(Exception e)
107 {
108 LOG.warn(e);
109 }
110 }
111 }
112
113 });
114
115 }
116
117
118 /* ------------------------------------------------------------ */
119 public void open() throws IOException
120 {
121 // Create a new server socket and set to non blocking mode
122 _acceptChannel= ServerSocketChannel.open();
123 _acceptChannel.configureBlocking(true);
124
125 // Bind the server socket to the local host and port
126 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
127 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
128 }
129
130 /* ------------------------------------------------------------ */
131 public void close() throws IOException
132 {
133 if (_acceptChannel != null)
134 _acceptChannel.close();
135 _acceptChannel=null;
136 }
137
138 /* ------------------------------------------------------------ */
139 @Override
140 public void accept(int acceptorID)
141 throws IOException, InterruptedException
142 {
143 SocketChannel channel = _acceptChannel.accept();
144 channel.configureBlocking(true);
145 Socket socket=channel.socket();
146 configure(socket);
147
148 BlockingChannelEndPoint connection=new BlockingChannelEndPoint(channel);
149 connection.dispatch();
150 }
151
152 /* ------------------------------------------------------------------------------- */
153 @Override
154 public void customize(EndPoint endpoint, Request request)
155 throws IOException
156 {
157 super.customize(endpoint, request);
158 endpoint.setMaxIdleTime(_maxIdleTime);
159 configure(((SocketChannel)endpoint.getTransport()).socket());
160 }
161
162
163 /* ------------------------------------------------------------------------------- */
164 public int getLocalPort()
165 {
166 if (_acceptChannel==null || !_acceptChannel.isOpen())
167 return -1;
168 return _acceptChannel.socket().getLocalPort();
169 }
170
171 /* ------------------------------------------------------------------------------- */
172 /* ------------------------------------------------------------------------------- */
173 /* ------------------------------------------------------------------------------- */
174 private class BlockingChannelEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
175 {
176 private Connection _connection;
177 private int _timeout;
178 private volatile long _idleTimestamp;
179
180 BlockingChannelEndPoint(ByteChannel channel)
181 throws IOException
182 {
183 super(channel,BlockingChannelConnector.this._maxIdleTime);
184 _connection = new BlockingHttpConnection(BlockingChannelConnector.this,this,getServer());
185 }
186
187 /* ------------------------------------------------------------ */
188 /** Get the connection.
189 * @return the connection
190 */
191 public Connection getConnection()
192 {
193 return _connection;
194 }
195
196 /* ------------------------------------------------------------ */
197 public void setConnection(Connection connection)
198 {
199 _connection=connection;
200 }
201
202 /* ------------------------------------------------------------ */
203 public void checkIdleTimestamp(long now)
204 {
205 if (_idleTimestamp!=0 && _timeout>0 && now>(_idleTimestamp+_timeout))
206 {
207 idleExpired();
208 }
209 }
210
211 /* ------------------------------------------------------------ */
212 protected void idleExpired()
213 {
214 try
215 {
216 super.close();
217 }
218 catch (IOException e)
219 {
220 LOG.ignore(e);
221 }
222 }
223
224 /* ------------------------------------------------------------ */
225 void dispatch() throws IOException
226 {
227 if (!getThreadPool().dispatch(this))
228 {
229 LOG.warn("dispatch failed for {}",_connection);
230 super.close();
231 }
232 }
233
234 /* ------------------------------------------------------------ */
235 /**
236 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#fill(org.eclipse.jetty.io.Buffer)
237 */
238 @Override
239 public int fill(Buffer buffer) throws IOException
240 {
241 _idleTimestamp=System.currentTimeMillis();
242 return super.fill(buffer);
243 }
244
245 /* ------------------------------------------------------------ */
246 /**
247 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer)
248 */
249 @Override
250 public int flush(Buffer buffer) throws IOException
251 {
252 _idleTimestamp=System.currentTimeMillis();
253 return super.flush(buffer);
254 }
255
256 /* ------------------------------------------------------------ */
257 /**
258 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#flush(org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer, org.eclipse.jetty.io.Buffer)
259 */
260 @Override
261 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
262 {
263 _idleTimestamp=System.currentTimeMillis();
264 return super.flush(header,buffer,trailer);
265 }
266
267 /* ------------------------------------------------------------ */
268 public void run()
269 {
270 try
271 {
272 _timeout=getMaxIdleTime();
273 connectionOpened(_connection);
274 _endpoints.add(this);
275
276 while (isOpen())
277 {
278 _idleTimestamp=System.currentTimeMillis();
279 if (_connection.isIdle())
280 {
281 if (getServer().getThreadPool().isLowOnThreads())
282 {
283 int lrmit = getLowResourcesMaxIdleTime();
284 if (lrmit>=0 && _timeout!= lrmit)
285 {
286 _timeout=lrmit;
287 }
288 }
289 }
290 else
291 {
292 if (_timeout!=getMaxIdleTime())
293 {
294 _timeout=getMaxIdleTime();
295 }
296 }
297
298 _connection = _connection.handle();
299
300 }
301 }
302 catch (EofException e)
303 {
304 LOG.debug("EOF", e);
305 try{BlockingChannelEndPoint.this.close();}
306 catch(IOException e2){LOG.ignore(e2);}
307 }
308 catch (HttpException e)
309 {
310 LOG.debug("BAD", e);
311 try{super.close();}
312 catch(IOException e2){LOG.ignore(e2);}
313 }
314 catch(Throwable e)
315 {
316 LOG.warn("handle failed",e);
317 try{super.close();}
318 catch(IOException e2){LOG.ignore(e2);}
319 }
320 finally
321 {
322 connectionClosed(_connection);
323 _endpoints.remove(this);
324
325 // wait for client to close, but if not, close ourselves.
326 try
327 {
328 if (!_socket.isClosed())
329 {
330 long timestamp=System.currentTimeMillis();
331 int max_idle=getMaxIdleTime();
332
333 _socket.setSoTimeout(getMaxIdleTime());
334 int c=0;
335 do
336 {
337 c = _socket.getInputStream().read();
338 }
339 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
340 if (!_socket.isClosed())
341 _socket.close();
342 }
343 }
344 catch(IOException e)
345 {
346 LOG.ignore(e);
347 }
348 }
349 }
350
351 /* ------------------------------------------------------------ */
352 @Override
353 public String toString()
354 {
355 return String.format("BCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b}-{%s}",
356 hashCode(),
357 _socket.getRemoteSocketAddress(),
358 _socket.getLocalSocketAddress(),
359 isOpen(),
360 isInputShutdown(),
361 isOutputShutdown(),
362 _connection);
363 }
364
365 }
366 }