comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 953:7db4a488fc82

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 12 Oct 2016 22:16:36 -0600
parents 669769bcdf5c
children a021c4c9c244
comparison
equal deleted inserted replaced
952:669769bcdf5c 953:7db4a488fc82
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
60 60
61 private int _maxIdleTime; 61 private int _maxIdleTime;
62 private long _lowResourcesConnections; 62 private long _lowResourcesConnections;
63 private SelectSet[] _selectSet; 63 private SelectSet[] _selectSet;
64 private int _selectSets=1; 64 private int _selectSets = 1;
65 private volatile int _set=0; 65 private volatile int _set=0;
66 66
67 /* ------------------------------------------------------------ */ 67 /* ------------------------------------------------------------ */
68 /** 68 /**
69 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 69 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
240 } 240 }
241 241
242 private void addChange(SocketChannel channel) 242 private void addChange(SocketChannel channel)
243 { 243 {
244 try { 244 try {
245 SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null); 245 //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq a");
246 // SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null);
247 SelectionKey key = _selector.register(channel,0,null);
246 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 248 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
247 key.attach(endpoint); 249 key.attach(endpoint);
250 key.interestOps(SelectionKey.OP_READ);
251 _selector.update();
252 //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b");
248 endpoint.schedule(); 253 endpoint.schedule();
249 } catch(IOException e) { 254 } catch(IOException e) {
250 LOG.warn("",e); 255 LOG.warn("",e);
251 try { 256 try {
252 channel.close(); 257 channel.close();
259 /** 264 /**
260 * Select and dispatch tasks found from changes and the selector. 265 * Select and dispatch tasks found from changes and the selector.
261 * 266 *
262 * @throws IOException 267 * @throws IOException
263 */ 268 */
264 public void doSelect() throws IOException 269 private void doSelect() throws IOException
265 { 270 {
266 try 271 try
267 { 272 {
268 _selecting=Thread.currentThread(); 273 _selecting=Thread.currentThread();
269 final SaneSelector selector = _selector; 274 final SaneSelector selector = _selector;
274 selector.select(); 279 selector.select();
275 280
276 // Look for things to do 281 // Look for things to do
277 for (SelectionKey key: selector.selectedKeys()) 282 for (SelectionKey key: selector.selectedKeys())
278 { 283 {
279 SocketChannel channel=null;
280
281 try 284 try
282 { 285 {
283 if (!key.isValid()) 286 if (!key.isValid())
284 { 287 {
285 key.cancel(); 288 key.cancel();
287 if (endpoint != null) 290 if (endpoint != null)
288 endpoint.doUpdateKey(); 291 endpoint.doUpdateKey();
289 continue; 292 continue;
290 } 293 }
291 294
292 Object att = key.attachment(); 295 if (key.isReadable()||key.isWritable()) {
293 if (att instanceof SelectChannelEndPoint) 296 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
294 { 297 endpoint.schedule();
295 if (key.isReadable()||key.isWritable())
296 ((SelectChannelEndPoint)att).schedule();
297 }
298 else if (key.isConnectable())
299 {
300 // Complete a connection of a registered channel
301 channel = (SocketChannel)key.channel();
302 boolean connected=false;
303 try
304 {
305 connected=channel.finishConnect();
306 }
307 catch(Exception e)
308 {
309 LOG.warn(e+","+channel+","+att);
310 LOG.debug("",e);
311 }
312 finally
313 {
314 if (connected)
315 {
316 key.interestOps(SelectionKey.OP_READ);
317 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
318 key.attach(endpoint);
319 endpoint.schedule();
320 }
321 else
322 {
323 key.cancel();
324 channel.close();
325 }
326 }
327 }
328 else
329 {
330 // Wrap readable registered channel in an endpoint
331 channel = (SocketChannel)key.channel();
332 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
333 key.attach(endpoint);
334 if (key.isReadable())
335 endpoint.schedule();
336 } 298 }
337 key = null; 299 key = null;
338 } 300 }
339 catch (CancelledKeyException e) 301 catch (CancelledKeyException e)
340 { 302 {
344 { 306 {
345 if (isRunning()) 307 if (isRunning())
346 LOG.warn("",e); 308 LOG.warn("",e);
347 else 309 else
348 LOG.trace("",e); 310 LOG.trace("",e);
349
350 try
351 {
352 if (channel!=null)
353 channel.close();
354 }
355 catch(IOException e2)
356 {
357 LOG.debug("",e2);
358 }
359
360 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
361 key.cancel();
362 } 311 }
363 } 312 }
364 313
365 // Everything always handled 314 // Everything always handled
366 selector.selectedKeys().clear(); 315 selector.selectedKeys().clear();
413 SaneSelector getSelector() 362 SaneSelector getSelector()
414 { 363 {
415 return _selector; 364 return _selector;
416 } 365 }
417 366
418 void stop() throws Exception 367 synchronized void stop() throws Exception
419 { 368 {
420 // Spin for a while waiting for selector to complete 369 // close endpoints and selector
421 // to avoid unneccessary closed channel exceptions 370 for (SelectionKey key : _selector.keys())
422 /* 371 {
372 Object att=key.attachment();
373 if (att instanceof EndPoint)
374 {
375 EndPoint endpoint = (EndPoint)att;
376 try
377 {
378 endpoint.close();
379 }
380 catch(IOException e)
381 {
382 LOG.trace("",e);
383 }
384 }
385 }
386
423 try 387 try
424 { 388 {
425 for (int i=0;i<100 && _selecting!=null;i++) 389 _selector.close();
426 { 390 }
427 _selector.wakeup(); 391 catch (IOException e)
428 Thread.sleep(10); 392 {
429 } 393 LOG.trace("",e);
430 } 394 }
431 catch(Exception e) 395 _selector = null;
432 {
433 LOG.warn("",e);
434 }
435 */
436 // close endpoints and selector
437 synchronized (this)
438 {
439 for (SelectionKey key : _selector.keys())
440 {
441 if (key==null)
442 continue;
443 Object att=key.attachment();
444 if (att instanceof EndPoint)
445 {
446 EndPoint endpoint = (EndPoint)att;
447 try
448 {
449 endpoint.close();
450 }
451 catch(IOException e)
452 {
453 LOG.trace("",e);
454 }
455 }
456 }
457
458 try
459 {
460 _selector.close();
461 }
462 catch (IOException e)
463 {
464 LOG.trace("",e);
465 }
466 _selector = null;
467 }
468 } 396 }
469 397
470 public String dump() 398 public String dump()
471 { 399 {
472 return AggregateLifeCycle.dump(this); 400 return AggregateLifeCycle.dump(this);