Mercurial Hosting > luan
comparison src/org/eclipse/jetty/server/handler/ConnectHandler.java @ 802:3428c60d7cfc
replace jetty jars with source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 07 Sep 2016 21:15:48 -0600 |
parents | |
children | 8e9db0bbf4f9 |
comparison
equal
deleted
inserted
replaced
801:6a21393191c1 | 802:3428c60d7cfc |
---|---|
1 // | |
2 // ======================================================================== | |
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. | |
4 // ------------------------------------------------------------------------ | |
5 // All rights reserved. This program and the accompanying materials | |
6 // are made available under the terms of the Eclipse Public License v1.0 | |
7 // and Apache License v2.0 which accompanies this distribution. | |
8 // | |
9 // The Eclipse Public License is available at | |
10 // http://www.eclipse.org/legal/epl-v10.html | |
11 // | |
12 // The Apache License v2.0 is available at | |
13 // http://www.opensource.org/licenses/apache2.0.php | |
14 // | |
15 // You may elect to redistribute this code under either of these licenses. | |
16 // ======================================================================== | |
17 // | |
18 | |
19 package org.eclipse.jetty.server.handler; | |
20 | |
21 import java.io.IOException; | |
22 import java.net.InetSocketAddress; | |
23 import java.net.SocketException; | |
24 import java.net.SocketTimeoutException; | |
25 import java.nio.channels.ClosedChannelException; | |
26 import java.nio.channels.SelectionKey; | |
27 import java.nio.channels.SocketChannel; | |
28 import java.util.Arrays; | |
29 import java.util.concurrent.ConcurrentHashMap; | |
30 import java.util.concurrent.ConcurrentMap; | |
31 import java.util.concurrent.CountDownLatch; | |
32 import java.util.concurrent.TimeUnit; | |
33 import javax.servlet.ServletException; | |
34 import javax.servlet.http.HttpServletRequest; | |
35 import javax.servlet.http.HttpServletResponse; | |
36 | |
37 import org.eclipse.jetty.http.HttpMethods; | |
38 import org.eclipse.jetty.http.HttpParser; | |
39 import org.eclipse.jetty.io.AsyncEndPoint; | |
40 import org.eclipse.jetty.io.Buffer; | |
41 import org.eclipse.jetty.io.ConnectedEndPoint; | |
42 import org.eclipse.jetty.io.Connection; | |
43 import org.eclipse.jetty.io.EndPoint; | |
44 import org.eclipse.jetty.io.nio.AsyncConnection; | |
45 import org.eclipse.jetty.io.nio.IndirectNIOBuffer; | |
46 import org.eclipse.jetty.io.nio.SelectChannelEndPoint; | |
47 import org.eclipse.jetty.io.nio.SelectorManager; | |
48 import org.eclipse.jetty.server.AbstractHttpConnection; | |
49 import org.eclipse.jetty.server.Handler; | |
50 import org.eclipse.jetty.server.Request; | |
51 import org.eclipse.jetty.server.Server; | |
52 import org.eclipse.jetty.util.HostMap; | |
53 import org.eclipse.jetty.util.TypeUtil; | |
54 import org.eclipse.jetty.util.component.LifeCycle; | |
55 import org.eclipse.jetty.util.log.Log; | |
56 import org.eclipse.jetty.util.log.Logger; | |
57 import org.eclipse.jetty.util.thread.ThreadPool; | |
58 | |
59 /** | |
60 * <p>Implementation of a tunneling proxy that supports HTTP CONNECT.</p> | |
61 * <p>To work as CONNECT proxy, objects of this class must be instantiated using the no-arguments | |
62 * constructor, since the remote server information will be present in the CONNECT URI.</p> | |
63 */ | |
64 public class ConnectHandler extends HandlerWrapper | |
65 { | |
66 private static final Logger LOG = Log.getLogger(ConnectHandler.class); | |
67 private final SelectorManager _selectorManager = new Manager(); | |
68 private volatile int _connectTimeout = 5000; | |
69 private volatile int _writeTimeout = 30000; | |
70 private volatile ThreadPool _threadPool; | |
71 private volatile boolean _privateThreadPool; | |
72 private HostMap<String> _white = new HostMap<String>(); | |
73 private HostMap<String> _black = new HostMap<String>(); | |
74 | |
75 public ConnectHandler() | |
76 { | |
77 this(null); | |
78 } | |
79 | |
80 public ConnectHandler(String[] white, String[] black) | |
81 { | |
82 this(null, white, black); | |
83 } | |
84 | |
85 public ConnectHandler(Handler handler) | |
86 { | |
87 setHandler(handler); | |
88 } | |
89 | |
90 public ConnectHandler(Handler handler, String[] white, String[] black) | |
91 { | |
92 setHandler(handler); | |
93 set(white, _white); | |
94 set(black, _black); | |
95 } | |
96 | |
97 /** | |
98 * @return the timeout, in milliseconds, to connect to the remote server | |
99 */ | |
100 public int getConnectTimeout() | |
101 { | |
102 return _connectTimeout; | |
103 } | |
104 | |
105 /** | |
106 * @param connectTimeout the timeout, in milliseconds, to connect to the remote server | |
107 */ | |
108 public void setConnectTimeout(int connectTimeout) | |
109 { | |
110 _connectTimeout = connectTimeout; | |
111 } | |
112 | |
113 /** | |
114 * @return the timeout, in milliseconds, to write data to a peer | |
115 */ | |
116 public int getWriteTimeout() | |
117 { | |
118 return _writeTimeout; | |
119 } | |
120 | |
121 /** | |
122 * @param writeTimeout the timeout, in milliseconds, to write data to a peer | |
123 */ | |
124 public void setWriteTimeout(int writeTimeout) | |
125 { | |
126 _writeTimeout = writeTimeout; | |
127 } | |
128 | |
129 @Override | |
130 public void setServer(Server server) | |
131 { | |
132 super.setServer(server); | |
133 | |
134 server.getContainer().update(this, null, _selectorManager, "selectManager"); | |
135 | |
136 if (_privateThreadPool) | |
137 server.getContainer().update(this, null, _privateThreadPool, "threadpool", true); | |
138 else | |
139 _threadPool = server.getThreadPool(); | |
140 } | |
141 | |
142 /** | |
143 * @return the thread pool | |
144 */ | |
145 public ThreadPool getThreadPool() | |
146 { | |
147 return _threadPool; | |
148 } | |
149 | |
150 /** | |
151 * @param threadPool the thread pool | |
152 */ | |
153 public void setThreadPool(ThreadPool threadPool) | |
154 { | |
155 if (getServer() != null) | |
156 getServer().getContainer().update(this, _privateThreadPool ? _threadPool : null, threadPool, "threadpool", true); | |
157 _privateThreadPool = threadPool != null; | |
158 _threadPool = threadPool; | |
159 } | |
160 | |
161 @Override | |
162 protected void doStart() throws Exception | |
163 { | |
164 super.doStart(); | |
165 | |
166 if (_threadPool == null) | |
167 { | |
168 _threadPool = getServer().getThreadPool(); | |
169 _privateThreadPool = false; | |
170 } | |
171 if (_threadPool instanceof LifeCycle && !((LifeCycle)_threadPool).isRunning()) | |
172 ((LifeCycle)_threadPool).start(); | |
173 | |
174 _selectorManager.start(); | |
175 } | |
176 | |
177 @Override | |
178 protected void doStop() throws Exception | |
179 { | |
180 _selectorManager.stop(); | |
181 | |
182 ThreadPool threadPool = _threadPool; | |
183 if (_privateThreadPool && _threadPool != null && threadPool instanceof LifeCycle) | |
184 ((LifeCycle)threadPool).stop(); | |
185 | |
186 super.doStop(); | |
187 } | |
188 | |
189 @Override | |
190 public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException | |
191 { | |
192 if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod())) | |
193 { | |
194 LOG.debug("CONNECT request for {}", request.getRequestURI()); | |
195 try | |
196 { | |
197 handleConnect(baseRequest, request, response, request.getRequestURI()); | |
198 } | |
199 catch(Exception e) | |
200 { | |
201 LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e); | |
202 LOG.debug(e); | |
203 } | |
204 } | |
205 else | |
206 { | |
207 super.handle(target, baseRequest, request, response); | |
208 } | |
209 } | |
210 | |
211 /** | |
212 * <p>Handles a CONNECT request.</p> | |
213 * <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code> | |
214 * that authenticate the client with the proxy.</p> | |
215 * | |
216 * @param baseRequest Jetty-specific http request | |
217 * @param request the http request | |
218 * @param response the http response | |
219 * @param serverAddress the remote server address in the form {@code host:port} | |
220 * @throws ServletException if an application error occurs | |
221 * @throws IOException if an I/O error occurs | |
222 */ | |
223 protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress) throws ServletException, IOException | |
224 { | |
225 boolean proceed = handleAuthentication(request, response, serverAddress); | |
226 if (!proceed) | |
227 return; | |
228 | |
229 String host = serverAddress; | |
230 int port = 80; | |
231 int colon = serverAddress.indexOf(':'); | |
232 if (colon > 0) | |
233 { | |
234 host = serverAddress.substring(0, colon); | |
235 port = Integer.parseInt(serverAddress.substring(colon + 1)); | |
236 } | |
237 | |
238 if (!validateDestination(host)) | |
239 { | |
240 LOG.info("ProxyHandler: Forbidden destination " + host); | |
241 response.setStatus(HttpServletResponse.SC_FORBIDDEN); | |
242 baseRequest.setHandled(true); | |
243 return; | |
244 } | |
245 | |
246 SocketChannel channel; | |
247 | |
248 try | |
249 { | |
250 channel = connectToServer(request,host,port); | |
251 } | |
252 catch (SocketException se) | |
253 { | |
254 LOG.info("ConnectHandler: SocketException " + se.getMessage()); | |
255 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); | |
256 baseRequest.setHandled(true); | |
257 return; | |
258 } | |
259 catch (SocketTimeoutException ste) | |
260 { | |
261 LOG.info("ConnectHandler: SocketTimeoutException" + ste.getMessage()); | |
262 response.setStatus(HttpServletResponse.SC_GATEWAY_TIMEOUT); | |
263 baseRequest.setHandled(true); | |
264 return; | |
265 } | |
266 catch (IOException ioe) | |
267 { | |
268 LOG.info("ConnectHandler: IOException" + ioe.getMessage()); | |
269 response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); | |
270 baseRequest.setHandled(true); | |
271 return; | |
272 } | |
273 | |
274 // Transfer unread data from old connection to new connection | |
275 // We need to copy the data to avoid races: | |
276 // 1. when this unread data is written and the server replies before the clientToProxy | |
277 // connection is installed (it is only installed after returning from this method) | |
278 // 2. when the client sends data before this unread data has been written. | |
279 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection(); | |
280 Buffer headerBuffer = ((HttpParser)httpConnection.getParser()).getHeaderBuffer(); | |
281 Buffer bodyBuffer = ((HttpParser)httpConnection.getParser()).getBodyBuffer(); | |
282 int length = headerBuffer == null ? 0 : headerBuffer.length(); | |
283 length += bodyBuffer == null ? 0 : bodyBuffer.length(); | |
284 IndirectNIOBuffer buffer = null; | |
285 if (length > 0) | |
286 { | |
287 buffer = new IndirectNIOBuffer(length); | |
288 if (headerBuffer != null) | |
289 { | |
290 buffer.put(headerBuffer); | |
291 headerBuffer.clear(); | |
292 } | |
293 if (bodyBuffer != null) | |
294 { | |
295 buffer.put(bodyBuffer); | |
296 bodyBuffer.clear(); | |
297 } | |
298 } | |
299 | |
300 ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>(); | |
301 prepareContext(request, context); | |
302 | |
303 ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer); | |
304 | |
305 // CONNECT expects a 200 response | |
306 response.setStatus(HttpServletResponse.SC_OK); | |
307 | |
308 // Prevent close | |
309 baseRequest.getConnection().getGenerator().setPersistent(true); | |
310 | |
311 // Close to force last flush it so that the client receives it | |
312 response.getOutputStream().close(); | |
313 | |
314 upgradeConnection(request, response, clientToProxy); | |
315 } | |
316 | |
317 private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer) | |
318 { | |
319 AbstractHttpConnection httpConnection = AbstractHttpConnection.getCurrentConnection(); | |
320 ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer); | |
321 ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp()); | |
322 clientToProxy.setConnection(proxyToServer); | |
323 proxyToServer.setConnection(clientToProxy); | |
324 return clientToProxy; | |
325 } | |
326 | |
327 /** | |
328 * <p>Handles the authentication before setting up the tunnel to the remote server.</p> | |
329 * <p>The default implementation returns true.</p> | |
330 * | |
331 * @param request the HTTP request | |
332 * @param response the HTTP response | |
333 * @param address the address of the remote server in the form {@code host:port}. | |
334 * @return true to allow to connect to the remote host, false otherwise | |
335 * @throws ServletException to report a server error to the caller | |
336 * @throws IOException to report a server error to the caller | |
337 */ | |
338 protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException | |
339 { | |
340 return true; | |
341 } | |
342 | |
343 protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp) | |
344 { | |
345 return new ClientToProxyConnection(context, channel, endPoint, timeStamp); | |
346 } | |
347 | |
348 protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer) | |
349 { | |
350 return new ProxyToServerConnection(context, buffer); | |
351 } | |
352 | |
353 // may return null | |
354 private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException | |
355 { | |
356 SocketChannel channel = connect(request, host, port); | |
357 channel.configureBlocking(false); | |
358 return channel; | |
359 } | |
360 | |
361 /** | |
362 * <p>Establishes a connection to the remote server.</p> | |
363 * | |
364 * @param request the HTTP request that initiated the tunnel | |
365 * @param host the host to connect to | |
366 * @param port the port to connect to | |
367 * @return a {@link SocketChannel} connected to the remote server | |
368 * @throws IOException if the connection cannot be established | |
369 */ | |
370 protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException | |
371 { | |
372 SocketChannel channel = SocketChannel.open(); | |
373 | |
374 if (channel == null) | |
375 { | |
376 throw new IOException("unable to connect to " + host + ":" + port); | |
377 } | |
378 | |
379 try | |
380 { | |
381 // Connect to remote server | |
382 LOG.debug("Establishing connection to {}:{}", host, port); | |
383 channel.socket().setTcpNoDelay(true); | |
384 channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout()); | |
385 LOG.debug("Established connection to {}:{}", host, port); | |
386 return channel; | |
387 } | |
388 catch (IOException x) | |
389 { | |
390 LOG.debug("Failed to establish connection to " + host + ":" + port, x); | |
391 try | |
392 { | |
393 channel.close(); | |
394 } | |
395 catch (IOException xx) | |
396 { | |
397 LOG.ignore(xx); | |
398 } | |
399 throw x; | |
400 } | |
401 } | |
402 | |
403 protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context) | |
404 { | |
405 } | |
406 | |
407 private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException | |
408 { | |
409 // Set the new connection as request attribute and change the status to 101 | |
410 // so that Jetty understands that it has to upgrade the connection | |
411 request.setAttribute("org.eclipse.jetty.io.Connection", connection); | |
412 response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); | |
413 LOG.debug("Upgraded connection to {}", connection); | |
414 } | |
415 | |
416 private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException | |
417 { | |
418 _selectorManager.register(channel, proxyToServer); | |
419 proxyToServer.waitReady(_connectTimeout); | |
420 } | |
421 | |
422 /** | |
423 * <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p> | |
424 * | |
425 * @param endPoint the endPoint to read from | |
426 * @param buffer the buffer to read data into | |
427 * @param context the context information related to the connection | |
428 * @return the number of bytes read (possibly 0 since the read is non-blocking) | |
429 * or -1 if the channel has been closed remotely | |
430 * @throws IOException if the endPoint cannot be read | |
431 */ | |
432 protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException | |
433 { | |
434 return endPoint.fill(buffer); | |
435 } | |
436 | |
437 /** | |
438 * <p>Writes (with blocking semantic) the given buffer of data onto the given endPoint.</p> | |
439 * | |
440 * @param endPoint the endPoint to write to | |
441 * @param buffer the buffer to write | |
442 * @param context the context information related to the connection | |
443 * @throws IOException if the buffer cannot be written | |
444 * @return the number of bytes written | |
445 */ | |
446 protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException | |
447 { | |
448 if (buffer == null) | |
449 return 0; | |
450 | |
451 int length = buffer.length(); | |
452 final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null; | |
453 int flushed = endPoint.flush(buffer); | |
454 if (debug!=null) | |
455 debug.append(flushed); | |
456 | |
457 // Loop until all written | |
458 while (buffer.length()>0 && !endPoint.isOutputShutdown()) | |
459 { | |
460 if (!endPoint.isBlocking()) | |
461 { | |
462 boolean ready = endPoint.blockWritable(getWriteTimeout()); | |
463 if (!ready) | |
464 throw new IOException("Write timeout"); | |
465 } | |
466 flushed = endPoint.flush(buffer); | |
467 if (debug!=null) | |
468 debug.append("+").append(flushed); | |
469 } | |
470 | |
471 LOG.debug("Written {}/{} bytes {}", debug, length, endPoint); | |
472 buffer.compact(); | |
473 return length; | |
474 } | |
475 | |
476 private class Manager extends SelectorManager | |
477 { | |
478 @Override | |
479 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException | |
480 { | |
481 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, key, channel.socket().getSoTimeout()); | |
482 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); | |
483 endp.setMaxIdleTime(_writeTimeout); | |
484 return endp; | |
485 } | |
486 | |
487 @Override | |
488 public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) | |
489 { | |
490 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)attachment; | |
491 proxyToServer.setTimeStamp(System.currentTimeMillis()); | |
492 proxyToServer.setEndPoint(endpoint); | |
493 return proxyToServer; | |
494 } | |
495 | |
496 @Override | |
497 protected void endPointOpened(SelectChannelEndPoint endpoint) | |
498 { | |
499 ProxyToServerConnection proxyToServer = (ProxyToServerConnection)endpoint.getSelectionKey().attachment(); | |
500 proxyToServer.ready(); | |
501 } | |
502 | |
503 @Override | |
504 public boolean dispatch(Runnable task) | |
505 { | |
506 return _threadPool.dispatch(task); | |
507 } | |
508 | |
509 @Override | |
510 protected void endPointClosed(SelectChannelEndPoint endpoint) | |
511 { | |
512 } | |
513 | |
514 @Override | |
515 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) | |
516 { | |
517 } | |
518 } | |
519 | |
520 public class ProxyToServerConnection implements AsyncConnection | |
521 { | |
522 private final CountDownLatch _ready = new CountDownLatch(1); | |
523 private final Buffer _buffer = new IndirectNIOBuffer(4096); | |
524 private final ConcurrentMap<String, Object> _context; | |
525 private volatile Buffer _data; | |
526 private volatile ClientToProxyConnection _toClient; | |
527 private volatile long _timestamp; | |
528 private volatile AsyncEndPoint _endPoint; | |
529 | |
530 public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data) | |
531 { | |
532 _context = context; | |
533 _data = data; | |
534 } | |
535 | |
536 @Override | |
537 public String toString() | |
538 { | |
539 StringBuilder builder = new StringBuilder("ProxyToServer"); | |
540 builder.append("(:").append(_endPoint.getLocalPort()); | |
541 builder.append("<=>:").append(_endPoint.getRemotePort()); | |
542 return builder.append(")").toString(); | |
543 } | |
544 | |
545 public Connection handle() throws IOException | |
546 { | |
547 LOG.debug("{}: begin reading from server", this); | |
548 try | |
549 { | |
550 writeData(); | |
551 | |
552 while (true) | |
553 { | |
554 int read = read(_endPoint, _buffer, _context); | |
555 | |
556 if (read == -1) | |
557 { | |
558 LOG.debug("{}: server closed connection {}", this, _endPoint); | |
559 | |
560 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) | |
561 closeClient(); | |
562 else | |
563 _toClient.shutdownOutput(); | |
564 | |
565 break; | |
566 } | |
567 | |
568 if (read == 0) | |
569 break; | |
570 | |
571 LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint); | |
572 int written = write(_toClient._endPoint, _buffer, _context); | |
573 LOG.debug("{}: written to {} {} bytes", this, _toClient, written); | |
574 } | |
575 return this; | |
576 } | |
577 catch (ClosedChannelException x) | |
578 { | |
579 LOG.debug(x); | |
580 throw x; | |
581 } | |
582 catch (IOException x) | |
583 { | |
584 LOG.warn(this + ": unexpected exception", x); | |
585 close(); | |
586 throw x; | |
587 } | |
588 catch (RuntimeException x) | |
589 { | |
590 LOG.warn(this + ": unexpected exception", x); | |
591 close(); | |
592 throw x; | |
593 } | |
594 finally | |
595 { | |
596 LOG.debug("{}: end reading from server", this); | |
597 } | |
598 } | |
599 | |
600 public void onInputShutdown() throws IOException | |
601 { | |
602 } | |
603 | |
604 private void writeData() throws IOException | |
605 { | |
606 // This method is called from handle() and closeServer() | |
607 // which may happen concurrently (e.g. a client closing | |
608 // while reading from the server), so needs synchronization | |
609 synchronized (this) | |
610 { | |
611 if (_data != null) | |
612 { | |
613 try | |
614 { | |
615 int written = write(_endPoint, _data, _context); | |
616 LOG.debug("{}: written to server {} bytes", this, written); | |
617 } | |
618 finally | |
619 { | |
620 // Attempt once to write the data; if the write fails (for example | |
621 // because the connection is already closed), clear the data and | |
622 // give up to avoid to continue to write data to a closed connection | |
623 _data = null; | |
624 } | |
625 } | |
626 } | |
627 } | |
628 | |
629 public void setConnection(ClientToProxyConnection connection) | |
630 { | |
631 _toClient = connection; | |
632 } | |
633 | |
634 public long getTimeStamp() | |
635 { | |
636 return _timestamp; | |
637 } | |
638 | |
639 public void setTimeStamp(long timestamp) | |
640 { | |
641 _timestamp = timestamp; | |
642 } | |
643 | |
644 public void setEndPoint(AsyncEndPoint endpoint) | |
645 { | |
646 _endPoint = endpoint; | |
647 } | |
648 | |
649 public boolean isIdle() | |
650 { | |
651 return false; | |
652 } | |
653 | |
654 public boolean isSuspended() | |
655 { | |
656 return false; | |
657 } | |
658 | |
659 public void onClose() | |
660 { | |
661 } | |
662 | |
663 public void ready() | |
664 { | |
665 _ready.countDown(); | |
666 } | |
667 | |
668 public void waitReady(long timeout) throws IOException | |
669 { | |
670 try | |
671 { | |
672 _ready.await(timeout, TimeUnit.MILLISECONDS); | |
673 } | |
674 catch (final InterruptedException x) | |
675 { | |
676 throw new IOException() | |
677 {{ | |
678 initCause(x); | |
679 }}; | |
680 } | |
681 } | |
682 | |
683 public void closeClient() throws IOException | |
684 { | |
685 _toClient.closeClient(); | |
686 } | |
687 | |
688 public void closeServer() throws IOException | |
689 { | |
690 _endPoint.close(); | |
691 } | |
692 | |
693 public void close() | |
694 { | |
695 try | |
696 { | |
697 closeClient(); | |
698 } | |
699 catch (IOException x) | |
700 { | |
701 LOG.debug(this + ": unexpected exception closing the client", x); | |
702 } | |
703 | |
704 try | |
705 { | |
706 closeServer(); | |
707 } | |
708 catch (IOException x) | |
709 { | |
710 LOG.debug(this + ": unexpected exception closing the server", x); | |
711 } | |
712 } | |
713 | |
714 public void shutdownOutput() throws IOException | |
715 { | |
716 writeData(); | |
717 _endPoint.shutdownOutput(); | |
718 } | |
719 | |
720 public void onIdleExpired(long idleForMs) | |
721 { | |
722 try | |
723 { | |
724 LOG.debug("{} idle expired", this); | |
725 if (_endPoint.isOutputShutdown()) | |
726 close(); | |
727 else | |
728 shutdownOutput(); | |
729 } | |
730 catch(Exception e) | |
731 { | |
732 LOG.debug(e); | |
733 close(); | |
734 } | |
735 } | |
736 } | |
737 | |
738 public class ClientToProxyConnection implements AsyncConnection | |
739 { | |
740 private final Buffer _buffer = new IndirectNIOBuffer(4096); | |
741 private final ConcurrentMap<String, Object> _context; | |
742 private final SocketChannel _channel; | |
743 private final EndPoint _endPoint; | |
744 private final long _timestamp; | |
745 private volatile ProxyToServerConnection _toServer; | |
746 private boolean _firstTime = true; | |
747 | |
748 public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp) | |
749 { | |
750 _context = context; | |
751 _channel = channel; | |
752 _endPoint = endPoint; | |
753 _timestamp = timestamp; | |
754 } | |
755 | |
756 @Override | |
757 public String toString() | |
758 { | |
759 StringBuilder builder = new StringBuilder("ClientToProxy"); | |
760 builder.append("(:").append(_endPoint.getLocalPort()); | |
761 builder.append("<=>:").append(_endPoint.getRemotePort()); | |
762 return builder.append(")").toString(); | |
763 } | |
764 | |
765 public Connection handle() throws IOException | |
766 { | |
767 LOG.debug("{}: begin reading from client", this); | |
768 try | |
769 { | |
770 if (_firstTime) | |
771 { | |
772 _firstTime = false; | |
773 register(_channel, _toServer); | |
774 LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer); | |
775 } | |
776 | |
777 while (true) | |
778 { | |
779 int read = read(_endPoint, _buffer, _context); | |
780 | |
781 if (read == -1) | |
782 { | |
783 LOG.debug("{}: client closed connection {}", this, _endPoint); | |
784 | |
785 if (_endPoint.isOutputShutdown() || !_endPoint.isOpen()) | |
786 closeServer(); | |
787 else | |
788 _toServer.shutdownOutput(); | |
789 | |
790 break; | |
791 } | |
792 | |
793 if (read == 0) | |
794 break; | |
795 | |
796 LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint); | |
797 int written = write(_toServer._endPoint, _buffer, _context); | |
798 LOG.debug("{}: written to {} {} bytes", this, _toServer, written); | |
799 } | |
800 return this; | |
801 } | |
802 catch (ClosedChannelException x) | |
803 { | |
804 LOG.debug(x); | |
805 closeServer(); | |
806 throw x; | |
807 } | |
808 catch (IOException x) | |
809 { | |
810 LOG.warn(this + ": unexpected exception", x); | |
811 close(); | |
812 throw x; | |
813 } | |
814 catch (RuntimeException x) | |
815 { | |
816 LOG.warn(this + ": unexpected exception", x); | |
817 close(); | |
818 throw x; | |
819 } | |
820 finally | |
821 { | |
822 LOG.debug("{}: end reading from client", this); | |
823 } | |
824 } | |
825 | |
826 public void onInputShutdown() throws IOException | |
827 { | |
828 } | |
829 | |
830 public long getTimeStamp() | |
831 { | |
832 return _timestamp; | |
833 } | |
834 | |
835 public boolean isIdle() | |
836 { | |
837 return false; | |
838 } | |
839 | |
840 public boolean isSuspended() | |
841 { | |
842 return false; | |
843 } | |
844 | |
845 public void onClose() | |
846 { | |
847 } | |
848 | |
849 public void setConnection(ProxyToServerConnection connection) | |
850 { | |
851 _toServer = connection; | |
852 } | |
853 | |
854 public void closeClient() throws IOException | |
855 { | |
856 _endPoint.close(); | |
857 } | |
858 | |
859 public void closeServer() throws IOException | |
860 { | |
861 _toServer.closeServer(); | |
862 } | |
863 | |
864 public void close() | |
865 { | |
866 try | |
867 { | |
868 closeClient(); | |
869 } | |
870 catch (IOException x) | |
871 { | |
872 LOG.debug(this + ": unexpected exception closing the client", x); | |
873 } | |
874 | |
875 try | |
876 { | |
877 closeServer(); | |
878 } | |
879 catch (IOException x) | |
880 { | |
881 LOG.debug(this + ": unexpected exception closing the server", x); | |
882 } | |
883 } | |
884 | |
885 public void shutdownOutput() throws IOException | |
886 { | |
887 _endPoint.shutdownOutput(); | |
888 } | |
889 | |
890 public void onIdleExpired(long idleForMs) | |
891 { | |
892 try | |
893 { | |
894 LOG.debug("{} idle expired", this); | |
895 if (_endPoint.isOutputShutdown()) | |
896 close(); | |
897 else | |
898 shutdownOutput(); | |
899 } | |
900 catch(Exception e) | |
901 { | |
902 LOG.debug(e); | |
903 close(); | |
904 } | |
905 } | |
906 } | |
907 | |
908 /** | |
909 * Add a whitelist entry to an existing handler configuration | |
910 * | |
911 * @param entry new whitelist entry | |
912 */ | |
913 public void addWhite(String entry) | |
914 { | |
915 add(entry, _white); | |
916 } | |
917 | |
918 /** | |
919 * Add a blacklist entry to an existing handler configuration | |
920 * | |
921 * @param entry new blacklist entry | |
922 */ | |
923 public void addBlack(String entry) | |
924 { | |
925 add(entry, _black); | |
926 } | |
927 | |
928 /** | |
929 * Re-initialize the whitelist of existing handler object | |
930 * | |
931 * @param entries array of whitelist entries | |
932 */ | |
933 public void setWhite(String[] entries) | |
934 { | |
935 set(entries, _white); | |
936 } | |
937 | |
938 /** | |
939 * Re-initialize the blacklist of existing handler object | |
940 * | |
941 * @param entries array of blacklist entries | |
942 */ | |
943 public void setBlack(String[] entries) | |
944 { | |
945 set(entries, _black); | |
946 } | |
947 | |
948 /** | |
949 * Helper method to process a list of new entries and replace | |
950 * the content of the specified host map | |
951 * | |
952 * @param entries new entries | |
953 * @param hostMap target host map | |
954 */ | |
955 protected void set(String[] entries, HostMap<String> hostMap) | |
956 { | |
957 hostMap.clear(); | |
958 | |
959 if (entries != null && entries.length > 0) | |
960 { | |
961 for (String addrPath : entries) | |
962 { | |
963 add(addrPath, hostMap); | |
964 } | |
965 } | |
966 } | |
967 | |
968 /** | |
969 * Helper method to process the new entry and add it to | |
970 * the specified host map. | |
971 * | |
972 * @param entry new entry | |
973 * @param hostMap target host map | |
974 */ | |
975 private void add(String entry, HostMap<String> hostMap) | |
976 { | |
977 if (entry != null && entry.length() > 0) | |
978 { | |
979 entry = entry.trim(); | |
980 if (hostMap.get(entry) == null) | |
981 { | |
982 hostMap.put(entry, entry); | |
983 } | |
984 } | |
985 } | |
986 | |
987 /** | |
988 * Check the request hostname against white- and blacklist. | |
989 * | |
990 * @param host hostname to check | |
991 * @return true if hostname is allowed to be proxied | |
992 */ | |
993 public boolean validateDestination(String host) | |
994 { | |
995 if (_white.size() > 0) | |
996 { | |
997 Object whiteObj = _white.getLazyMatches(host); | |
998 if (whiteObj == null) | |
999 { | |
1000 return false; | |
1001 } | |
1002 } | |
1003 | |
1004 if (_black.size() > 0) | |
1005 { | |
1006 Object blackObj = _black.getLazyMatches(host); | |
1007 if (blackObj != null) | |
1008 { | |
1009 return false; | |
1010 } | |
1011 } | |
1012 | |
1013 return true; | |
1014 } | |
1015 | |
1016 @Override | |
1017 public void dump(Appendable out, String indent) throws IOException | |
1018 { | |
1019 dumpThis(out); | |
1020 if (_privateThreadPool) | |
1021 dump(out, indent, Arrays.asList(_threadPool, _selectorManager), TypeUtil.asList(getHandlers()), getBeans()); | |
1022 else | |
1023 dump(out, indent, Arrays.asList(_selectorManager), TypeUtil.asList(getHandlers()), getBeans()); | |
1024 } | |
1025 } |