comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 951:e542a9cc75ef

simplify SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 12 Oct 2016 18:12:12 -0600
parents a778413aefc0
children 669769bcdf5c
comparison
equal deleted inserted replaced
950:a778413aefc0 951:e542a9cc75ef
56 */ 56 */
57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable 57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58 { 58 {
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
60 60
61 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
62 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
63 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
64 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
65
66 private int _maxIdleTime; 61 private int _maxIdleTime;
67 private long _lowResourcesConnections; 62 private long _lowResourcesConnections;
68 private SelectSet[] _selectSet; 63 private SelectSet[] _selectSet;
69 private int _selectSets=1; 64 private int _selectSets=1;
70 private volatile int _set=0; 65 private volatile int _set=0;
72 /* ------------------------------------------------------------ */ 67 /* ------------------------------------------------------------ */
73 /** 68 /**
74 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 69 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
75 * @see #setLowResourcesMaxIdleTime(long) 70 * @see #setLowResourcesMaxIdleTime(long)
76 */ 71 */
77 public void setMaxIdleTime(long maxIdleTime) 72 public void setMaxIdleTime(int maxIdleTime)
78 { 73 {
79 _maxIdleTime=(int)maxIdleTime; 74 _maxIdleTime = maxIdleTime;
80 } 75 }
81 76
82 /* ------------------------------------------------------------ */ 77 /* ------------------------------------------------------------ */
83 /** 78 /**
84 * @param selectSets number of select sets to create 79 * @param selectSets number of select sets to create
210 } 205 }
211 } 206 }
212 super.doStop(); 207 super.doStop();
213 } 208 }
214 209
215 /* ------------------------------------------------------------------------------- */
216 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); 210 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
217 211
218 public String dump() 212 public String dump()
219 { 213 {
220 return AggregateLifeCycle.dump(this); 214 return AggregateLifeCycle.dump(this);
233 private volatile long _now = System.currentTimeMillis(); 227 private volatile long _now = System.currentTimeMillis();
234 228
235 private volatile SaneSelector _selector; 229 private volatile SaneSelector _selector;
236 230
237 private volatile Thread _selecting; 231 private volatile Thread _selecting;
238 private int _busySelects;
239 private long _monitorNext;
240 private boolean _pausing;
241 private boolean _paused;
242 private volatile long _idleTick;
243 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); 232 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
244 233
245 SelectSet(int acceptorID) throws Exception 234 SelectSet(int acceptorID) throws Exception
246 { 235 {
247 _setID=acceptorID; 236 _setID=acceptorID;
248
249 _idleTick = System.currentTimeMillis();
250 237
251 // create a selector; 238 // create a selector;
252 _selector = new SaneSelector(); 239 _selector = new SaneSelector();
253 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
254 } 240 }
255 241
256 private void addChange(SocketChannel channel) 242 private void addChange(SocketChannel channel)
257 { 243 {
258 try { 244 try {
283 final SaneSelector selector = _selector; 269 final SaneSelector selector = _selector;
284 // Stopped concurrently ? 270 // Stopped concurrently ?
285 if (selector == null) 271 if (selector == null)
286 return; 272 return;
287 273
288 // Do and instant select to see if any connections can be handled. 274 selector.select();
289 // int selected = selector.selectNow();
290 int selected = selector.select();
291
292 _now = System.currentTimeMillis();
293 /*
294 // if no immediate things to do
295 if (selected==0 && selector.selectedKeys().isEmpty())
296 {
297
298 // If we are in pausing mode
299 if (_pausing)
300 {
301 try
302 {
303 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
304 }
305 catch(InterruptedException e)
306 {
307 LOG.trace("",e);
308 }
309 _now = System.currentTimeMillis();
310 }
311
312 // workout how long to wait in select
313 long wait = __IDLE_TICK;
314
315 // If we should wait with a select
316 if (wait>0)
317 {
318 long before = _now;
319 selector.select(wait);
320 // selector.select(10000L);
321 _now = System.currentTimeMillis();
322
323 // If we are monitoring for busy selector
324 // and this select did not wait more than 1ms
325 if (__MONITOR_PERIOD>0 && _now-before <=1)
326 {
327 // count this as a busy select and if there have been too many this monitor cycle
328 if (++_busySelects>__MAX_SELECTS)
329 {
330 // Start injecting pauses
331 _pausing=true;
332
333 // if this is the first pause
334 if (!_paused)
335 {
336 // Log and dump some status
337 _paused=true;
338 LOG.warn("Selector {} is too busy, pausing!",this);
339 }
340 }
341 }
342 }
343 }
344 */
345 // have we been destroyed while sleeping
346 if (_selector==null || !selector.isOpen())
347 return;
348 275
349 // Look for things to do 276 // Look for things to do
350 for (SelectionKey key: selector.selectedKeys()) 277 for (SelectionKey key: selector.selectedKeys())
351 { 278 {
352 SocketChannel channel=null; 279 SocketChannel channel=null;
437 364
438 // Everything always handled 365 // Everything always handled
439 selector.selectedKeys().clear(); 366 selector.selectedKeys().clear();
440 367
441 _now = System.currentTimeMillis(); 368 _now = System.currentTimeMillis();
442 /*
443 // Idle tick
444 if (_now-_idleTick>__IDLE_TICK)
445 {
446 _idleTick = _now;
447
448 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
449 ?(_now+_maxIdleTime)
450 :_now;
451
452 execute(new Runnable()
453 {
454 public void run()
455 {
456 for (SelectChannelEndPoint endp:_endPoints.keySet())
457 {
458 endp.checkIdleTimestamp(idle_now);
459 }
460 }
461 public String toString() {return "Idle-"+super.toString();}
462 });
463
464 }
465 */
466 // Reset busy select monitor counts
467 if (__MONITOR_PERIOD>0 && _now>_monitorNext)
468 {
469 _busySelects=0;
470 _pausing=false;
471 _monitorNext=_now+__MONITOR_PERIOD;
472
473 }
474 } 369 }
475 catch (ClosedSelectorException e) 370 catch (ClosedSelectorException e)
476 { 371 {
477 if (isRunning()) 372 if (isRunning())
478 LOG.warn("",e); 373 LOG.warn("",e);
496 391
497 public long getNow() 392 public long getNow()
498 { 393 {
499 return _now; 394 return _now;
500 } 395 }
501 /* 396
502 public void wakeup()
503 {
504 SaneSelector selector = _selector;
505 if (selector!=null)
506 selector.wakeup();
507 }
508 */
509 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException 397 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
510 { 398 {
511 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime); 399 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime);
512 endp.setConnection(getManager().newConnection(channel,endp, sKey.attachment())); 400 endp.setConnection(getManager().newConnection(channel,endp, sKey.attachment()));
513 LOG.debug("created {}",endp); 401 LOG.debug("created {}",endp);
546 } 434 }
547 */ 435 */
548 // close endpoints and selector 436 // close endpoints and selector
549 synchronized (this) 437 synchronized (this)
550 { 438 {
551 SaneSelector selector=_selector; 439 for (SelectionKey key : _selector.keys())
552 for (SelectionKey key:selector.keys())
553 { 440 {
554 if (key==null) 441 if (key==null)
555 continue; 442 continue;
556 Object att=key.attachment(); 443 Object att=key.attachment();
557 if (att instanceof EndPoint) 444 if (att instanceof EndPoint)
568 } 455 }
569 } 456 }
570 457
571 try 458 try
572 { 459 {
573 selector=_selector; 460 _selector.close();
574 if (selector != null)
575 selector.close();
576 } 461 }
577 catch (IOException e) 462 catch (IOException e)
578 { 463 {
579 LOG.trace("",e); 464 LOG.trace("",e);
580 } 465 }