Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 951:e542a9cc75ef
simplify SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 12 Oct 2016 18:12:12 -0600 |
parents | a778413aefc0 |
children | 669769bcdf5c |
comparison
equal
deleted
inserted
replaced
950:a778413aefc0 | 951:e542a9cc75ef |
---|---|
56 */ | 56 */ |
57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable | 57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable |
58 { | 58 { |
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); | 59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); |
60 | 60 |
61 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); | |
62 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); | |
63 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); | |
64 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); | |
65 | |
66 private int _maxIdleTime; | 61 private int _maxIdleTime; |
67 private long _lowResourcesConnections; | 62 private long _lowResourcesConnections; |
68 private SelectSet[] _selectSet; | 63 private SelectSet[] _selectSet; |
69 private int _selectSets=1; | 64 private int _selectSets=1; |
70 private volatile int _set=0; | 65 private volatile int _set=0; |
72 /* ------------------------------------------------------------ */ | 67 /* ------------------------------------------------------------ */ |
73 /** | 68 /** |
74 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. | 69 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. |
75 * @see #setLowResourcesMaxIdleTime(long) | 70 * @see #setLowResourcesMaxIdleTime(long) |
76 */ | 71 */ |
77 public void setMaxIdleTime(long maxIdleTime) | 72 public void setMaxIdleTime(int maxIdleTime) |
78 { | 73 { |
79 _maxIdleTime=(int)maxIdleTime; | 74 _maxIdleTime = maxIdleTime; |
80 } | 75 } |
81 | 76 |
82 /* ------------------------------------------------------------ */ | 77 /* ------------------------------------------------------------ */ |
83 /** | 78 /** |
84 * @param selectSets number of select sets to create | 79 * @param selectSets number of select sets to create |
210 } | 205 } |
211 } | 206 } |
212 super.doStop(); | 207 super.doStop(); |
213 } | 208 } |
214 | 209 |
215 /* ------------------------------------------------------------------------------- */ | |
216 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); | 210 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); |
217 | 211 |
218 public String dump() | 212 public String dump() |
219 { | 213 { |
220 return AggregateLifeCycle.dump(this); | 214 return AggregateLifeCycle.dump(this); |
233 private volatile long _now = System.currentTimeMillis(); | 227 private volatile long _now = System.currentTimeMillis(); |
234 | 228 |
235 private volatile SaneSelector _selector; | 229 private volatile SaneSelector _selector; |
236 | 230 |
237 private volatile Thread _selecting; | 231 private volatile Thread _selecting; |
238 private int _busySelects; | |
239 private long _monitorNext; | |
240 private boolean _pausing; | |
241 private boolean _paused; | |
242 private volatile long _idleTick; | |
243 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | 232 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); |
244 | 233 |
245 SelectSet(int acceptorID) throws Exception | 234 SelectSet(int acceptorID) throws Exception |
246 { | 235 { |
247 _setID=acceptorID; | 236 _setID=acceptorID; |
248 | |
249 _idleTick = System.currentTimeMillis(); | |
250 | 237 |
251 // create a selector; | 238 // create a selector; |
252 _selector = new SaneSelector(); | 239 _selector = new SaneSelector(); |
253 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | |
254 } | 240 } |
255 | 241 |
256 private void addChange(SocketChannel channel) | 242 private void addChange(SocketChannel channel) |
257 { | 243 { |
258 try { | 244 try { |
283 final SaneSelector selector = _selector; | 269 final SaneSelector selector = _selector; |
284 // Stopped concurrently ? | 270 // Stopped concurrently ? |
285 if (selector == null) | 271 if (selector == null) |
286 return; | 272 return; |
287 | 273 |
288 // Do and instant select to see if any connections can be handled. | 274 selector.select(); |
289 // int selected = selector.selectNow(); | |
290 int selected = selector.select(); | |
291 | |
292 _now = System.currentTimeMillis(); | |
293 /* | |
294 // if no immediate things to do | |
295 if (selected==0 && selector.selectedKeys().isEmpty()) | |
296 { | |
297 | |
298 // If we are in pausing mode | |
299 if (_pausing) | |
300 { | |
301 try | |
302 { | |
303 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop | |
304 } | |
305 catch(InterruptedException e) | |
306 { | |
307 LOG.trace("",e); | |
308 } | |
309 _now = System.currentTimeMillis(); | |
310 } | |
311 | |
312 // workout how long to wait in select | |
313 long wait = __IDLE_TICK; | |
314 | |
315 // If we should wait with a select | |
316 if (wait>0) | |
317 { | |
318 long before = _now; | |
319 selector.select(wait); | |
320 // selector.select(10000L); | |
321 _now = System.currentTimeMillis(); | |
322 | |
323 // If we are monitoring for busy selector | |
324 // and this select did not wait more than 1ms | |
325 if (__MONITOR_PERIOD>0 && _now-before <=1) | |
326 { | |
327 // count this as a busy select and if there have been too many this monitor cycle | |
328 if (++_busySelects>__MAX_SELECTS) | |
329 { | |
330 // Start injecting pauses | |
331 _pausing=true; | |
332 | |
333 // if this is the first pause | |
334 if (!_paused) | |
335 { | |
336 // Log and dump some status | |
337 _paused=true; | |
338 LOG.warn("Selector {} is too busy, pausing!",this); | |
339 } | |
340 } | |
341 } | |
342 } | |
343 } | |
344 */ | |
345 // have we been destroyed while sleeping | |
346 if (_selector==null || !selector.isOpen()) | |
347 return; | |
348 | 275 |
349 // Look for things to do | 276 // Look for things to do |
350 for (SelectionKey key: selector.selectedKeys()) | 277 for (SelectionKey key: selector.selectedKeys()) |
351 { | 278 { |
352 SocketChannel channel=null; | 279 SocketChannel channel=null; |
437 | 364 |
438 // Everything always handled | 365 // Everything always handled |
439 selector.selectedKeys().clear(); | 366 selector.selectedKeys().clear(); |
440 | 367 |
441 _now = System.currentTimeMillis(); | 368 _now = System.currentTimeMillis(); |
442 /* | |
443 // Idle tick | |
444 if (_now-_idleTick>__IDLE_TICK) | |
445 { | |
446 _idleTick = _now; | |
447 | |
448 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) | |
449 ?(_now+_maxIdleTime) | |
450 :_now; | |
451 | |
452 execute(new Runnable() | |
453 { | |
454 public void run() | |
455 { | |
456 for (SelectChannelEndPoint endp:_endPoints.keySet()) | |
457 { | |
458 endp.checkIdleTimestamp(idle_now); | |
459 } | |
460 } | |
461 public String toString() {return "Idle-"+super.toString();} | |
462 }); | |
463 | |
464 } | |
465 */ | |
466 // Reset busy select monitor counts | |
467 if (__MONITOR_PERIOD>0 && _now>_monitorNext) | |
468 { | |
469 _busySelects=0; | |
470 _pausing=false; | |
471 _monitorNext=_now+__MONITOR_PERIOD; | |
472 | |
473 } | |
474 } | 369 } |
475 catch (ClosedSelectorException e) | 370 catch (ClosedSelectorException e) |
476 { | 371 { |
477 if (isRunning()) | 372 if (isRunning()) |
478 LOG.warn("",e); | 373 LOG.warn("",e); |
496 | 391 |
497 public long getNow() | 392 public long getNow() |
498 { | 393 { |
499 return _now; | 394 return _now; |
500 } | 395 } |
501 /* | 396 |
502 public void wakeup() | |
503 { | |
504 SaneSelector selector = _selector; | |
505 if (selector!=null) | |
506 selector.wakeup(); | |
507 } | |
508 */ | |
509 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException | 397 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException |
510 { | 398 { |
511 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime); | 399 SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,this,sKey, _maxIdleTime); |
512 endp.setConnection(getManager().newConnection(channel,endp, sKey.attachment())); | 400 endp.setConnection(getManager().newConnection(channel,endp, sKey.attachment())); |
513 LOG.debug("created {}",endp); | 401 LOG.debug("created {}",endp); |
546 } | 434 } |
547 */ | 435 */ |
548 // close endpoints and selector | 436 // close endpoints and selector |
549 synchronized (this) | 437 synchronized (this) |
550 { | 438 { |
551 SaneSelector selector=_selector; | 439 for (SelectionKey key : _selector.keys()) |
552 for (SelectionKey key:selector.keys()) | |
553 { | 440 { |
554 if (key==null) | 441 if (key==null) |
555 continue; | 442 continue; |
556 Object att=key.attachment(); | 443 Object att=key.attachment(); |
557 if (att instanceof EndPoint) | 444 if (att instanceof EndPoint) |
568 } | 455 } |
569 } | 456 } |
570 | 457 |
571 try | 458 try |
572 { | 459 { |
573 selector=_selector; | 460 _selector.close(); |
574 if (selector != null) | |
575 selector.close(); | |
576 } | 461 } |
577 catch (IOException e) | 462 catch (IOException e) |
578 { | 463 { |
579 LOG.trace("",e); | 464 LOG.trace("",e); |
580 } | 465 } |