Mercurial Hosting > luan
comparison src/org/eclipse/jetty/server/nio/SelectChannelConnector.java @ 864:e21ca9878a10
simplify ThreadPool use
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 16:17:38 -0600 |
parents | 3428c60d7cfc |
children | 6b210bb66c63 |
comparison
equal
deleted
inserted
replaced
863:88d3c8ff242a | 864:e21ca9878a10 |
---|---|
64 * | 64 * |
65 * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector" | 65 * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector" |
66 */ | 66 */ |
67 public class SelectChannelConnector extends AbstractNIOConnector | 67 public class SelectChannelConnector extends AbstractNIOConnector |
68 { | 68 { |
69 protected ServerSocketChannel _acceptChannel; | 69 protected ServerSocketChannel _acceptChannel; |
70 private int _lowResourcesConnections; | 70 private int _lowResourcesConnections; |
71 private int _lowResourcesMaxIdleTime; | 71 private int _lowResourcesMaxIdleTime; |
72 private int _localPort=-1; | 72 private int _localPort=-1; |
73 | 73 |
74 private final SelectorManager _manager = new ConnectorSelectorManager(); | 74 private final SelectorManager _manager = new ConnectorSelectorManager(); |
75 | 75 |
76 /* ------------------------------------------------------------------------------- */ | 76 /* ------------------------------------------------------------------------------- */ |
77 /** | 77 /** |
78 * Constructor. | 78 * Constructor. |
79 * | 79 * |
80 */ | 80 */ |
81 public SelectChannelConnector() | 81 public SelectChannelConnector() |
82 { | 82 { |
83 _manager.setMaxIdleTime(getMaxIdleTime()); | 83 _manager.setMaxIdleTime(getMaxIdleTime()); |
84 addBean(_manager,true); | 84 addBean(_manager,true); |
85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); | 85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); |
86 } | 86 } |
87 | 87 |
88 @Override | 88 /* ------------------------------------------------------------ */ |
89 public void setThreadPool(ThreadPool pool) | 89 @Override |
90 { | 90 public void accept(int acceptorID) throws IOException |
91 super.setThreadPool(pool); | 91 { |
92 // preserve start order | 92 ServerSocketChannel server; |
93 removeBean(_manager); | 93 synchronized(this) |
94 addBean(_manager,true); | 94 { |
95 } | 95 server = _acceptChannel; |
96 | 96 } |
97 /* ------------------------------------------------------------ */ | 97 |
98 @Override | 98 if (server!=null && server.isOpen() && _manager.isStarted()) |
99 public void accept(int acceptorID) throws IOException | 99 { |
100 { | 100 SocketChannel channel = server.accept(); |
101 ServerSocketChannel server; | 101 channel.configureBlocking(false); |
102 synchronized(this) | 102 Socket socket = channel.socket(); |
103 { | 103 configure(socket); |
104 server = _acceptChannel; | 104 _manager.register(channel); |
105 } | 105 } |
106 | 106 } |
107 if (server!=null && server.isOpen() && _manager.isStarted()) | 107 |
108 { | 108 /* ------------------------------------------------------------ */ |
109 SocketChannel channel = server.accept(); | 109 public void close() throws IOException |
110 channel.configureBlocking(false); | 110 { |
111 Socket socket = channel.socket(); | 111 synchronized(this) |
112 configure(socket); | 112 { |
113 _manager.register(channel); | 113 if (_acceptChannel != null) |
114 } | 114 { |
115 } | 115 removeBean(_acceptChannel); |
116 | 116 if (_acceptChannel.isOpen()) |
117 /* ------------------------------------------------------------ */ | 117 _acceptChannel.close(); |
118 public void close() throws IOException | 118 } |
119 { | 119 _acceptChannel = null; |
120 synchronized(this) | 120 _localPort=-2; |
121 { | 121 } |
122 if (_acceptChannel != null) | 122 } |
123 { | 123 |
124 removeBean(_acceptChannel); | 124 /* ------------------------------------------------------------------------------- */ |
125 if (_acceptChannel.isOpen()) | 125 @Override |
126 _acceptChannel.close(); | 126 public void customize(EndPoint endpoint, Request request) throws IOException |
127 } | 127 { |
128 _acceptChannel = null; | 128 request.setTimeStamp(System.currentTimeMillis()); |
129 _localPort=-2; | 129 endpoint.setMaxIdleTime(_maxIdleTime); |
130 } | 130 super.customize(endpoint, request); |
131 } | 131 } |
132 | 132 |
133 /* ------------------------------------------------------------------------------- */ | 133 /* ------------------------------------------------------------------------------- */ |
134 @Override | 134 @Override |
135 public void customize(EndPoint endpoint, Request request) throws IOException | 135 public void persist(EndPoint endpoint) throws IOException |
136 { | 136 { |
137 request.setTimeStamp(System.currentTimeMillis()); | 137 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint); |
138 endpoint.setMaxIdleTime(_maxIdleTime); | 138 aEndp.setCheckForIdle(true); |
139 super.customize(endpoint, request); | 139 super.persist(endpoint); |
140 } | 140 } |
141 | 141 |
142 /* ------------------------------------------------------------------------------- */ | 142 /* ------------------------------------------------------------ */ |
143 @Override | 143 public SelectorManager getSelectorManager() |
144 public void persist(EndPoint endpoint) throws IOException | 144 { |
145 { | 145 return _manager; |
146 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint); | 146 } |
147 aEndp.setCheckForIdle(true); | 147 |
148 super.persist(endpoint); | 148 /* ------------------------------------------------------------ */ |
149 } | 149 public synchronized Object getConnection() |
150 | 150 { |
151 /* ------------------------------------------------------------ */ | 151 return _acceptChannel; |
152 public SelectorManager getSelectorManager() | 152 } |
153 { | 153 |
154 return _manager; | 154 /* ------------------------------------------------------------------------------- */ |
155 } | 155 public int getLocalPort() |
156 | 156 { |
157 /* ------------------------------------------------------------ */ | 157 synchronized(this) |
158 public synchronized Object getConnection() | 158 { |
159 { | 159 return _localPort; |
160 return _acceptChannel; | 160 } |
161 } | 161 } |
162 | 162 |
163 /* ------------------------------------------------------------------------------- */ | 163 /* ------------------------------------------------------------ */ |
164 public int getLocalPort() | 164 public void open() throws IOException |
165 { | 165 { |
166 synchronized(this) | 166 synchronized(this) |
167 { | 167 { |
168 return _localPort; | 168 if (_acceptChannel == null) |
169 } | 169 { |
170 } | 170 // Create a new server socket |
171 | 171 _acceptChannel = ServerSocketChannel.open(); |
172 /* ------------------------------------------------------------ */ | 172 // Set to blocking mode |
173 public void open() throws IOException | 173 _acceptChannel.configureBlocking(true); |
174 { | 174 |
175 synchronized(this) | 175 // Bind the server socket to the local host and port |
176 { | 176 _acceptChannel.socket().setReuseAddress(getReuseAddress()); |
177 if (_acceptChannel == null) | 177 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); |
178 { | 178 _acceptChannel.socket().bind(addr,getAcceptQueueSize()); |
179 // Create a new server socket | 179 |
180 _acceptChannel = ServerSocketChannel.open(); | 180 _localPort=_acceptChannel.socket().getLocalPort(); |
181 // Set to blocking mode | 181 if (_localPort<=0) |
182 _acceptChannel.configureBlocking(true); | 182 throw new IOException("Server channel not bound"); |
183 | 183 |
184 // Bind the server socket to the local host and port | 184 addBean(_acceptChannel); |
185 _acceptChannel.socket().setReuseAddress(getReuseAddress()); | 185 } |
186 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); | 186 } |
187 _acceptChannel.socket().bind(addr,getAcceptQueueSize()); | 187 } |
188 | 188 |
189 _localPort=_acceptChannel.socket().getLocalPort(); | 189 /* ------------------------------------------------------------ */ |
190 if (_localPort<=0) | 190 @Override |
191 throw new IOException("Server channel not bound"); | 191 public void setMaxIdleTime(int maxIdleTime) |
192 | 192 { |
193 addBean(_acceptChannel); | 193 _manager.setMaxIdleTime(maxIdleTime); |
194 } | 194 super.setMaxIdleTime(maxIdleTime); |
195 } | 195 } |
196 } | 196 |
197 | 197 /* ------------------------------------------------------------ */ |
198 /* ------------------------------------------------------------ */ | 198 /** |
199 @Override | 199 * @return the lowResourcesConnections |
200 public void setMaxIdleTime(int maxIdleTime) | 200 */ |
201 { | 201 public int getLowResourcesConnections() |
202 _manager.setMaxIdleTime(maxIdleTime); | 202 { |
203 super.setMaxIdleTime(maxIdleTime); | 203 return _lowResourcesConnections; |
204 } | 204 } |
205 | 205 |
206 /* ------------------------------------------------------------ */ | 206 /* ------------------------------------------------------------ */ |
207 /** | 207 /** |
208 * @return the lowResourcesConnections | 208 * Set the number of connections, which if exceeded places this manager in low resources state. |
209 */ | 209 * This is not an exact measure as the connection count is averaged over the select sets. |
210 public int getLowResourcesConnections() | 210 * @param lowResourcesConnections the number of connections |
211 { | 211 * @see #setLowResourcesMaxIdleTime(int) |
212 return _lowResourcesConnections; | 212 */ |
213 } | 213 public void setLowResourcesConnections(int lowResourcesConnections) |
214 | 214 { |
215 /* ------------------------------------------------------------ */ | 215 _lowResourcesConnections=lowResourcesConnections; |
216 /** | 216 } |
217 * Set the number of connections, which if exceeded places this manager in low resources state. | 217 |
218 * This is not an exact measure as the connection count is averaged over the select sets. | 218 /* ------------------------------------------------------------ */ |
219 * @param lowResourcesConnections the number of connections | 219 /** |
220 * @see #setLowResourcesMaxIdleTime(int) | 220 * @return the lowResourcesMaxIdleTime |
221 */ | 221 */ |
222 public void setLowResourcesConnections(int lowResourcesConnections) | 222 @Override |
223 { | 223 public int getLowResourcesMaxIdleTime() |
224 _lowResourcesConnections=lowResourcesConnections; | 224 { |
225 } | 225 return _lowResourcesMaxIdleTime; |
226 | 226 } |
227 /* ------------------------------------------------------------ */ | 227 |
228 /** | 228 /* ------------------------------------------------------------ */ |
229 * @return the lowResourcesMaxIdleTime | 229 /** |
230 */ | 230 * Set the period in ms that a connection is allowed to be idle when this there are more |
231 @Override | 231 * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections |
232 public int getLowResourcesMaxIdleTime() | 232 * in order to gracefully handle high load situations. |
233 { | 233 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low. |
234 return _lowResourcesMaxIdleTime; | 234 * @see #setMaxIdleTime(int) |
235 } | 235 */ |
236 | 236 @Override |
237 /* ------------------------------------------------------------ */ | 237 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime) |
238 /** | 238 { |
239 * Set the period in ms that a connection is allowed to be idle when this there are more | 239 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; |
240 * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections | 240 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); |
241 * in order to gracefully handle high load situations. | 241 } |
242 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low. | 242 |
243 * @see #setMaxIdleTime(int) | 243 |
244 */ | 244 /* ------------------------------------------------------------ */ |
245 @Override | 245 /* |
246 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime) | 246 * @see org.eclipse.jetty.server.server.AbstractConnector#doStart() |
247 { | 247 */ |
248 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; | 248 @Override |
249 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); | 249 protected void doStart() throws Exception |
250 } | 250 { |
251 | 251 _manager.setSelectSets(getAcceptors()); |
252 | 252 _manager.setMaxIdleTime(getMaxIdleTime()); |
253 /* ------------------------------------------------------------ */ | 253 _manager.setLowResourcesConnections(getLowResourcesConnections()); |
254 /* | 254 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); |
255 * @see org.eclipse.jetty.server.server.AbstractConnector#doStart() | 255 |
256 */ | 256 super.doStart(); |
257 @Override | 257 } |
258 protected void doStart() throws Exception | 258 |
259 { | 259 /* ------------------------------------------------------------ */ |
260 _manager.setSelectSets(getAcceptors()); | 260 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException |
261 _manager.setMaxIdleTime(getMaxIdleTime()); | 261 { |
262 _manager.setLowResourcesConnections(getLowResourcesConnections()); | 262 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); |
263 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); | 263 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); |
264 | 264 return endp; |
265 super.doStart(); | 265 } |
266 } | 266 |
267 | 267 /* ------------------------------------------------------------------------------- */ |
268 /* ------------------------------------------------------------ */ | 268 protected void endPointClosed(SelectChannelEndPoint endpoint) |
269 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException | 269 { |
270 { | 270 connectionClosed(endpoint.getConnection()); |
271 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); | 271 } |
272 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); | 272 |
273 return endp; | 273 /* ------------------------------------------------------------------------------- */ |
274 } | 274 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) |
275 | 275 { |
276 /* ------------------------------------------------------------------------------- */ | 276 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer()); |
277 protected void endPointClosed(SelectChannelEndPoint endpoint) | 277 } |
278 { | 278 |
279 connectionClosed(endpoint.getConnection()); | 279 |
280 } | 280 /* ------------------------------------------------------------ */ |
281 | 281 /* ------------------------------------------------------------ */ |
282 /* ------------------------------------------------------------------------------- */ | 282 /* ------------------------------------------------------------ */ |
283 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) | 283 private final class ConnectorSelectorManager extends SelectorManager |
284 { | 284 { |
285 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer()); | 285 @Override |
286 } | 286 public boolean dispatch(Runnable task) |
287 | 287 { |
288 | 288 ThreadPool pool=getThreadPool(); |
289 /* ------------------------------------------------------------ */ | 289 if (pool==null) |
290 /* ------------------------------------------------------------ */ | 290 pool=getServer().getThreadPool(); |
291 /* ------------------------------------------------------------ */ | 291 return pool.dispatch(task); |
292 private final class ConnectorSelectorManager extends SelectorManager | 292 } |
293 { | 293 |
294 @Override | 294 @Override |
295 public boolean dispatch(Runnable task) | 295 protected void endPointClosed(final SelectChannelEndPoint endpoint) |
296 { | 296 { |
297 ThreadPool pool=getThreadPool(); | 297 SelectChannelConnector.this.endPointClosed(endpoint); |
298 if (pool==null) | 298 } |
299 pool=getServer().getThreadPool(); | 299 |
300 return pool.dispatch(task); | 300 @Override |
301 } | 301 protected void endPointOpened(SelectChannelEndPoint endpoint) |
302 | 302 { |
303 @Override | 303 // TODO handle max connections and low resources |
304 protected void endPointClosed(final SelectChannelEndPoint endpoint) | 304 connectionOpened(endpoint.getConnection()); |
305 { | 305 } |
306 SelectChannelConnector.this.endPointClosed(endpoint); | 306 |
307 } | 307 @Override |
308 | 308 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) |
309 @Override | 309 { |
310 protected void endPointOpened(SelectChannelEndPoint endpoint) | 310 connectionUpgraded(oldConnection,endpoint.getConnection()); |
311 { | 311 } |
312 // TODO handle max connections and low resources | 312 |
313 connectionOpened(endpoint.getConnection()); | 313 @Override |
314 } | 314 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) |
315 | 315 { |
316 @Override | 316 return SelectChannelConnector.this.newConnection(channel,endpoint); |
317 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) | 317 } |
318 { | 318 |
319 connectionUpgraded(oldConnection,endpoint.getConnection()); | 319 @Override |
320 } | 320 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException |
321 | 321 { |
322 @Override | 322 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); |
323 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) | 323 } |
324 { | 324 } |
325 return SelectChannelConnector.this.newConnection(channel,endpoint); | |
326 } | |
327 | |
328 @Override | |
329 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException | |
330 { | |
331 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); | |
332 } | |
333 } | |
334 } | 325 } |