comparison src/org/eclipse/jetty/server/bio/SocketConnector.java @ 865:6b210bb66c63

remove ThreadPool
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 20:38:06 -0600
parents 8e9db0bbf4f9
children
comparison
equal deleted inserted replaced
864:e21ca9878a10 865:6b210bb66c63
23 import java.net.ServerSocket; 23 import java.net.ServerSocket;
24 import java.net.Socket; 24 import java.net.Socket;
25 import java.net.SocketException; 25 import java.net.SocketException;
26 import java.util.HashSet; 26 import java.util.HashSet;
27 import java.util.Set; 27 import java.util.Set;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadPoolExecutor;
28 30
29 import org.eclipse.jetty.http.HttpException; 31 import org.eclipse.jetty.http.HttpException;
30 import org.eclipse.jetty.io.Buffer; 32 import org.eclipse.jetty.io.Buffer;
31 import org.eclipse.jetty.io.ConnectedEndPoint; 33 import org.eclipse.jetty.io.ConnectedEndPoint;
32 import org.eclipse.jetty.io.Connection; 34 import org.eclipse.jetty.io.Connection;
54 * 56 *
55 * 57 *
56 */ 58 */
57 public class SocketConnector extends AbstractConnector 59 public class SocketConnector extends AbstractConnector
58 { 60 {
59 private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); 61 private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class);
60 62
61 protected ServerSocket _serverSocket; 63 protected ServerSocket _serverSocket;
62 protected final Set<EndPoint> _connections; 64 protected final Set<EndPoint> _connections;
63 protected volatile int _localPort=-1; 65 protected volatile int _localPort=-1;
64 66
65 /* ------------------------------------------------------------ */ 67 /* ------------------------------------------------------------ */
66 /** Constructor. 68 /** Constructor.
67 * 69 *
68 */ 70 */
69 public SocketConnector() 71 public SocketConnector()
70 { 72 {
71 _connections=new HashSet<EndPoint>(); 73 _connections=new HashSet<EndPoint>();
72 } 74 }
73 75
74 /* ------------------------------------------------------------ */ 76 /* ------------------------------------------------------------ */
75 public Object getConnection() 77 public Object getConnection()
76 { 78 {
77 return _serverSocket; 79 return _serverSocket;
78 } 80 }
79 81
80 /* ------------------------------------------------------------ */ 82 /* ------------------------------------------------------------ */
81 public void open() throws IOException 83 public void open() throws IOException
82 { 84 {
83 // Create a new server socket and set to non blocking mode 85 // Create a new server socket and set to non blocking mode
84 if (_serverSocket==null || _serverSocket.isClosed()) 86 if (_serverSocket==null || _serverSocket.isClosed())
85 _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); 87 _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize());
86 _serverSocket.setReuseAddress(getReuseAddress()); 88 _serverSocket.setReuseAddress(getReuseAddress());
87 _localPort=_serverSocket.getLocalPort(); 89 _localPort=_serverSocket.getLocalPort();
88 if (_localPort<=0) 90 if (_localPort<=0)
89 throw new IllegalStateException("port not allocated for "+this); 91 throw new IllegalStateException("port not allocated for "+this);
90 92
91 } 93 }
92 94
93 /* ------------------------------------------------------------ */ 95 /* ------------------------------------------------------------ */
94 protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException 96 protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException
95 { 97 {
96 ServerSocket ss= host==null? 98 ServerSocket ss= host==null?
97 new ServerSocket(port,backlog): 99 new ServerSocket(port,backlog):
98 new ServerSocket(port,backlog,InetAddress.getByName(host)); 100 new ServerSocket(port,backlog,InetAddress.getByName(host));
99 101
100 return ss; 102 return ss;
101 } 103 }
102 104
103 /* ------------------------------------------------------------ */ 105 /* ------------------------------------------------------------ */
104 public void close() throws IOException 106 public void close() throws IOException
105 { 107 {
106 if (_serverSocket!=null) 108 if (_serverSocket!=null)
107 _serverSocket.close(); 109 _serverSocket.close();
108 _serverSocket=null; 110 _serverSocket=null;
109 _localPort=-2; 111 _localPort=-2;
110 } 112 }
111 113
112 /* ------------------------------------------------------------ */ 114 /* ------------------------------------------------------------ */
113 @Override 115 @Override
114 public void accept(int acceptorID) 116 public void accept(int acceptorID)
115 throws IOException, InterruptedException 117 throws IOException, InterruptedException
116 { 118 {
117 Socket socket = _serverSocket.accept(); 119 Socket socket = _serverSocket.accept();
118 configure(socket); 120 configure(socket);
119 121
120 ConnectorEndPoint connection=new ConnectorEndPoint(socket); 122 ConnectorEndPoint connection=new ConnectorEndPoint(socket);
121 connection.dispatch(); 123 connection.dispatch();
122 } 124 }
123 125
124 /* ------------------------------------------------------------------------------- */ 126 /* ------------------------------------------------------------------------------- */
125 /** 127 /**
126 * Allows subclass to override Connection if required. 128 * Allows subclass to override Connection if required.
127 */ 129 */
128 protected Connection newConnection(EndPoint endpoint) 130 protected Connection newConnection(EndPoint endpoint)
129 { 131 {
130 return new BlockingHttpConnection(this, endpoint, getServer()); 132 return new BlockingHttpConnection(this, endpoint, getServer());
131 } 133 }
132 134
133 /* ------------------------------------------------------------------------------- */ 135 /* ------------------------------------------------------------------------------- */
134 @Override 136 @Override
135 public void customize(EndPoint endpoint, Request request) 137 public void customize(EndPoint endpoint, Request request)
136 throws IOException 138 throws IOException
137 { 139 {
138 ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; 140 ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
139 int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; 141 int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime;
140 connection.setMaxIdleTime(lrmit); 142 connection.setMaxIdleTime(lrmit);
141 143
142 super.customize(endpoint, request); 144 super.customize(endpoint, request);
143 } 145 }
144 146
145 /* ------------------------------------------------------------------------------- */ 147 /* ------------------------------------------------------------------------------- */
146 public int getLocalPort() 148 public int getLocalPort()
147 { 149 {
148 return _localPort; 150 return _localPort;
149 } 151 }
150 152
151 /* ------------------------------------------------------------------------------- */ 153 /* ------------------------------------------------------------------------------- */
152 @Override 154 @Override
153 protected void doStart() throws Exception 155 protected void doStart() throws Exception
154 { 156 {
155 _connections.clear(); 157 _connections.clear();
156 super.doStart(); 158 super.doStart();
157 } 159 }
158 160
159 /* ------------------------------------------------------------------------------- */ 161 /* ------------------------------------------------------------------------------- */
160 @Override 162 @Override
161 protected void doStop() throws Exception 163 protected void doStop() throws Exception
162 { 164 {
163 super.doStop(); 165 super.doStop();
164 Set<EndPoint> set = new HashSet<EndPoint>(); 166 Set<EndPoint> set = new HashSet<EndPoint>();
165 synchronized(_connections) 167 synchronized(_connections)
166 { 168 {
167 set.addAll(_connections); 169 set.addAll(_connections);
168 } 170 }
169 for (EndPoint endPoint : set) 171 for (EndPoint endPoint : set)
170 { 172 {
171 ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; 173 ConnectorEndPoint connection = (ConnectorEndPoint)endPoint;
172 connection.close(); 174 connection.close();
173 } 175 }
174 } 176 }
175 177
176 @Override 178 @Override
177 public void dump(Appendable out, String indent) throws IOException 179 public void dump(Appendable out, String indent) throws IOException
178 { 180 {
179 super.dump(out, indent); 181 super.dump(out, indent);
180 Set<EndPoint> connections = new HashSet<EndPoint>(); 182 Set<EndPoint> connections = new HashSet<EndPoint>();
181 synchronized (_connections) 183 synchronized (_connections)
182 { 184 {
183 connections.addAll(_connections); 185 connections.addAll(_connections);
184 } 186 }
185 AggregateLifeCycle.dump(out, indent, connections); 187 AggregateLifeCycle.dump(out, indent, connections);
186 } 188 }
187 189
188 /* ------------------------------------------------------------------------------- */ 190 /* ------------------------------------------------------------------------------- */
189 /* ------------------------------------------------------------------------------- */ 191 /* ------------------------------------------------------------------------------- */
190 /* ------------------------------------------------------------------------------- */ 192 /* ------------------------------------------------------------------------------- */
191 protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint 193 protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint
192 { 194 {
193 volatile Connection _connection; 195 volatile Connection _connection;
194 protected final Socket _socket; 196 protected final Socket _socket;
195 197
196 public ConnectorEndPoint(Socket socket) throws IOException 198 public ConnectorEndPoint(Socket socket) throws IOException
197 { 199 {
198 super(socket,_maxIdleTime); 200 super(socket,_maxIdleTime);
199 _connection = newConnection(this); 201 _connection = newConnection(this);
200 _socket=socket; 202 _socket=socket;
201 } 203 }
202 204
203 public Connection getConnection() 205 public Connection getConnection()
204 { 206 {
205 return _connection; 207 return _connection;
206 } 208 }
207 209
208 public void setConnection(Connection connection) 210 public void setConnection(Connection connection)
209 { 211 {
210 if (_connection!=connection && _connection!=null) 212 if (_connection!=connection && _connection!=null)
211 connectionUpgraded(_connection,connection); 213 connectionUpgraded(_connection,connection);
212 _connection=connection; 214 _connection=connection;
213 } 215 }
214 216
215 public void dispatch() throws IOException 217 public void dispatch() throws IOException
216 { 218 {
217 if (getThreadPool()==null || !getThreadPool().dispatch(this)) 219 ThreadPoolExecutor tpe = getThreadPool();
218 { 220 if( tpe != null ) {
219 LOG.warn("dispatch failed for {}",_connection); 221 try {
220 close(); 222 tpe.execute(this);
221 } 223 return;
222 } 224 } catch(RejectedExecutionException e) {}
223 225 }
224 @Override 226 LOG.warn("dispatch failed for {}",_connection);
225 public int fill(Buffer buffer) throws IOException 227 close();
226 { 228 }
227 int l = super.fill(buffer); 229
228 if (l<0) 230 @Override
229 { 231 public int fill(Buffer buffer) throws IOException
230 if (!isInputShutdown()) 232 {
231 shutdownInput(); 233 int l = super.fill(buffer);
232 if (isOutputShutdown()) 234 if (l<0)
233 close(); 235 {
234 } 236 if (!isInputShutdown())
235 return l; 237 shutdownInput();
236 } 238 if (isOutputShutdown())
237 239 close();
238 @Override 240 }
239 public void close() throws IOException 241 return l;
240 { 242 }
241 if (_connection instanceof AbstractHttpConnection) 243
242 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); 244 @Override
243 super.close(); 245 public void close() throws IOException
244 } 246 {
245 247 if (_connection instanceof AbstractHttpConnection)
246 public void run() 248 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel();
247 { 249 super.close();
248 try 250 }
249 { 251
250 connectionOpened(_connection); 252 public void run()
251 synchronized(_connections) 253 {
252 { 254 try
253 _connections.add(this); 255 {
254 } 256 connectionOpened(_connection);
255 257 synchronized(_connections)
256 while (isStarted() && !isClosed()) 258 {
257 { 259 _connections.add(this);
258 if (_connection.isIdle()) 260 }
259 { 261
260 if (isLowResources()) 262 while (isStarted() && !isClosed())
261 setMaxIdleTime(getLowResourcesMaxIdleTime()); 263 {
262 } 264 if (_connection.isIdle())
263 265 {
264 _connection=_connection.handle(); 266 if (isLowResources())
265 } 267 setMaxIdleTime(getLowResourcesMaxIdleTime());
266 } 268 }
267 catch (EofException e) 269
268 { 270 _connection=_connection.handle();
269 LOG.debug("EOF", e); 271 }
270 try{close();} 272 }
271 catch(IOException e2){LOG.trace("",e2);} 273 catch (EofException e)
272 } 274 {
273 catch (SocketException e) 275 LOG.debug("EOF", e);
274 { 276 try{close();}
275 LOG.debug("EOF", e); 277 catch(IOException e2){LOG.trace("",e2);}
276 try{close();} 278 }
277 catch(IOException e2){LOG.trace("",e2);} 279 catch (SocketException e)
278 } 280 {
279 catch (HttpException e) 281 LOG.debug("EOF", e);
280 { 282 try{close();}
281 LOG.debug("BAD", e); 283 catch(IOException e2){LOG.trace("",e2);}
282 try{close();} 284 }
283 catch(IOException e2){LOG.trace("",e2);} 285 catch (HttpException e)
284 } 286 {
285 catch(Exception e) 287 LOG.debug("BAD", e);
286 { 288 try{close();}
287 LOG.warn("handle failed?",e); 289 catch(IOException e2){LOG.trace("",e2);}
288 try{close();} 290 }
289 catch(IOException e2){LOG.trace("",e2);} 291 catch(Exception e)
290 } 292 {
291 finally 293 LOG.warn("handle failed?",e);
292 { 294 try{close();}
293 connectionClosed(_connection); 295 catch(IOException e2){LOG.trace("",e2);}
294 synchronized(_connections) 296 }
295 { 297 finally
296 _connections.remove(this); 298 {
297 } 299 connectionClosed(_connection);
298 300 synchronized(_connections)
299 // wait for client to close, but if not, close ourselves. 301 {
300 try 302 _connections.remove(this);
301 { 303 }
302 if (!_socket.isClosed()) 304
303 { 305 // wait for client to close, but if not, close ourselves.
304 long timestamp=System.currentTimeMillis(); 306 try
305 int max_idle=getMaxIdleTime(); 307 {
306 308 if (!_socket.isClosed())
307 _socket.setSoTimeout(getMaxIdleTime()); 309 {
308 int c=0; 310 long timestamp=System.currentTimeMillis();
309 do 311 int max_idle=getMaxIdleTime();
310 { 312
311 c = _socket.getInputStream().read(); 313 _socket.setSoTimeout(getMaxIdleTime());
312 } 314 int c=0;
313 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); 315 do
314 if (!_socket.isClosed()) 316 {
315 _socket.close(); 317 c = _socket.getInputStream().read();
316 } 318 }
317 } 319 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle);
318 catch(IOException e) 320 if (!_socket.isClosed())
319 { 321 _socket.close();
320 LOG.trace("",e); 322 }
321 } 323 }
322 } 324 catch(IOException e)
323 } 325 {
324 } 326 LOG.trace("",e);
327 }
328 }
329 }
330 }
325 } 331 }