comparison src/org/eclipse/jetty/server/nio/SelectChannelConnector.java @ 864:e21ca9878a10

simplify ThreadPool use
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 16:17:38 -0600
parents 3428c60d7cfc
children 6b210bb66c63
comparison
equal deleted inserted replaced
863:88d3c8ff242a 864:e21ca9878a10
64 * 64 *
65 * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector" 65 * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
66 */ 66 */
67 public class SelectChannelConnector extends AbstractNIOConnector 67 public class SelectChannelConnector extends AbstractNIOConnector
68 { 68 {
69 protected ServerSocketChannel _acceptChannel; 69 protected ServerSocketChannel _acceptChannel;
70 private int _lowResourcesConnections; 70 private int _lowResourcesConnections;
71 private int _lowResourcesMaxIdleTime; 71 private int _lowResourcesMaxIdleTime;
72 private int _localPort=-1; 72 private int _localPort=-1;
73 73
74 private final SelectorManager _manager = new ConnectorSelectorManager(); 74 private final SelectorManager _manager = new ConnectorSelectorManager();
75 75
76 /* ------------------------------------------------------------------------------- */ 76 /* ------------------------------------------------------------------------------- */
77 /** 77 /**
78 * Constructor. 78 * Constructor.
79 * 79 *
80 */ 80 */
81 public SelectChannelConnector() 81 public SelectChannelConnector()
82 { 82 {
83 _manager.setMaxIdleTime(getMaxIdleTime()); 83 _manager.setMaxIdleTime(getMaxIdleTime());
84 addBean(_manager,true); 84 addBean(_manager,true);
85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4)); 85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
86 } 86 }
87 87
88 @Override 88 /* ------------------------------------------------------------ */
89 public void setThreadPool(ThreadPool pool) 89 @Override
90 { 90 public void accept(int acceptorID) throws IOException
91 super.setThreadPool(pool); 91 {
92 // preserve start order 92 ServerSocketChannel server;
93 removeBean(_manager); 93 synchronized(this)
94 addBean(_manager,true); 94 {
95 } 95 server = _acceptChannel;
96 96 }
97 /* ------------------------------------------------------------ */ 97
98 @Override 98 if (server!=null && server.isOpen() && _manager.isStarted())
99 public void accept(int acceptorID) throws IOException 99 {
100 { 100 SocketChannel channel = server.accept();
101 ServerSocketChannel server; 101 channel.configureBlocking(false);
102 synchronized(this) 102 Socket socket = channel.socket();
103 { 103 configure(socket);
104 server = _acceptChannel; 104 _manager.register(channel);
105 } 105 }
106 106 }
107 if (server!=null && server.isOpen() && _manager.isStarted()) 107
108 { 108 /* ------------------------------------------------------------ */
109 SocketChannel channel = server.accept(); 109 public void close() throws IOException
110 channel.configureBlocking(false); 110 {
111 Socket socket = channel.socket(); 111 synchronized(this)
112 configure(socket); 112 {
113 _manager.register(channel); 113 if (_acceptChannel != null)
114 } 114 {
115 } 115 removeBean(_acceptChannel);
116 116 if (_acceptChannel.isOpen())
117 /* ------------------------------------------------------------ */ 117 _acceptChannel.close();
118 public void close() throws IOException 118 }
119 { 119 _acceptChannel = null;
120 synchronized(this) 120 _localPort=-2;
121 { 121 }
122 if (_acceptChannel != null) 122 }
123 { 123
124 removeBean(_acceptChannel); 124 /* ------------------------------------------------------------------------------- */
125 if (_acceptChannel.isOpen()) 125 @Override
126 _acceptChannel.close(); 126 public void customize(EndPoint endpoint, Request request) throws IOException
127 } 127 {
128 _acceptChannel = null; 128 request.setTimeStamp(System.currentTimeMillis());
129 _localPort=-2; 129 endpoint.setMaxIdleTime(_maxIdleTime);
130 } 130 super.customize(endpoint, request);
131 } 131 }
132 132
133 /* ------------------------------------------------------------------------------- */ 133 /* ------------------------------------------------------------------------------- */
134 @Override 134 @Override
135 public void customize(EndPoint endpoint, Request request) throws IOException 135 public void persist(EndPoint endpoint) throws IOException
136 { 136 {
137 request.setTimeStamp(System.currentTimeMillis()); 137 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
138 endpoint.setMaxIdleTime(_maxIdleTime); 138 aEndp.setCheckForIdle(true);
139 super.customize(endpoint, request); 139 super.persist(endpoint);
140 } 140 }
141 141
142 /* ------------------------------------------------------------------------------- */ 142 /* ------------------------------------------------------------ */
143 @Override 143 public SelectorManager getSelectorManager()
144 public void persist(EndPoint endpoint) throws IOException 144 {
145 { 145 return _manager;
146 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint); 146 }
147 aEndp.setCheckForIdle(true); 147
148 super.persist(endpoint); 148 /* ------------------------------------------------------------ */
149 } 149 public synchronized Object getConnection()
150 150 {
151 /* ------------------------------------------------------------ */ 151 return _acceptChannel;
152 public SelectorManager getSelectorManager() 152 }
153 { 153
154 return _manager; 154 /* ------------------------------------------------------------------------------- */
155 } 155 public int getLocalPort()
156 156 {
157 /* ------------------------------------------------------------ */ 157 synchronized(this)
158 public synchronized Object getConnection() 158 {
159 { 159 return _localPort;
160 return _acceptChannel; 160 }
161 } 161 }
162 162
163 /* ------------------------------------------------------------------------------- */ 163 /* ------------------------------------------------------------ */
164 public int getLocalPort() 164 public void open() throws IOException
165 { 165 {
166 synchronized(this) 166 synchronized(this)
167 { 167 {
168 return _localPort; 168 if (_acceptChannel == null)
169 } 169 {
170 } 170 // Create a new server socket
171 171 _acceptChannel = ServerSocketChannel.open();
172 /* ------------------------------------------------------------ */ 172 // Set to blocking mode
173 public void open() throws IOException 173 _acceptChannel.configureBlocking(true);
174 { 174
175 synchronized(this) 175 // Bind the server socket to the local host and port
176 { 176 _acceptChannel.socket().setReuseAddress(getReuseAddress());
177 if (_acceptChannel == null) 177 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
178 { 178 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
179 // Create a new server socket 179
180 _acceptChannel = ServerSocketChannel.open(); 180 _localPort=_acceptChannel.socket().getLocalPort();
181 // Set to blocking mode 181 if (_localPort<=0)
182 _acceptChannel.configureBlocking(true); 182 throw new IOException("Server channel not bound");
183 183
184 // Bind the server socket to the local host and port 184 addBean(_acceptChannel);
185 _acceptChannel.socket().setReuseAddress(getReuseAddress()); 185 }
186 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); 186 }
187 _acceptChannel.socket().bind(addr,getAcceptQueueSize()); 187 }
188 188
189 _localPort=_acceptChannel.socket().getLocalPort(); 189 /* ------------------------------------------------------------ */
190 if (_localPort<=0) 190 @Override
191 throw new IOException("Server channel not bound"); 191 public void setMaxIdleTime(int maxIdleTime)
192 192 {
193 addBean(_acceptChannel); 193 _manager.setMaxIdleTime(maxIdleTime);
194 } 194 super.setMaxIdleTime(maxIdleTime);
195 } 195 }
196 } 196
197 197 /* ------------------------------------------------------------ */
198 /* ------------------------------------------------------------ */ 198 /**
199 @Override 199 * @return the lowResourcesConnections
200 public void setMaxIdleTime(int maxIdleTime) 200 */
201 { 201 public int getLowResourcesConnections()
202 _manager.setMaxIdleTime(maxIdleTime); 202 {
203 super.setMaxIdleTime(maxIdleTime); 203 return _lowResourcesConnections;
204 } 204 }
205 205
206 /* ------------------------------------------------------------ */ 206 /* ------------------------------------------------------------ */
207 /** 207 /**
208 * @return the lowResourcesConnections 208 * Set the number of connections, which if exceeded places this manager in low resources state.
209 */ 209 * This is not an exact measure as the connection count is averaged over the select sets.
210 public int getLowResourcesConnections() 210 * @param lowResourcesConnections the number of connections
211 { 211 * @see #setLowResourcesMaxIdleTime(int)
212 return _lowResourcesConnections; 212 */
213 } 213 public void setLowResourcesConnections(int lowResourcesConnections)
214 214 {
215 /* ------------------------------------------------------------ */ 215 _lowResourcesConnections=lowResourcesConnections;
216 /** 216 }
217 * Set the number of connections, which if exceeded places this manager in low resources state. 217
218 * This is not an exact measure as the connection count is averaged over the select sets. 218 /* ------------------------------------------------------------ */
219 * @param lowResourcesConnections the number of connections 219 /**
220 * @see #setLowResourcesMaxIdleTime(int) 220 * @return the lowResourcesMaxIdleTime
221 */ 221 */
222 public void setLowResourcesConnections(int lowResourcesConnections) 222 @Override
223 { 223 public int getLowResourcesMaxIdleTime()
224 _lowResourcesConnections=lowResourcesConnections; 224 {
225 } 225 return _lowResourcesMaxIdleTime;
226 226 }
227 /* ------------------------------------------------------------ */ 227
228 /** 228 /* ------------------------------------------------------------ */
229 * @return the lowResourcesMaxIdleTime 229 /**
230 */ 230 * Set the period in ms that a connection is allowed to be idle when this there are more
231 @Override 231 * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections
232 public int getLowResourcesMaxIdleTime() 232 * in order to gracefully handle high load situations.
233 { 233 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
234 return _lowResourcesMaxIdleTime; 234 * @see #setMaxIdleTime(int)
235 } 235 */
236 236 @Override
237 /* ------------------------------------------------------------ */ 237 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
238 /** 238 {
239 * Set the period in ms that a connection is allowed to be idle when this there are more 239 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
240 * than {@link #getLowResourcesConnections()} connections. This allows the server to rapidly close idle connections 240 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
241 * in order to gracefully handle high load situations. 241 }
242 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low. 242
243 * @see #setMaxIdleTime(int) 243
244 */ 244 /* ------------------------------------------------------------ */
245 @Override 245 /*
246 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime) 246 * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
247 { 247 */
248 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; 248 @Override
249 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); 249 protected void doStart() throws Exception
250 } 250 {
251 251 _manager.setSelectSets(getAcceptors());
252 252 _manager.setMaxIdleTime(getMaxIdleTime());
253 /* ------------------------------------------------------------ */ 253 _manager.setLowResourcesConnections(getLowResourcesConnections());
254 /* 254 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
255 * @see org.eclipse.jetty.server.server.AbstractConnector#doStart() 255
256 */ 256 super.doStart();
257 @Override 257 }
258 protected void doStart() throws Exception 258
259 { 259 /* ------------------------------------------------------------ */
260 _manager.setSelectSets(getAcceptors()); 260 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
261 _manager.setMaxIdleTime(getMaxIdleTime()); 261 {
262 _manager.setLowResourcesConnections(getLowResourcesConnections()); 262 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
263 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); 263 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
264 264 return endp;
265 super.doStart(); 265 }
266 } 266
267 267 /* ------------------------------------------------------------------------------- */
268 /* ------------------------------------------------------------ */ 268 protected void endPointClosed(SelectChannelEndPoint endpoint)
269 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException 269 {
270 { 270 connectionClosed(endpoint.getConnection());
271 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); 271 }
272 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment())); 272
273 return endp; 273 /* ------------------------------------------------------------------------------- */
274 } 274 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
275 275 {
276 /* ------------------------------------------------------------------------------- */ 276 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
277 protected void endPointClosed(SelectChannelEndPoint endpoint) 277 }
278 { 278
279 connectionClosed(endpoint.getConnection()); 279
280 } 280 /* ------------------------------------------------------------ */
281 281 /* ------------------------------------------------------------ */
282 /* ------------------------------------------------------------------------------- */ 282 /* ------------------------------------------------------------ */
283 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint) 283 private final class ConnectorSelectorManager extends SelectorManager
284 { 284 {
285 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer()); 285 @Override
286 } 286 public boolean dispatch(Runnable task)
287 287 {
288 288 ThreadPool pool=getThreadPool();
289 /* ------------------------------------------------------------ */ 289 if (pool==null)
290 /* ------------------------------------------------------------ */ 290 pool=getServer().getThreadPool();
291 /* ------------------------------------------------------------ */ 291 return pool.dispatch(task);
292 private final class ConnectorSelectorManager extends SelectorManager 292 }
293 { 293
294 @Override 294 @Override
295 public boolean dispatch(Runnable task) 295 protected void endPointClosed(final SelectChannelEndPoint endpoint)
296 { 296 {
297 ThreadPool pool=getThreadPool(); 297 SelectChannelConnector.this.endPointClosed(endpoint);
298 if (pool==null) 298 }
299 pool=getServer().getThreadPool(); 299
300 return pool.dispatch(task); 300 @Override
301 } 301 protected void endPointOpened(SelectChannelEndPoint endpoint)
302 302 {
303 @Override 303 // TODO handle max connections and low resources
304 protected void endPointClosed(final SelectChannelEndPoint endpoint) 304 connectionOpened(endpoint.getConnection());
305 { 305 }
306 SelectChannelConnector.this.endPointClosed(endpoint); 306
307 } 307 @Override
308 308 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
309 @Override 309 {
310 protected void endPointOpened(SelectChannelEndPoint endpoint) 310 connectionUpgraded(oldConnection,endpoint.getConnection());
311 { 311 }
312 // TODO handle max connections and low resources 312
313 connectionOpened(endpoint.getConnection()); 313 @Override
314 } 314 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
315 315 {
316 @Override 316 return SelectChannelConnector.this.newConnection(channel,endpoint);
317 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) 317 }
318 { 318
319 connectionUpgraded(oldConnection,endpoint.getConnection()); 319 @Override
320 } 320 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
321 321 {
322 @Override 322 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
323 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment) 323 }
324 { 324 }
325 return SelectChannelConnector.this.newConnection(channel,endpoint);
326 }
327
328 @Override
329 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
330 {
331 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
332 }
333 }
334 } 325 }