comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 865:6b210bb66c63

remove ThreadPool
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 20:38:06 -0600
parents 8e9db0bbf4f9
children 54308d65265a
comparison
equal deleted inserted replaced
864:e21ca9878a10 865:6b210bb66c63
56 * NIO scheduling to scale to large numbers of connections. 56 * NIO scheduling to scale to large numbers of connections.
57 * <p> 57 * <p>
58 */ 58 */
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable 59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
60 { 60 {
61 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 61 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
62 62
63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); 63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); 64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); 65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); 66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
67 67
68 private int _maxIdleTime; 68 private int _maxIdleTime;
69 private int _lowResourcesMaxIdleTime; 69 private int _lowResourcesMaxIdleTime;
70 private long _lowResourcesConnections; 70 private long _lowResourcesConnections;
71 private SelectSet[] _selectSet; 71 private SelectSet[] _selectSet;
72 private int _selectSets=1; 72 private int _selectSets=1;
73 private volatile int _set=0; 73 private volatile int _set=0;
74 private boolean _deferringInterestedOps0=true; 74 private boolean _deferringInterestedOps0=true;
75 private int _selectorPriorityDelta=0; 75 private int _selectorPriorityDelta=0;
76 76
77 /* ------------------------------------------------------------ */ 77 /* ------------------------------------------------------------ */
78 /** 78 /**
79 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 79 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
80 * @see #setLowResourcesMaxIdleTime(long) 80 * @see #setLowResourcesMaxIdleTime(long)
81 */ 81 */
82 public void setMaxIdleTime(long maxIdleTime) 82 public void setMaxIdleTime(long maxIdleTime)
83 { 83 {
84 _maxIdleTime=(int)maxIdleTime; 84 _maxIdleTime=(int)maxIdleTime;
85 } 85 }
86 86
87 /* ------------------------------------------------------------ */ 87 /* ------------------------------------------------------------ */
88 /** 88 /**
89 * @param selectSets number of select sets to create 89 * @param selectSets number of select sets to create
90 */ 90 */
91 public void setSelectSets(int selectSets) 91 public void setSelectSets(int selectSets)
92 { 92 {
93 long lrc = _lowResourcesConnections * _selectSets; 93 long lrc = _lowResourcesConnections * _selectSets;
94 _selectSets=selectSets; 94 _selectSets=selectSets;
95 _lowResourcesConnections=lrc/_selectSets; 95 _lowResourcesConnections=lrc/_selectSets;
96 } 96 }
97 97
98 /* ------------------------------------------------------------ */ 98 /* ------------------------------------------------------------ */
99 /** 99 /**
100 * @return the max idle time 100 * @return the max idle time
101 */ 101 */
102 public long getMaxIdleTime() 102 public long getMaxIdleTime()
103 { 103 {
104 return _maxIdleTime; 104 return _maxIdleTime;
105 } 105 }
106 106
107 /* ------------------------------------------------------------ */ 107 /* ------------------------------------------------------------ */
108 /** 108 /**
109 * @return the number of select sets in use 109 * @return the number of select sets in use
110 */ 110 */
111 public int getSelectSets() 111 public int getSelectSets()
112 { 112 {
113 return _selectSets; 113 return _selectSets;
114 } 114 }
115 115
116 /* ------------------------------------------------------------ */ 116 /* ------------------------------------------------------------ */
117 /** 117 /**
118 * @param i 118 * @param i
119 * @return The select set 119 * @return The select set
120 */ 120 */
121 public SelectSet getSelectSet(int i) 121 public SelectSet getSelectSet(int i)
122 { 122 {
123 return _selectSet[i]; 123 return _selectSet[i];
124 } 124 }
125 125
126 /* ------------------------------------------------------------ */ 126 /* ------------------------------------------------------------ */
127 /** Register a channel 127 /** Register a channel
128 * @param channel 128 * @param channel
129 * @param att Attached Object 129 * @param att Attached Object
130 */ 130 */
131 public void register(SocketChannel channel, Object att) 131 public void register(SocketChannel channel, Object att)
132 { 132 {
133 // The ++ increment here is not atomic, but it does not matter. 133 // The ++ increment here is not atomic, but it does not matter.
134 // so long as the value changes sometimes, then connections will 134 // so long as the value changes sometimes, then connections will
135 // be distributed over the available sets. 135 // be distributed over the available sets.
136 136
137 int s=_set++; 137 int s=_set++;
138 if (s<0) 138 if (s<0)
139 s=-s; 139 s=-s;
140 s=s%_selectSets; 140 s=s%_selectSets;
141 SelectSet[] sets=_selectSet; 141 SelectSet[] sets=_selectSet;
142 if (sets!=null) 142 if (sets!=null)
143 { 143 {
144 SelectSet set=sets[s]; 144 SelectSet set=sets[s];
145 set.addChange(channel,att); 145 set.addChange(channel,att);
146 set.wakeup(); 146 set.wakeup();
147 } 147 }
148 } 148 }
149 149
150 150
151 /* ------------------------------------------------------------ */ 151 /* ------------------------------------------------------------ */
152 /** Register a channel 152 /** Register a channel
153 * @param channel 153 * @param channel
154 */ 154 */
155 public void register(SocketChannel channel) 155 public void register(SocketChannel channel)
156 { 156 {
157 // The ++ increment here is not atomic, but it does not matter. 157 // The ++ increment here is not atomic, but it does not matter.
158 // so long as the value changes sometimes, then connections will 158 // so long as the value changes sometimes, then connections will
159 // be distributed over the available sets. 159 // be distributed over the available sets.
160 160
161 int s=_set++; 161 int s=_set++;
162 if (s<0) 162 if (s<0)
163 s=-s; 163 s=-s;
164 s=s%_selectSets; 164 s=s%_selectSets;
165 SelectSet[] sets=_selectSet; 165 SelectSet[] sets=_selectSet;
166 if (sets!=null) 166 if (sets!=null)
167 { 167 {
168 SelectSet set=sets[s]; 168 SelectSet set=sets[s];
169 set.addChange(channel); 169 set.addChange(channel);
170 set.wakeup(); 170 set.wakeup();
171 } 171 }
172 } 172 }
173 173
174 /* ------------------------------------------------------------ */ 174 /* ------------------------------------------------------------ */
175 /** Register a {@link ServerSocketChannel} 175 /** Register a {@link ServerSocketChannel}
176 * @param acceptChannel 176 * @param acceptChannel
177 */ 177 */
178 public void register(ServerSocketChannel acceptChannel) 178 public void register(ServerSocketChannel acceptChannel)
179 { 179 {
180 int s=_set++; 180 int s=_set++;
181 if (s<0) 181 if (s<0)
182 s=-s; 182 s=-s;
183 s=s%_selectSets; 183 s=s%_selectSets;
184 SelectSet set=_selectSet[s]; 184 SelectSet set=_selectSet[s];
185 set.addChange(acceptChannel); 185 set.addChange(acceptChannel);
186 set.wakeup(); 186 set.wakeup();
187 } 187 }
188 188
189 /* ------------------------------------------------------------ */ 189 /* ------------------------------------------------------------ */
190 /** 190 /**
191 * @return delta The value to add to the selector thread priority. 191 * @return delta The value to add to the selector thread priority.
192 */ 192 */
193 public int getSelectorPriorityDelta() 193 public int getSelectorPriorityDelta()
194 { 194 {
195 return _selectorPriorityDelta; 195 return _selectorPriorityDelta;
196 } 196 }
197 197
198 /* ------------------------------------------------------------ */ 198 /* ------------------------------------------------------------ */
199 /** Set the selector thread priorty delta. 199 /** Set the selector thread priorty delta.
200 * @param delta The value to add to the selector thread priority. 200 * @param delta The value to add to the selector thread priority.
201 */ 201 */
202 public void setSelectorPriorityDelta(int delta) 202 public void setSelectorPriorityDelta(int delta)
203 { 203 {
204 _selectorPriorityDelta=delta; 204 _selectorPriorityDelta=delta;
205 } 205 }
206 206
207 207
208 /* ------------------------------------------------------------ */ 208 /* ------------------------------------------------------------ */
209 /** 209 /**
210 * @return the lowResourcesConnections 210 * @return the lowResourcesConnections
211 */ 211 */
212 public long getLowResourcesConnections() 212 public long getLowResourcesConnections()
213 { 213 {
214 return _lowResourcesConnections*_selectSets; 214 return _lowResourcesConnections*_selectSets;
215 } 215 }
216 216
217 /* ------------------------------------------------------------ */ 217 /* ------------------------------------------------------------ */
218 /** 218 /**
219 * Set the number of connections, which if exceeded places this manager in low resources state. 219 * Set the number of connections, which if exceeded places this manager in low resources state.
220 * This is not an exact measure as the connection count is averaged over the select sets. 220 * This is not an exact measure as the connection count is averaged over the select sets.
221 * @param lowResourcesConnections the number of connections 221 * @param lowResourcesConnections the number of connections
222 * @see #setLowResourcesMaxIdleTime(long) 222 * @see #setLowResourcesMaxIdleTime(long)
223 */ 223 */
224 public void setLowResourcesConnections(long lowResourcesConnections) 224 public void setLowResourcesConnections(long lowResourcesConnections)
225 { 225 {
226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; 226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
227 } 227 }
228 228
229 /* ------------------------------------------------------------ */ 229 /* ------------------------------------------------------------ */
230 /** 230 /**
231 * @return the lowResourcesMaxIdleTime 231 * @return the lowResourcesMaxIdleTime
232 */ 232 */
233 public long getLowResourcesMaxIdleTime() 233 public long getLowResourcesMaxIdleTime()
234 { 234 {
235 return _lowResourcesMaxIdleTime; 235 return _lowResourcesMaxIdleTime;
236 } 236 }
237 237
238 /* ------------------------------------------------------------ */ 238 /* ------------------------------------------------------------ */
239 /** 239 /**
240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} 240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
241 * @see #setMaxIdleTime(long) 241 * @see #setMaxIdleTime(long)
242 */ 242 */
243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) 243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
244 { 244 {
245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; 245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
246 } 246 }
247 247
248 248
249 /* ------------------------------------------------------------------------------- */ 249 /* ------------------------------------------------------------------------------- */
250 public abstract boolean dispatch(Runnable task); 250 public abstract void execute(Runnable task);
251 251
252 /* ------------------------------------------------------------ */ 252 /* ------------------------------------------------------------ */
253 /* (non-Javadoc) 253 /* (non-Javadoc)
254 * @see org.eclipse.component.AbstractLifeCycle#doStart() 254 * @see org.eclipse.component.AbstractLifeCycle#doStart()
255 */ 255 */
256 @Override 256 @Override
257 protected void doStart() throws Exception 257 protected void doStart() throws Exception
258 { 258 {
259 _selectSet = new SelectSet[_selectSets]; 259 _selectSet = new SelectSet[_selectSets];
260 for (int i=0;i<_selectSet.length;i++) 260 for (int i=0;i<_selectSet.length;i++)
261 _selectSet[i]= new SelectSet(i); 261 _selectSet[i]= new SelectSet(i);
262 262
263 super.doStart(); 263 super.doStart();
264 264
265 // start a thread to Select 265 // start a thread to Select
266 for (int i=0;i<getSelectSets();i++) 266 for (int i=0;i<getSelectSets();i++)
267 { 267 {
268 final int id=i; 268 final int id=i;
269 boolean selecting=dispatch(new Runnable() 269 execute(new Runnable()
270 { 270 {
271 public void run() 271 public void run()
272 { 272 {
273 String name=Thread.currentThread().getName(); 273 String name=Thread.currentThread().getName();
274 int priority=Thread.currentThread().getPriority(); 274 int priority=Thread.currentThread().getPriority();
275 try 275 try
276 { 276 {
277 SelectSet[] sets=_selectSet; 277 SelectSet[] sets=_selectSet;
278 if (sets==null) 278 if (sets==null)
279 return; 279 return;
280 SelectSet set=sets[id]; 280 SelectSet set=sets[id];
281 281
282 Thread.currentThread().setName(name+" Selector"+id); 282 Thread.currentThread().setName(name+" Selector"+id);
283 if (getSelectorPriorityDelta()!=0) 283 if (getSelectorPriorityDelta()!=0)
284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); 284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
285 LOG.debug("Starting {} on {}",Thread.currentThread(),this); 285 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
286 while (isRunning()) 286 while (isRunning())
287 { 287 {
288 try 288 try
289 { 289 {
290 set.doSelect(); 290 set.doSelect();
291 } 291 }
292 catch(IOException e) 292 catch(IOException e)
293 { 293 {
294 LOG.trace("",e); 294 LOG.trace("",e);
295 } 295 }
296 catch(Exception e) 296 catch(Exception e)
297 { 297 {
298 LOG.warn("",e); 298 LOG.warn("",e);
299 } 299 }
300 } 300 }
301 } 301 }
302 finally 302 finally
303 { 303 {
304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); 304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
305 Thread.currentThread().setName(name); 305 Thread.currentThread().setName(name);
306 if (getSelectorPriorityDelta()!=0) 306 if (getSelectorPriorityDelta()!=0)
307 Thread.currentThread().setPriority(priority); 307 Thread.currentThread().setPriority(priority);
308 } 308 }
309 } 309 }
310 310
311 }); 311 });
312 312 }
313 if (!selecting) 313 }
314 throw new IllegalStateException("!Selecting"); 314
315 } 315
316 } 316 /* ------------------------------------------------------------------------------- */
317 317 @Override
318 318 protected void doStop() throws Exception
319 /* ------------------------------------------------------------------------------- */ 319 {
320 @Override 320 SelectSet[] sets= _selectSet;
321 protected void doStop() throws Exception 321 _selectSet=null;
322 { 322 if (sets!=null)
323 SelectSet[] sets= _selectSet; 323 {
324 _selectSet=null; 324 for (SelectSet set : sets)
325 if (sets!=null) 325 {
326 { 326 if (set!=null)
327 for (SelectSet set : sets) 327 set.stop();
328 { 328 }
329 if (set!=null) 329 }
330 set.stop(); 330 super.doStop();
331 } 331 }
332 } 332
333 super.doStop(); 333 /* ------------------------------------------------------------ */
334 } 334 /**
335 335 * @param endpoint
336 /* ------------------------------------------------------------ */ 336 */
337 /** 337 protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
338 * @param endpoint 338
339 */ 339 /* ------------------------------------------------------------ */
340 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); 340 /**
341 341 * @param endpoint
342 /* ------------------------------------------------------------ */ 342 */
343 /** 343 protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
344 * @param endpoint 344
345 */ 345 /* ------------------------------------------------------------ */
346 protected abstract void endPointOpened(SelectChannelEndPoint endpoint); 346 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
347 347
348 /* ------------------------------------------------------------ */ 348 /* ------------------------------------------------------------------------------- */
349 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); 349 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment);
350 350
351 /* ------------------------------------------------------------------------------- */ 351 /* ------------------------------------------------------------ */
352 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); 352 /**
353 353 * Create a new end point
354 /* ------------------------------------------------------------ */ 354 * @param channel
355 /** 355 * @param selectSet
356 * Create a new end point 356 * @param sKey the selection key
357 * @param channel 357 * @return the new endpoint {@link SelectChannelEndPoint}
358 * @param selectSet 358 * @throws IOException
359 * @param sKey the selection key 359 */
360 * @return the new endpoint {@link SelectChannelEndPoint} 360 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
361 * @throws IOException 361
362 */ 362 /* ------------------------------------------------------------------------------- */
363 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; 363 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
364 364 {
365 /* ------------------------------------------------------------------------------- */ 365 LOG.warn(ex+","+channel+","+attachment);
366 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) 366 LOG.debug("",ex);
367 { 367 }
368 LOG.warn(ex+","+channel+","+attachment); 368
369 LOG.debug("",ex); 369 /* ------------------------------------------------------------ */
370 } 370 public String dump()
371 371 {
372 /* ------------------------------------------------------------ */ 372 return AggregateLifeCycle.dump(this);
373 public String dump() 373 }
374 { 374
375 return AggregateLifeCycle.dump(this); 375 /* ------------------------------------------------------------ */
376 } 376 public void dump(Appendable out, String indent) throws IOException
377 377 {
378 /* ------------------------------------------------------------ */ 378 AggregateLifeCycle.dumpObject(out,this);
379 public void dump(Appendable out, String indent) throws IOException 379 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
380 { 380 }
381 AggregateLifeCycle.dumpObject(out,this); 381
382 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); 382
383 } 383 /* ------------------------------------------------------------------------------- */
384 384 /* ------------------------------------------------------------------------------- */
385 385 /* ------------------------------------------------------------------------------- */
386 /* ------------------------------------------------------------------------------- */ 386 public class SelectSet implements Dumpable
387 /* ------------------------------------------------------------------------------- */ 387 {
388 /* ------------------------------------------------------------------------------- */ 388 private final int _setID;
389 public class SelectSet implements Dumpable 389 private final Timeout _timeout;
390 { 390
391 private final int _setID; 391 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
392 private final Timeout _timeout; 392
393 393 private volatile Selector _selector;
394 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); 394
395 395 private volatile Thread _selecting;
396 private volatile Selector _selector; 396 private int _busySelects;
397 397 private long _monitorNext;
398 private volatile Thread _selecting; 398 private boolean _pausing;
399 private int _busySelects; 399 private boolean _paused;
400 private long _monitorNext; 400 private volatile long _idleTick;
401 private boolean _pausing; 401 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
402 private boolean _paused; 402
403 private volatile long _idleTick; 403 /* ------------------------------------------------------------ */
404 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); 404 SelectSet(int acceptorID) throws Exception
405 405 {
406 /* ------------------------------------------------------------ */ 406 _setID=acceptorID;
407 SelectSet(int acceptorID) throws Exception 407
408 { 408 _idleTick = System.currentTimeMillis();
409 _setID=acceptorID; 409 _timeout = new Timeout(this);
410 410 _timeout.setDuration(0L);
411 _idleTick = System.currentTimeMillis(); 411
412 _timeout = new Timeout(this); 412 // create a selector;
413 _timeout.setDuration(0L); 413 _selector = Selector.open();
414 414 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD;
415 // create a selector; 415 }
416 _selector = Selector.open(); 416
417 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; 417 /* ------------------------------------------------------------ */
418 } 418 public void addChange(Object change)
419 419 {
420 /* ------------------------------------------------------------ */ 420 _changes.add(change);
421 public void addChange(Object change) 421 }
422 { 422
423 _changes.add(change); 423 /* ------------------------------------------------------------ */
424 } 424 public void addChange(SelectableChannel channel, Object att)
425 425 {
426 /* ------------------------------------------------------------ */ 426 if (att==null)
427 public void addChange(SelectableChannel channel, Object att) 427 addChange(channel);
428 { 428 else if (att instanceof EndPoint)
429 if (att==null) 429 addChange(att);
430 addChange(channel); 430 else
431 else if (att instanceof EndPoint) 431 addChange(new ChannelAndAttachment(channel,att));
432 addChange(att); 432 }
433 else 433
434 addChange(new ChannelAndAttachment(channel,att)); 434 /* ------------------------------------------------------------ */
435 } 435 /**
436 436 * Select and dispatch tasks found from changes and the selector.
437 /* ------------------------------------------------------------ */ 437 *
438 /** 438 * @throws IOException
439 * Select and dispatch tasks found from changes and the selector. 439 */
440 * 440 public void doSelect() throws IOException
441 * @throws IOException 441 {
442 */ 442 try
443 public void doSelect() throws IOException 443 {
444 { 444 _selecting=Thread.currentThread();
445 try 445 final Selector selector=_selector;
446 { 446 // Stopped concurrently ?
447 _selecting=Thread.currentThread(); 447 if (selector == null)
448 final Selector selector=_selector; 448 return;
449 // Stopped concurrently ? 449
450 if (selector == null) 450 // Make any key changes required
451 return; 451 Object change;
452 452 int changes=_changes.size();
453 // Make any key changes required 453 while (changes-->0 && (change=_changes.poll())!=null)
454 Object change; 454 {
455 int changes=_changes.size(); 455 Channel ch=null;
456 while (changes-->0 && (change=_changes.poll())!=null) 456 SelectionKey key=null;
457 { 457
458 Channel ch=null; 458 try
459 SelectionKey key=null; 459 {
460 460 if (change instanceof EndPoint)
461 try 461 {
462 { 462 // Update the operations for a key.
463 if (change instanceof EndPoint) 463 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
464 { 464 ch=endpoint.getChannel();
465 // Update the operations for a key. 465 endpoint.doUpdateKey();
466 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; 466 }
467 ch=endpoint.getChannel(); 467 else if (change instanceof ChannelAndAttachment)
468 endpoint.doUpdateKey(); 468 {
469 } 469 // finish accepting/connecting this connection
470 else if (change instanceof ChannelAndAttachment) 470 final ChannelAndAttachment asc = (ChannelAndAttachment)change;
471 { 471 final SelectableChannel channel=asc._channel;
472 // finish accepting/connecting this connection 472 ch=channel;
473 final ChannelAndAttachment asc = (ChannelAndAttachment)change; 473 final Object att = asc._attachment;
474 final SelectableChannel channel=asc._channel; 474
475 ch=channel; 475 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
476 final Object att = asc._attachment; 476 {
477 477 key = channel.register(selector,SelectionKey.OP_READ,att);
478 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) 478 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
479 { 479 key.attach(endpoint);
480 key = channel.register(selector,SelectionKey.OP_READ,att); 480 endpoint.schedule();
481 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); 481 }
482 key.attach(endpoint); 482 else if (channel.isOpen())
483 endpoint.schedule(); 483 {
484 } 484 key = channel.register(selector,SelectionKey.OP_CONNECT,att);
485 else if (channel.isOpen()) 485 }
486 { 486 }
487 key = channel.register(selector,SelectionKey.OP_CONNECT,att); 487 else if (change instanceof SocketChannel)
488 } 488 {
489 } 489 // Newly registered channel
490 else if (change instanceof SocketChannel) 490 final SocketChannel channel=(SocketChannel)change;
491 { 491 ch=channel;
492 // Newly registered channel 492 key = channel.register(selector,SelectionKey.OP_READ,null);
493 final SocketChannel channel=(SocketChannel)change; 493 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
494 ch=channel; 494 key.attach(endpoint);
495 key = channel.register(selector,SelectionKey.OP_READ,null); 495 endpoint.schedule();
496 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 496 }
497 key.attach(endpoint); 497 else if (change instanceof ChangeTask)
498 endpoint.schedule(); 498 {
499 } 499 ((Runnable)change).run();
500 else if (change instanceof ChangeTask) 500 }
501 { 501 else if (change instanceof Runnable)
502 ((Runnable)change).run(); 502 {
503 } 503 execute((Runnable)change);
504 else if (change instanceof Runnable) 504 }
505 { 505 else
506 dispatch((Runnable)change); 506 throw new IllegalArgumentException(change.toString());
507 } 507 }
508 else 508 catch (CancelledKeyException e)
509 throw new IllegalArgumentException(change.toString()); 509 {
510 } 510 LOG.trace("",e);
511 catch (CancelledKeyException e) 511 }
512 { 512 catch (Throwable e)
513 LOG.trace("",e); 513 {
514 } 514 if (isRunning())
515 catch (Throwable e) 515 LOG.warn("",e);
516 { 516 else
517 if (isRunning()) 517 LOG.debug("",e);
518 LOG.warn("",e); 518
519 else 519 try
520 LOG.debug("",e); 520 {
521 521 if (ch!=null)
522 try 522 ch.close();
523 { 523 }
524 if (ch!=null) 524 catch(IOException e2)
525 ch.close(); 525 {
526 } 526 LOG.debug("",e2);
527 catch(IOException e2) 527 }
528 { 528 }
529 LOG.debug("",e2); 529 }
530 } 530
531 } 531
532 } 532 // Do and instant select to see if any connections can be handled.
533 533 int selected=selector.selectNow();
534 534
535 // Do and instant select to see if any connections can be handled. 535 long now=System.currentTimeMillis();
536 int selected=selector.selectNow(); 536
537 537 // if no immediate things to do
538 long now=System.currentTimeMillis(); 538 if (selected==0 && selector.selectedKeys().isEmpty())
539 539 {
540 // if no immediate things to do 540 // If we are in pausing mode
541 if (selected==0 && selector.selectedKeys().isEmpty()) 541 if (_pausing)
542 { 542 {
543 // If we are in pausing mode 543 try
544 if (_pausing) 544 {
545 { 545 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
546 try 546 }
547 { 547 catch(InterruptedException e)
548 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop 548 {
549 } 549 LOG.trace("",e);
550 catch(InterruptedException e) 550 }
551 { 551 now=System.currentTimeMillis();
552 LOG.trace("",e); 552 }
553 } 553
554 now=System.currentTimeMillis(); 554 // workout how long to wait in select
555 } 555 _timeout.setNow(now);
556 556 long to_next_timeout=_timeout.getTimeToNext();
557 // workout how long to wait in select 557
558 _timeout.setNow(now); 558 long wait = _changes.size()==0?__IDLE_TICK:0L;
559 long to_next_timeout=_timeout.getTimeToNext(); 559 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
560 560 wait = to_next_timeout;
561 long wait = _changes.size()==0?__IDLE_TICK:0L; 561
562 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) 562 // If we should wait with a select
563 wait = to_next_timeout; 563 if (wait>0)
564 564 {
565 // If we should wait with a select 565 long before=now;
566 if (wait>0) 566 selector.select(wait);
567 { 567 now = System.currentTimeMillis();
568 long before=now; 568 _timeout.setNow(now);
569 selector.select(wait); 569
570 now = System.currentTimeMillis(); 570 // If we are monitoring for busy selector
571 _timeout.setNow(now); 571 // and this select did not wait more than 1ms
572 572 if (__MONITOR_PERIOD>0 && now-before <=1)
573 // If we are monitoring for busy selector 573 {
574 // and this select did not wait more than 1ms 574 // count this as a busy select and if there have been too many this monitor cycle
575 if (__MONITOR_PERIOD>0 && now-before <=1) 575 if (++_busySelects>__MAX_SELECTS)
576 { 576 {
577 // count this as a busy select and if there have been too many this monitor cycle 577 // Start injecting pauses
578 if (++_busySelects>__MAX_SELECTS) 578 _pausing=true;
579 { 579
580 // Start injecting pauses 580 // if this is the first pause
581 _pausing=true; 581 if (!_paused)
582 582 {
583 // if this is the first pause 583 // Log and dump some status
584 if (!_paused) 584 _paused=true;
585 { 585 LOG.warn("Selector {} is too busy, pausing!",this);
586 // Log and dump some status 586 }
587 _paused=true; 587 }
588 LOG.warn("Selector {} is too busy, pausing!",this); 588 }
589 } 589 }
590 } 590 }
591 } 591
592 } 592 // have we been destroyed while sleeping
593 } 593 if (_selector==null || !selector.isOpen())
594 594 return;
595 // have we been destroyed while sleeping 595
596 if (_selector==null || !selector.isOpen()) 596 // Look for things to do
597 return; 597 for (SelectionKey key: selector.selectedKeys())
598 598 {
599 // Look for things to do 599 SocketChannel channel=null;
600 for (SelectionKey key: selector.selectedKeys()) 600
601 { 601 try
602 SocketChannel channel=null; 602 {
603 603 if (!key.isValid())
604 try 604 {
605 { 605 key.cancel();
606 if (!key.isValid()) 606 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
607 { 607 if (endpoint != null)
608 key.cancel(); 608 endpoint.doUpdateKey();
609 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); 609 continue;
610 if (endpoint != null) 610 }
611 endpoint.doUpdateKey(); 611
612 continue; 612 Object att = key.attachment();
613 } 613 if (att instanceof SelectChannelEndPoint)
614 614 {
615 Object att = key.attachment(); 615 if (key.isReadable()||key.isWritable())
616 if (att instanceof SelectChannelEndPoint) 616 ((SelectChannelEndPoint)att).schedule();
617 { 617 }
618 if (key.isReadable()||key.isWritable()) 618 else if (key.isConnectable())
619 ((SelectChannelEndPoint)att).schedule(); 619 {
620 } 620 // Complete a connection of a registered channel
621 else if (key.isConnectable()) 621 channel = (SocketChannel)key.channel();
622 { 622 boolean connected=false;
623 // Complete a connection of a registered channel 623 try
624 channel = (SocketChannel)key.channel(); 624 {
625 boolean connected=false; 625 connected=channel.finishConnect();
626 try 626 }
627 { 627 catch(Exception e)
628 connected=channel.finishConnect(); 628 {
629 } 629 connectionFailed(channel,e,att);
630 catch(Exception e) 630 }
631 { 631 finally
632 connectionFailed(channel,e,att); 632 {
633 } 633 if (connected)
634 finally 634 {
635 { 635 key.interestOps(SelectionKey.OP_READ);
636 if (connected) 636 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
637 { 637 key.attach(endpoint);
638 key.interestOps(SelectionKey.OP_READ); 638 endpoint.schedule();
639 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 639 }
640 key.attach(endpoint); 640 else
641 endpoint.schedule(); 641 {
642 } 642 key.cancel();
643 else 643 channel.close();
644 { 644 }
645 key.cancel(); 645 }
646 channel.close(); 646 }
647 } 647 else
648 } 648 {
649 } 649 // Wrap readable registered channel in an endpoint
650 else 650 channel = (SocketChannel)key.channel();
651 { 651 SelectChannelEndPoint endpoint = createEndPoint(channel,key);
652 // Wrap readable registered channel in an endpoint 652 key.attach(endpoint);
653 channel = (SocketChannel)key.channel(); 653 if (key.isReadable())
654 SelectChannelEndPoint endpoint = createEndPoint(channel,key); 654 endpoint.schedule();
655 key.attach(endpoint); 655 }
656 if (key.isReadable()) 656 key = null;
657 endpoint.schedule(); 657 }
658 } 658 catch (CancelledKeyException e)
659 key = null; 659 {
660 } 660 LOG.trace("",e);
661 catch (CancelledKeyException e) 661 }
662 { 662 catch (Exception e)
663 LOG.trace("",e); 663 {
664 } 664 if (isRunning())
665 catch (Exception e) 665 LOG.warn("",e);
666 { 666 else
667 if (isRunning()) 667 LOG.trace("",e);
668 LOG.warn("",e); 668
669 else 669 try
670 LOG.trace("",e); 670 {
671 671 if (channel!=null)
672 try 672 channel.close();
673 { 673 }
674 if (channel!=null) 674 catch(IOException e2)
675 channel.close(); 675 {
676 } 676 LOG.debug("",e2);
677 catch(IOException e2) 677 }
678 { 678
679 LOG.debug("",e2); 679 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
680 } 680 key.cancel();
681 681 }
682 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) 682 }
683 key.cancel(); 683
684 } 684 // Everything always handled
685 } 685 selector.selectedKeys().clear();
686 686
687 // Everything always handled 687 now=System.currentTimeMillis();
688 selector.selectedKeys().clear(); 688 _timeout.setNow(now);
689 689 Task task = _timeout.expired();
690 now=System.currentTimeMillis(); 690 while (task!=null)
691 _timeout.setNow(now); 691 {
692 Task task = _timeout.expired(); 692 if (task instanceof Runnable)
693 while (task!=null) 693 execute((Runnable)task);
694 { 694 task = _timeout.expired();
695 if (task instanceof Runnable) 695 }
696 dispatch((Runnable)task); 696
697 task = _timeout.expired(); 697 // Idle tick
698 } 698 if (now-_idleTick>__IDLE_TICK)
699 699 {
700 // Idle tick 700 _idleTick=now;
701 if (now-_idleTick>__IDLE_TICK) 701
702 { 702 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
703 _idleTick=now; 703 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
704 704 :now;
705 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) 705
706 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) 706 execute(new Runnable()
707 :now; 707 {
708 708 public void run()
709 dispatch(new Runnable() 709 {
710 { 710 for (SelectChannelEndPoint endp:_endPoints.keySet())
711 public void run() 711 {
712 { 712 endp.checkIdleTimestamp(idle_now);
713 for (SelectChannelEndPoint endp:_endPoints.keySet()) 713 }
714 { 714 }
715 endp.checkIdleTimestamp(idle_now); 715 public String toString() {return "Idle-"+super.toString();}
716 } 716 });
717 } 717
718 public String toString() {return "Idle-"+super.toString();} 718 }
719 }); 719
720 720 // Reset busy select monitor counts
721 } 721 if (__MONITOR_PERIOD>0 && now>_monitorNext)
722 722 {
723 // Reset busy select monitor counts 723 _busySelects=0;
724 if (__MONITOR_PERIOD>0 && now>_monitorNext) 724 _pausing=false;
725 { 725 _monitorNext=now+__MONITOR_PERIOD;
726 _busySelects=0; 726
727 _pausing=false; 727 }
728 _monitorNext=now+__MONITOR_PERIOD; 728 }
729 729 catch (ClosedSelectorException e)
730 } 730 {
731 } 731 if (isRunning())
732 catch (ClosedSelectorException e) 732 LOG.warn("",e);
733 { 733 else
734 if (isRunning()) 734 LOG.trace("",e);
735 LOG.warn("",e); 735 }
736 else 736 catch (CancelledKeyException e)
737 LOG.trace("",e); 737 {
738 } 738 LOG.trace("",e);
739 catch (CancelledKeyException e) 739 }
740 { 740 finally
741 LOG.trace("",e); 741 {
742 } 742 _selecting=null;
743 finally 743 }
744 { 744 }
745 _selecting=null; 745
746 } 746
747 } 747 /* ------------------------------------------------------------ */
748 748 private void renewSelector()
749 749 {
750 /* ------------------------------------------------------------ */ 750 try
751 private void renewSelector() 751 {
752 { 752 synchronized (this)
753 try 753 {
754 { 754 Selector selector=_selector;
755 synchronized (this) 755 if (selector==null)
756 { 756 return;
757 Selector selector=_selector; 757 final Selector new_selector = Selector.open();
758 if (selector==null) 758 for (SelectionKey k: selector.keys())
759 return; 759 {
760 final Selector new_selector = Selector.open(); 760 if (!k.isValid() || k.interestOps()==0)
761 for (SelectionKey k: selector.keys()) 761 continue;
762 { 762
763 if (!k.isValid() || k.interestOps()==0) 763 final SelectableChannel channel = k.channel();
764 continue; 764 final Object attachment = k.attachment();
765 765
766 final SelectableChannel channel = k.channel(); 766 if (attachment==null)
767 final Object attachment = k.attachment(); 767 addChange(channel);
768 768 else
769 if (attachment==null) 769 addChange(channel,attachment);
770 addChange(channel); 770 }
771 else 771 _selector.close();
772 addChange(channel,attachment); 772 _selector=new_selector;
773 } 773 }
774 _selector.close(); 774 }
775 _selector=new_selector; 775 catch(IOException e)
776 } 776 {
777 } 777 throw new RuntimeException("recreating selector",e);
778 catch(IOException e) 778 }
779 { 779 }
780 throw new RuntimeException("recreating selector",e); 780
781 } 781 /* ------------------------------------------------------------ */
782 } 782 public SelectorManager getManager()
783 783 {
784 /* ------------------------------------------------------------ */ 784 return SelectorManager.this;
785 public SelectorManager getManager() 785 }
786 { 786
787 return SelectorManager.this; 787 /* ------------------------------------------------------------ */
788 } 788 public long getNow()
789 789 {
790 /* ------------------------------------------------------------ */ 790 return _timeout.getNow();
791 public long getNow() 791 }
792 { 792
793 return _timeout.getNow(); 793 /* ------------------------------------------------------------ */
794 } 794 /**
795 795 * @param task The task to timeout. If it implements Runnable, then
796 /* ------------------------------------------------------------ */ 796 * expired will be called from a dispatched thread.
797 /** 797 *
798 * @param task The task to timeout. If it implements Runnable, then 798 * @param timeoutMs
799 * expired will be called from a dispatched thread. 799 */
800 * 800 public void scheduleTimeout(Timeout.Task task, long timeoutMs)
801 * @param timeoutMs 801 {
802 */ 802 if (!(task instanceof Runnable))
803 public void scheduleTimeout(Timeout.Task task, long timeoutMs) 803 throw new IllegalArgumentException("!Runnable");
804 { 804 _timeout.schedule(task, timeoutMs);
805 if (!(task instanceof Runnable)) 805 }
806 throw new IllegalArgumentException("!Runnable"); 806
807 _timeout.schedule(task, timeoutMs); 807 /* ------------------------------------------------------------ */
808 } 808 public void cancelTimeout(Timeout.Task task)
809 809 {
810 /* ------------------------------------------------------------ */ 810 task.cancel();
811 public void cancelTimeout(Timeout.Task task) 811 }
812 { 812
813 task.cancel(); 813 /* ------------------------------------------------------------ */
814 } 814 public void wakeup()
815 815 {
816 /* ------------------------------------------------------------ */ 816 try
817 public void wakeup() 817 {
818 { 818 Selector selector = _selector;
819 try 819 if (selector!=null)
820 { 820 selector.wakeup();
821 Selector selector = _selector; 821 }
822 if (selector!=null) 822 catch(Exception e)
823 selector.wakeup(); 823 {
824 } 824 addChange(new ChangeTask()
825 catch(Exception e) 825 {
826 { 826 public void run()
827 addChange(new ChangeTask() 827 {
828 { 828 renewSelector();
829 public void run() 829 }
830 { 830 });
831 renewSelector(); 831
832 } 832 renewSelector();
833 }); 833 }
834 834 }
835 renewSelector(); 835
836 } 836 /* ------------------------------------------------------------ */
837 } 837 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
838 838 {
839 /* ------------------------------------------------------------ */ 839 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
840 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException 840 LOG.debug("created {}",endp);
841 { 841 endPointOpened(endp);
842 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); 842 _endPoints.put(endp,this);
843 LOG.debug("created {}",endp); 843 return endp;
844 endPointOpened(endp); 844 }
845 _endPoints.put(endp,this); 845
846 return endp; 846 /* ------------------------------------------------------------ */
847 } 847 public void destroyEndPoint(SelectChannelEndPoint endp)
848 848 {
849 /* ------------------------------------------------------------ */ 849 LOG.debug("destroyEndPoint {}",endp);
850 public void destroyEndPoint(SelectChannelEndPoint endp) 850 _endPoints.remove(endp);
851 { 851 endPointClosed(endp);
852 LOG.debug("destroyEndPoint {}",endp); 852 }
853 _endPoints.remove(endp); 853
854 endPointClosed(endp); 854 /* ------------------------------------------------------------ */
855 } 855 Selector getSelector()
856 856 {
857 /* ------------------------------------------------------------ */ 857 return _selector;
858 Selector getSelector() 858 }
859 { 859
860 return _selector; 860 /* ------------------------------------------------------------ */
861 } 861 void stop() throws Exception
862 862 {
863 /* ------------------------------------------------------------ */ 863 // Spin for a while waiting for selector to complete
864 void stop() throws Exception 864 // to avoid unneccessary closed channel exceptions
865 { 865 try
866 // Spin for a while waiting for selector to complete 866 {
867 // to avoid unneccessary closed channel exceptions 867 for (int i=0;i<100 && _selecting!=null;i++)
868 try 868 {
869 { 869 wakeup();
870 for (int i=0;i<100 && _selecting!=null;i++) 870 Thread.sleep(10);
871 { 871 }
872 wakeup(); 872 }
873 Thread.sleep(10); 873 catch(Exception e)
874 } 874 {
875 } 875 LOG.trace("",e);
876 catch(Exception e) 876 }
877 { 877
878 LOG.trace("",e); 878 // close endpoints and selector
879 } 879 synchronized (this)
880 880 {
881 // close endpoints and selector 881 Selector selector=_selector;
882 synchronized (this) 882 for (SelectionKey key:selector.keys())
883 { 883 {
884 Selector selector=_selector; 884 if (key==null)
885 for (SelectionKey key:selector.keys()) 885 continue;
886 { 886 Object att=key.attachment();
887 if (key==null) 887 if (att instanceof EndPoint)
888 continue; 888 {
889 Object att=key.attachment(); 889 EndPoint endpoint = (EndPoint)att;
890 if (att instanceof EndPoint) 890 try
891 { 891 {
892 EndPoint endpoint = (EndPoint)att; 892 endpoint.close();
893 try 893 }
894 { 894 catch(IOException e)
895 endpoint.close(); 895 {
896 } 896 LOG.trace("",e);
897 catch(IOException e) 897 }
898 { 898 }
899 LOG.trace("",e); 899 }
900 } 900
901 } 901
902 } 902 _timeout.cancelAll();
903 903 try
904 904 {
905 _timeout.cancelAll(); 905 selector=_selector;
906 try 906 if (selector != null)
907 { 907 selector.close();
908 selector=_selector; 908 }
909 if (selector != null) 909 catch (IOException e)
910 selector.close(); 910 {
911 } 911 LOG.trace("",e);
912 catch (IOException e) 912 }
913 { 913 _selector=null;
914 LOG.trace("",e); 914 }
915 } 915 }
916 _selector=null; 916
917 } 917 /* ------------------------------------------------------------ */
918 } 918 public String dump()
919 919 {
920 /* ------------------------------------------------------------ */ 920 return AggregateLifeCycle.dump(this);
921 public String dump() 921 }
922 { 922
923 return AggregateLifeCycle.dump(this); 923 /* ------------------------------------------------------------ */
924 } 924 public void dump(Appendable out, String indent) throws IOException
925 925 {
926 /* ------------------------------------------------------------ */ 926 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
927 public void dump(Appendable out, String indent) throws IOException 927
928 { 928 Thread selecting = _selecting;
929 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); 929
930 930 Object where = "not selecting";
931 Thread selecting = _selecting; 931 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
932 932 if (trace!=null)
933 Object where = "not selecting"; 933 {
934 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); 934 for (StackTraceElement t:trace)
935 if (trace!=null) 935 if (t.getClassName().startsWith("org.eclipse.jetty."))
936 { 936 {
937 for (StackTraceElement t:trace) 937 where=t;
938 if (t.getClassName().startsWith("org.eclipse.jetty.")) 938 break;
939 { 939 }
940 where=t; 940 }
941 break; 941
942 } 942 Selector selector=_selector;
943 } 943 if (selector!=null)
944 944 {
945 Selector selector=_selector; 945 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
946 if (selector!=null) 946 dump.add(where);
947 { 947
948 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); 948 final CountDownLatch latch = new CountDownLatch(1);
949 dump.add(where); 949
950 950 addChange(new ChangeTask()
951 final CountDownLatch latch = new CountDownLatch(1); 951 {
952 952 public void run()
953 addChange(new ChangeTask() 953 {
954 { 954 dumpKeyState(dump);
955 public void run() 955 latch.countDown();
956 { 956 }
957 dumpKeyState(dump); 957 });
958 latch.countDown(); 958
959 } 959 try
960 }); 960 {
961 961 latch.await(5,TimeUnit.SECONDS);
962 try 962 }
963 { 963 catch(InterruptedException e)
964 latch.await(5,TimeUnit.SECONDS); 964 {
965 } 965 LOG.trace("",e);
966 catch(InterruptedException e) 966 }
967 { 967
968 LOG.trace("",e); 968 AggregateLifeCycle.dump(out,indent,dump);
969 } 969 }
970 970 }
971 AggregateLifeCycle.dump(out,indent,dump); 971
972 } 972 /* ------------------------------------------------------------ */
973 } 973 public void dumpKeyState(List<Object> dumpto)
974 974 {
975 /* ------------------------------------------------------------ */ 975 Selector selector=_selector;
976 public void dumpKeyState(List<Object> dumpto) 976 Set<SelectionKey> keys = selector.keys();
977 { 977 dumpto.add(selector + " keys=" + keys.size());
978 Selector selector=_selector; 978 for (SelectionKey key: keys)
979 Set<SelectionKey> keys = selector.keys(); 979 {
980 dumpto.add(selector + " keys=" + keys.size()); 980 if (key.isValid())
981 for (SelectionKey key: keys) 981 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
982 { 982 else
983 if (key.isValid()) 983 dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
984 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); 984 }
985 else 985 }
986 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); 986
987 } 987 /* ------------------------------------------------------------ */
988 } 988 public String toString()
989 989 {
990 /* ------------------------------------------------------------ */ 990 Selector selector=_selector;
991 public String toString() 991 return String.format("%s keys=%d selected=%d",
992 { 992 super.toString(),
993 Selector selector=_selector; 993 selector != null && selector.isOpen() ? selector.keys().size() : -1,
994 return String.format("%s keys=%d selected=%d", 994 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
995 super.toString(), 995 }
996 selector != null && selector.isOpen() ? selector.keys().size() : -1, 996 }
997 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); 997
998 } 998 /* ------------------------------------------------------------ */
999 } 999 private static class ChannelAndAttachment
1000 1000 {
1001 /* ------------------------------------------------------------ */ 1001 final SelectableChannel _channel;
1002 private static class ChannelAndAttachment 1002 final Object _attachment;
1003 { 1003
1004 final SelectableChannel _channel; 1004 public ChannelAndAttachment(SelectableChannel channel, Object attachment)
1005 final Object _attachment; 1005 {
1006 1006 super();
1007 public ChannelAndAttachment(SelectableChannel channel, Object attachment) 1007 _channel = channel;
1008 { 1008 _attachment = attachment;
1009 super(); 1009 }
1010 _channel = channel; 1010 }
1011 _attachment = attachment; 1011
1012 } 1012 /* ------------------------------------------------------------ */
1013 } 1013 public boolean isDeferringInterestedOps0()
1014 1014 {
1015 /* ------------------------------------------------------------ */ 1015 return _deferringInterestedOps0;
1016 public boolean isDeferringInterestedOps0() 1016 }
1017 { 1017
1018 return _deferringInterestedOps0; 1018 /* ------------------------------------------------------------ */
1019 } 1019 public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
1020 1020 {
1021 /* ------------------------------------------------------------ */ 1021 _deferringInterestedOps0 = deferringInterestedOps0;
1022 public void setDeferringInterestedOps0(boolean deferringInterestedOps0) 1022 }
1023 { 1023
1024 _deferringInterestedOps0 = deferringInterestedOps0; 1024
1025 } 1025 /* ------------------------------------------------------------ */
1026 1026 /* ------------------------------------------------------------ */
1027 1027 /* ------------------------------------------------------------ */
1028 /* ------------------------------------------------------------ */ 1028 private interface ChangeTask extends Runnable
1029 /* ------------------------------------------------------------ */ 1029 {}
1030 /* ------------------------------------------------------------ */
1031 private interface ChangeTask extends Runnable
1032 {}
1033 1030
1034 } 1031 }