Mercurial Hosting > luan
comparison src/org/eclipse/jetty/server/bio/SocketConnector.java @ 865:6b210bb66c63
remove ThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 20:38:06 -0600 |
parents | 8e9db0bbf4f9 |
children |
comparison
equal
deleted
inserted
replaced
864:e21ca9878a10 | 865:6b210bb66c63 |
---|---|
23 import java.net.ServerSocket; | 23 import java.net.ServerSocket; |
24 import java.net.Socket; | 24 import java.net.Socket; |
25 import java.net.SocketException; | 25 import java.net.SocketException; |
26 import java.util.HashSet; | 26 import java.util.HashSet; |
27 import java.util.Set; | 27 import java.util.Set; |
28 import java.util.concurrent.RejectedExecutionException; | |
29 import java.util.concurrent.ThreadPoolExecutor; | |
28 | 30 |
29 import org.eclipse.jetty.http.HttpException; | 31 import org.eclipse.jetty.http.HttpException; |
30 import org.eclipse.jetty.io.Buffer; | 32 import org.eclipse.jetty.io.Buffer; |
31 import org.eclipse.jetty.io.ConnectedEndPoint; | 33 import org.eclipse.jetty.io.ConnectedEndPoint; |
32 import org.eclipse.jetty.io.Connection; | 34 import org.eclipse.jetty.io.Connection; |
54 * | 56 * |
55 * | 57 * |
56 */ | 58 */ |
57 public class SocketConnector extends AbstractConnector | 59 public class SocketConnector extends AbstractConnector |
58 { | 60 { |
59 private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); | 61 private static final Logger LOG = LoggerFactory.getLogger(SocketConnector.class); |
60 | 62 |
61 protected ServerSocket _serverSocket; | 63 protected ServerSocket _serverSocket; |
62 protected final Set<EndPoint> _connections; | 64 protected final Set<EndPoint> _connections; |
63 protected volatile int _localPort=-1; | 65 protected volatile int _localPort=-1; |
64 | 66 |
65 /* ------------------------------------------------------------ */ | 67 /* ------------------------------------------------------------ */ |
66 /** Constructor. | 68 /** Constructor. |
67 * | 69 * |
68 */ | 70 */ |
69 public SocketConnector() | 71 public SocketConnector() |
70 { | 72 { |
71 _connections=new HashSet<EndPoint>(); | 73 _connections=new HashSet<EndPoint>(); |
72 } | 74 } |
73 | 75 |
74 /* ------------------------------------------------------------ */ | 76 /* ------------------------------------------------------------ */ |
75 public Object getConnection() | 77 public Object getConnection() |
76 { | 78 { |
77 return _serverSocket; | 79 return _serverSocket; |
78 } | 80 } |
79 | 81 |
80 /* ------------------------------------------------------------ */ | 82 /* ------------------------------------------------------------ */ |
81 public void open() throws IOException | 83 public void open() throws IOException |
82 { | 84 { |
83 // Create a new server socket and set to non blocking mode | 85 // Create a new server socket and set to non blocking mode |
84 if (_serverSocket==null || _serverSocket.isClosed()) | 86 if (_serverSocket==null || _serverSocket.isClosed()) |
85 _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); | 87 _serverSocket= newServerSocket(getHost(),getPort(),getAcceptQueueSize()); |
86 _serverSocket.setReuseAddress(getReuseAddress()); | 88 _serverSocket.setReuseAddress(getReuseAddress()); |
87 _localPort=_serverSocket.getLocalPort(); | 89 _localPort=_serverSocket.getLocalPort(); |
88 if (_localPort<=0) | 90 if (_localPort<=0) |
89 throw new IllegalStateException("port not allocated for "+this); | 91 throw new IllegalStateException("port not allocated for "+this); |
90 | 92 |
91 } | 93 } |
92 | 94 |
93 /* ------------------------------------------------------------ */ | 95 /* ------------------------------------------------------------ */ |
94 protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException | 96 protected ServerSocket newServerSocket(String host, int port,int backlog) throws IOException |
95 { | 97 { |
96 ServerSocket ss= host==null? | 98 ServerSocket ss= host==null? |
97 new ServerSocket(port,backlog): | 99 new ServerSocket(port,backlog): |
98 new ServerSocket(port,backlog,InetAddress.getByName(host)); | 100 new ServerSocket(port,backlog,InetAddress.getByName(host)); |
99 | 101 |
100 return ss; | 102 return ss; |
101 } | 103 } |
102 | 104 |
103 /* ------------------------------------------------------------ */ | 105 /* ------------------------------------------------------------ */ |
104 public void close() throws IOException | 106 public void close() throws IOException |
105 { | 107 { |
106 if (_serverSocket!=null) | 108 if (_serverSocket!=null) |
107 _serverSocket.close(); | 109 _serverSocket.close(); |
108 _serverSocket=null; | 110 _serverSocket=null; |
109 _localPort=-2; | 111 _localPort=-2; |
110 } | 112 } |
111 | 113 |
112 /* ------------------------------------------------------------ */ | 114 /* ------------------------------------------------------------ */ |
113 @Override | 115 @Override |
114 public void accept(int acceptorID) | 116 public void accept(int acceptorID) |
115 throws IOException, InterruptedException | 117 throws IOException, InterruptedException |
116 { | 118 { |
117 Socket socket = _serverSocket.accept(); | 119 Socket socket = _serverSocket.accept(); |
118 configure(socket); | 120 configure(socket); |
119 | 121 |
120 ConnectorEndPoint connection=new ConnectorEndPoint(socket); | 122 ConnectorEndPoint connection=new ConnectorEndPoint(socket); |
121 connection.dispatch(); | 123 connection.dispatch(); |
122 } | 124 } |
123 | 125 |
124 /* ------------------------------------------------------------------------------- */ | 126 /* ------------------------------------------------------------------------------- */ |
125 /** | 127 /** |
126 * Allows subclass to override Conection if required. | 128 * Allows subclass to override Conection if required. |
127 */ | 129 */ |
128 protected Connection newConnection(EndPoint endpoint) | 130 protected Connection newConnection(EndPoint endpoint) |
129 { | 131 { |
130 return new BlockingHttpConnection(this, endpoint, getServer()); | 132 return new BlockingHttpConnection(this, endpoint, getServer()); |
131 } | 133 } |
132 | 134 |
133 /* ------------------------------------------------------------------------------- */ | 135 /* ------------------------------------------------------------------------------- */ |
134 @Override | 136 @Override |
135 public void customize(EndPoint endpoint, Request request) | 137 public void customize(EndPoint endpoint, Request request) |
136 throws IOException | 138 throws IOException |
137 { | 139 { |
138 ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; | 140 ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; |
139 int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; | 141 int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; |
140 connection.setMaxIdleTime(lrmit); | 142 connection.setMaxIdleTime(lrmit); |
141 | 143 |
142 super.customize(endpoint, request); | 144 super.customize(endpoint, request); |
143 } | 145 } |
144 | 146 |
145 /* ------------------------------------------------------------------------------- */ | 147 /* ------------------------------------------------------------------------------- */ |
146 public int getLocalPort() | 148 public int getLocalPort() |
147 { | 149 { |
148 return _localPort; | 150 return _localPort; |
149 } | 151 } |
150 | 152 |
151 /* ------------------------------------------------------------------------------- */ | 153 /* ------------------------------------------------------------------------------- */ |
152 @Override | 154 @Override |
153 protected void doStart() throws Exception | 155 protected void doStart() throws Exception |
154 { | 156 { |
155 _connections.clear(); | 157 _connections.clear(); |
156 super.doStart(); | 158 super.doStart(); |
157 } | 159 } |
158 | 160 |
159 /* ------------------------------------------------------------------------------- */ | 161 /* ------------------------------------------------------------------------------- */ |
160 @Override | 162 @Override |
161 protected void doStop() throws Exception | 163 protected void doStop() throws Exception |
162 { | 164 { |
163 super.doStop(); | 165 super.doStop(); |
164 Set<EndPoint> set = new HashSet<EndPoint>(); | 166 Set<EndPoint> set = new HashSet<EndPoint>(); |
165 synchronized(_connections) | 167 synchronized(_connections) |
166 { | 168 { |
167 set.addAll(_connections); | 169 set.addAll(_connections); |
168 } | 170 } |
169 for (EndPoint endPoint : set) | 171 for (EndPoint endPoint : set) |
170 { | 172 { |
171 ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; | 173 ConnectorEndPoint connection = (ConnectorEndPoint)endPoint; |
172 connection.close(); | 174 connection.close(); |
173 } | 175 } |
174 } | 176 } |
175 | 177 |
176 @Override | 178 @Override |
177 public void dump(Appendable out, String indent) throws IOException | 179 public void dump(Appendable out, String indent) throws IOException |
178 { | 180 { |
179 super.dump(out, indent); | 181 super.dump(out, indent); |
180 Set<EndPoint> connections = new HashSet<EndPoint>(); | 182 Set<EndPoint> connections = new HashSet<EndPoint>(); |
181 synchronized (_connections) | 183 synchronized (_connections) |
182 { | 184 { |
183 connections.addAll(_connections); | 185 connections.addAll(_connections); |
184 } | 186 } |
185 AggregateLifeCycle.dump(out, indent, connections); | 187 AggregateLifeCycle.dump(out, indent, connections); |
186 } | 188 } |
187 | 189 |
188 /* ------------------------------------------------------------------------------- */ | 190 /* ------------------------------------------------------------------------------- */ |
189 /* ------------------------------------------------------------------------------- */ | 191 /* ------------------------------------------------------------------------------- */ |
190 /* ------------------------------------------------------------------------------- */ | 192 /* ------------------------------------------------------------------------------- */ |
191 protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint | 193 protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint |
192 { | 194 { |
193 volatile Connection _connection; | 195 volatile Connection _connection; |
194 protected final Socket _socket; | 196 protected final Socket _socket; |
195 | 197 |
196 public ConnectorEndPoint(Socket socket) throws IOException | 198 public ConnectorEndPoint(Socket socket) throws IOException |
197 { | 199 { |
198 super(socket,_maxIdleTime); | 200 super(socket,_maxIdleTime); |
199 _connection = newConnection(this); | 201 _connection = newConnection(this); |
200 _socket=socket; | 202 _socket=socket; |
201 } | 203 } |
202 | 204 |
203 public Connection getConnection() | 205 public Connection getConnection() |
204 { | 206 { |
205 return _connection; | 207 return _connection; |
206 } | 208 } |
207 | 209 |
208 public void setConnection(Connection connection) | 210 public void setConnection(Connection connection) |
209 { | 211 { |
210 if (_connection!=connection && _connection!=null) | 212 if (_connection!=connection && _connection!=null) |
211 connectionUpgraded(_connection,connection); | 213 connectionUpgraded(_connection,connection); |
212 _connection=connection; | 214 _connection=connection; |
213 } | 215 } |
214 | 216 |
215 public void dispatch() throws IOException | 217 public void dispatch() throws IOException |
216 { | 218 { |
217 if (getThreadPool()==null || !getThreadPool().dispatch(this)) | 219 ThreadPoolExecutor tpe = getThreadPool(); |
218 { | 220 if( tpe != null ) { |
219 LOG.warn("dispatch failed for {}",_connection); | 221 try { |
220 close(); | 222 tpe.execute(this); |
221 } | 223 return; |
222 } | 224 } catch(RejectedExecutionException e) {} |
223 | 225 } |
224 @Override | 226 LOG.warn("dispatch failed for {}",_connection); |
225 public int fill(Buffer buffer) throws IOException | 227 close(); |
226 { | 228 } |
227 int l = super.fill(buffer); | 229 |
228 if (l<0) | 230 @Override |
229 { | 231 public int fill(Buffer buffer) throws IOException |
230 if (!isInputShutdown()) | 232 { |
231 shutdownInput(); | 233 int l = super.fill(buffer); |
232 if (isOutputShutdown()) | 234 if (l<0) |
233 close(); | 235 { |
234 } | 236 if (!isInputShutdown()) |
235 return l; | 237 shutdownInput(); |
236 } | 238 if (isOutputShutdown()) |
237 | 239 close(); |
238 @Override | 240 } |
239 public void close() throws IOException | 241 return l; |
240 { | 242 } |
241 if (_connection instanceof AbstractHttpConnection) | 243 |
242 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); | 244 @Override |
243 super.close(); | 245 public void close() throws IOException |
244 } | 246 { |
245 | 247 if (_connection instanceof AbstractHttpConnection) |
246 public void run() | 248 ((AbstractHttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); |
247 { | 249 super.close(); |
248 try | 250 } |
249 { | 251 |
250 connectionOpened(_connection); | 252 public void run() |
251 synchronized(_connections) | 253 { |
252 { | 254 try |
253 _connections.add(this); | 255 { |
254 } | 256 connectionOpened(_connection); |
255 | 257 synchronized(_connections) |
256 while (isStarted() && !isClosed()) | 258 { |
257 { | 259 _connections.add(this); |
258 if (_connection.isIdle()) | 260 } |
259 { | 261 |
260 if (isLowResources()) | 262 while (isStarted() && !isClosed()) |
261 setMaxIdleTime(getLowResourcesMaxIdleTime()); | 263 { |
262 } | 264 if (_connection.isIdle()) |
263 | 265 { |
264 _connection=_connection.handle(); | 266 if (isLowResources()) |
265 } | 267 setMaxIdleTime(getLowResourcesMaxIdleTime()); |
266 } | 268 } |
267 catch (EofException e) | 269 |
268 { | 270 _connection=_connection.handle(); |
269 LOG.debug("EOF", e); | 271 } |
270 try{close();} | 272 } |
271 catch(IOException e2){LOG.trace("",e2);} | 273 catch (EofException e) |
272 } | 274 { |
273 catch (SocketException e) | 275 LOG.debug("EOF", e); |
274 { | 276 try{close();} |
275 LOG.debug("EOF", e); | 277 catch(IOException e2){LOG.trace("",e2);} |
276 try{close();} | 278 } |
277 catch(IOException e2){LOG.trace("",e2);} | 279 catch (SocketException e) |
278 } | 280 { |
279 catch (HttpException e) | 281 LOG.debug("EOF", e); |
280 { | 282 try{close();} |
281 LOG.debug("BAD", e); | 283 catch(IOException e2){LOG.trace("",e2);} |
282 try{close();} | 284 } |
283 catch(IOException e2){LOG.trace("",e2);} | 285 catch (HttpException e) |
284 } | 286 { |
285 catch(Exception e) | 287 LOG.debug("BAD", e); |
286 { | 288 try{close();} |
287 LOG.warn("handle failed?",e); | 289 catch(IOException e2){LOG.trace("",e2);} |
288 try{close();} | 290 } |
289 catch(IOException e2){LOG.trace("",e2);} | 291 catch(Exception e) |
290 } | 292 { |
291 finally | 293 LOG.warn("handle failed?",e); |
292 { | 294 try{close();} |
293 connectionClosed(_connection); | 295 catch(IOException e2){LOG.trace("",e2);} |
294 synchronized(_connections) | 296 } |
295 { | 297 finally |
296 _connections.remove(this); | 298 { |
297 } | 299 connectionClosed(_connection); |
298 | 300 synchronized(_connections) |
299 // wait for client to close, but if not, close ourselves. | 301 { |
300 try | 302 _connections.remove(this); |
301 { | 303 } |
302 if (!_socket.isClosed()) | 304 |
303 { | 305 // wait for client to close, but if not, close ourselves. |
304 long timestamp=System.currentTimeMillis(); | 306 try |
305 int max_idle=getMaxIdleTime(); | 307 { |
306 | 308 if (!_socket.isClosed()) |
307 _socket.setSoTimeout(getMaxIdleTime()); | 309 { |
308 int c=0; | 310 long timestamp=System.currentTimeMillis(); |
309 do | 311 int max_idle=getMaxIdleTime(); |
310 { | 312 |
311 c = _socket.getInputStream().read(); | 313 _socket.setSoTimeout(getMaxIdleTime()); |
312 } | 314 int c=0; |
313 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); | 315 do |
314 if (!_socket.isClosed()) | 316 { |
315 _socket.close(); | 317 c = _socket.getInputStream().read(); |
316 } | 318 } |
317 } | 319 while (c>=0 && (System.currentTimeMillis()-timestamp)<max_idle); |
318 catch(IOException e) | 320 if (!_socket.isClosed()) |
319 { | 321 _socket.close(); |
320 LOG.trace("",e); | 322 } |
321 } | 323 } |
322 } | 324 catch(IOException e) |
323 } | 325 { |
324 } | 326 LOG.trace("",e); |
327 } | |
328 } | |
329 } | |
330 } | |
325 } | 331 } |