Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 865:6b210bb66c63
remove ThreadPool
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 02 Oct 2016 20:38:06 -0600 |
parents | 8e9db0bbf4f9 |
children | 54308d65265a |
comparison
equal
deleted
inserted
replaced
864:e21ca9878a10 | 865:6b210bb66c63 |
---|---|
56 * NIO scheduling to scale to large numbers of connections. | 56 * NIO scheduling to scale to large numbers of connections. |
57 * <p> | 57 * <p> |
58 */ | 58 */ |
59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable | 59 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable |
60 { | 60 { |
61 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); | 61 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); |
62 | 62 |
63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); | 63 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); |
64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); | 64 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); |
65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); | 65 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); |
66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); | 66 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); |
67 | 67 |
68 private int _maxIdleTime; | 68 private int _maxIdleTime; |
69 private int _lowResourcesMaxIdleTime; | 69 private int _lowResourcesMaxIdleTime; |
70 private long _lowResourcesConnections; | 70 private long _lowResourcesConnections; |
71 private SelectSet[] _selectSet; | 71 private SelectSet[] _selectSet; |
72 private int _selectSets=1; | 72 private int _selectSets=1; |
73 private volatile int _set=0; | 73 private volatile int _set=0; |
74 private boolean _deferringInterestedOps0=true; | 74 private boolean _deferringInterestedOps0=true; |
75 private int _selectorPriorityDelta=0; | 75 private int _selectorPriorityDelta=0; |
76 | 76 |
77 /* ------------------------------------------------------------ */ | 77 /* ------------------------------------------------------------ */ |
78 /** | 78 /** |
79 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. | 79 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. |
80 * @see #setLowResourcesMaxIdleTime(long) | 80 * @see #setLowResourcesMaxIdleTime(long) |
81 */ | 81 */ |
82 public void setMaxIdleTime(long maxIdleTime) | 82 public void setMaxIdleTime(long maxIdleTime) |
83 { | 83 { |
84 _maxIdleTime=(int)maxIdleTime; | 84 _maxIdleTime=(int)maxIdleTime; |
85 } | 85 } |
86 | 86 |
87 /* ------------------------------------------------------------ */ | 87 /* ------------------------------------------------------------ */ |
88 /** | 88 /** |
89 * @param selectSets number of select sets to create | 89 * @param selectSets number of select sets to create |
90 */ | 90 */ |
91 public void setSelectSets(int selectSets) | 91 public void setSelectSets(int selectSets) |
92 { | 92 { |
93 long lrc = _lowResourcesConnections * _selectSets; | 93 long lrc = _lowResourcesConnections * _selectSets; |
94 _selectSets=selectSets; | 94 _selectSets=selectSets; |
95 _lowResourcesConnections=lrc/_selectSets; | 95 _lowResourcesConnections=lrc/_selectSets; |
96 } | 96 } |
97 | 97 |
98 /* ------------------------------------------------------------ */ | 98 /* ------------------------------------------------------------ */ |
99 /** | 99 /** |
100 * @return the max idle time | 100 * @return the max idle time |
101 */ | 101 */ |
102 public long getMaxIdleTime() | 102 public long getMaxIdleTime() |
103 { | 103 { |
104 return _maxIdleTime; | 104 return _maxIdleTime; |
105 } | 105 } |
106 | 106 |
107 /* ------------------------------------------------------------ */ | 107 /* ------------------------------------------------------------ */ |
108 /** | 108 /** |
109 * @return the number of select sets in use | 109 * @return the number of select sets in use |
110 */ | 110 */ |
111 public int getSelectSets() | 111 public int getSelectSets() |
112 { | 112 { |
113 return _selectSets; | 113 return _selectSets; |
114 } | 114 } |
115 | 115 |
116 /* ------------------------------------------------------------ */ | 116 /* ------------------------------------------------------------ */ |
117 /** | 117 /** |
118 * @param i | 118 * @param i |
119 * @return The select set | 119 * @return The select set |
120 */ | 120 */ |
121 public SelectSet getSelectSet(int i) | 121 public SelectSet getSelectSet(int i) |
122 { | 122 { |
123 return _selectSet[i]; | 123 return _selectSet[i]; |
124 } | 124 } |
125 | 125 |
126 /* ------------------------------------------------------------ */ | 126 /* ------------------------------------------------------------ */ |
127 /** Register a channel | 127 /** Register a channel |
128 * @param channel | 128 * @param channel |
129 * @param att Attached Object | 129 * @param att Attached Object |
130 */ | 130 */ |
131 public void register(SocketChannel channel, Object att) | 131 public void register(SocketChannel channel, Object att) |
132 { | 132 { |
133 // The ++ increment here is not atomic, but it does not matter. | 133 // The ++ increment here is not atomic, but it does not matter. |
134 // so long as the value changes sometimes, then connections will | 134 // so long as the value changes sometimes, then connections will |
135 // be distributed over the available sets. | 135 // be distributed over the available sets. |
136 | 136 |
137 int s=_set++; | 137 int s=_set++; |
138 if (s<0) | 138 if (s<0) |
139 s=-s; | 139 s=-s; |
140 s=s%_selectSets; | 140 s=s%_selectSets; |
141 SelectSet[] sets=_selectSet; | 141 SelectSet[] sets=_selectSet; |
142 if (sets!=null) | 142 if (sets!=null) |
143 { | 143 { |
144 SelectSet set=sets[s]; | 144 SelectSet set=sets[s]; |
145 set.addChange(channel,att); | 145 set.addChange(channel,att); |
146 set.wakeup(); | 146 set.wakeup(); |
147 } | 147 } |
148 } | 148 } |
149 | 149 |
150 | 150 |
151 /* ------------------------------------------------------------ */ | 151 /* ------------------------------------------------------------ */ |
152 /** Register a channel | 152 /** Register a channel |
153 * @param channel | 153 * @param channel |
154 */ | 154 */ |
155 public void register(SocketChannel channel) | 155 public void register(SocketChannel channel) |
156 { | 156 { |
157 // The ++ increment here is not atomic, but it does not matter. | 157 // The ++ increment here is not atomic, but it does not matter. |
158 // so long as the value changes sometimes, then connections will | 158 // so long as the value changes sometimes, then connections will |
159 // be distributed over the available sets. | 159 // be distributed over the available sets. |
160 | 160 |
161 int s=_set++; | 161 int s=_set++; |
162 if (s<0) | 162 if (s<0) |
163 s=-s; | 163 s=-s; |
164 s=s%_selectSets; | 164 s=s%_selectSets; |
165 SelectSet[] sets=_selectSet; | 165 SelectSet[] sets=_selectSet; |
166 if (sets!=null) | 166 if (sets!=null) |
167 { | 167 { |
168 SelectSet set=sets[s]; | 168 SelectSet set=sets[s]; |
169 set.addChange(channel); | 169 set.addChange(channel); |
170 set.wakeup(); | 170 set.wakeup(); |
171 } | 171 } |
172 } | 172 } |
173 | 173 |
174 /* ------------------------------------------------------------ */ | 174 /* ------------------------------------------------------------ */ |
175 /** Register a {@link ServerSocketChannel} | 175 /** Register a {@link ServerSocketChannel} |
176 * @param acceptChannel | 176 * @param acceptChannel |
177 */ | 177 */ |
178 public void register(ServerSocketChannel acceptChannel) | 178 public void register(ServerSocketChannel acceptChannel) |
179 { | 179 { |
180 int s=_set++; | 180 int s=_set++; |
181 if (s<0) | 181 if (s<0) |
182 s=-s; | 182 s=-s; |
183 s=s%_selectSets; | 183 s=s%_selectSets; |
184 SelectSet set=_selectSet[s]; | 184 SelectSet set=_selectSet[s]; |
185 set.addChange(acceptChannel); | 185 set.addChange(acceptChannel); |
186 set.wakeup(); | 186 set.wakeup(); |
187 } | 187 } |
188 | 188 |
189 /* ------------------------------------------------------------ */ | 189 /* ------------------------------------------------------------ */ |
190 /** | 190 /** |
191 * @return delta The value to add to the selector thread priority. | 191 * @return delta The value to add to the selector thread priority. |
192 */ | 192 */ |
193 public int getSelectorPriorityDelta() | 193 public int getSelectorPriorityDelta() |
194 { | 194 { |
195 return _selectorPriorityDelta; | 195 return _selectorPriorityDelta; |
196 } | 196 } |
197 | 197 |
198 /* ------------------------------------------------------------ */ | 198 /* ------------------------------------------------------------ */ |
199 /** Set the selector thread priorty delta. | 199 /** Set the selector thread priorty delta. |
200 * @param delta The value to add to the selector thread priority. | 200 * @param delta The value to add to the selector thread priority. |
201 */ | 201 */ |
202 public void setSelectorPriorityDelta(int delta) | 202 public void setSelectorPriorityDelta(int delta) |
203 { | 203 { |
204 _selectorPriorityDelta=delta; | 204 _selectorPriorityDelta=delta; |
205 } | 205 } |
206 | 206 |
207 | 207 |
208 /* ------------------------------------------------------------ */ | 208 /* ------------------------------------------------------------ */ |
209 /** | 209 /** |
210 * @return the lowResourcesConnections | 210 * @return the lowResourcesConnections |
211 */ | 211 */ |
212 public long getLowResourcesConnections() | 212 public long getLowResourcesConnections() |
213 { | 213 { |
214 return _lowResourcesConnections*_selectSets; | 214 return _lowResourcesConnections*_selectSets; |
215 } | 215 } |
216 | 216 |
217 /* ------------------------------------------------------------ */ | 217 /* ------------------------------------------------------------ */ |
218 /** | 218 /** |
219 * Set the number of connections, which if exceeded places this manager in low resources state. | 219 * Set the number of connections, which if exceeded places this manager in low resources state. |
220 * This is not an exact measure as the connection count is averaged over the select sets. | 220 * This is not an exact measure as the connection count is averaged over the select sets. |
221 * @param lowResourcesConnections the number of connections | 221 * @param lowResourcesConnections the number of connections |
222 * @see #setLowResourcesMaxIdleTime(long) | 222 * @see #setLowResourcesMaxIdleTime(long) |
223 */ | 223 */ |
224 public void setLowResourcesConnections(long lowResourcesConnections) | 224 public void setLowResourcesConnections(long lowResourcesConnections) |
225 { | 225 { |
226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; | 226 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; |
227 } | 227 } |
228 | 228 |
229 /* ------------------------------------------------------------ */ | 229 /* ------------------------------------------------------------ */ |
230 /** | 230 /** |
231 * @return the lowResourcesMaxIdleTime | 231 * @return the lowResourcesMaxIdleTime |
232 */ | 232 */ |
233 public long getLowResourcesMaxIdleTime() | 233 public long getLowResourcesMaxIdleTime() |
234 { | 234 { |
235 return _lowResourcesMaxIdleTime; | 235 return _lowResourcesMaxIdleTime; |
236 } | 236 } |
237 | 237 |
238 /* ------------------------------------------------------------ */ | 238 /* ------------------------------------------------------------ */ |
239 /** | 239 /** |
240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} | 240 * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()} |
241 * @see #setMaxIdleTime(long) | 241 * @see #setMaxIdleTime(long) |
242 */ | 242 */ |
243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) | 243 public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) |
244 { | 244 { |
245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; | 245 _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; |
246 } | 246 } |
247 | 247 |
248 | 248 |
249 /* ------------------------------------------------------------------------------- */ | 249 /* ------------------------------------------------------------------------------- */ |
250 public abstract boolean dispatch(Runnable task); | 250 public abstract void execute(Runnable task); |
251 | 251 |
252 /* ------------------------------------------------------------ */ | 252 /* ------------------------------------------------------------ */ |
253 /* (non-Javadoc) | 253 /* (non-Javadoc) |
254 * @see org.eclipse.component.AbstractLifeCycle#doStart() | 254 * @see org.eclipse.component.AbstractLifeCycle#doStart() |
255 */ | 255 */ |
256 @Override | 256 @Override |
257 protected void doStart() throws Exception | 257 protected void doStart() throws Exception |
258 { | 258 { |
259 _selectSet = new SelectSet[_selectSets]; | 259 _selectSet = new SelectSet[_selectSets]; |
260 for (int i=0;i<_selectSet.length;i++) | 260 for (int i=0;i<_selectSet.length;i++) |
261 _selectSet[i]= new SelectSet(i); | 261 _selectSet[i]= new SelectSet(i); |
262 | 262 |
263 super.doStart(); | 263 super.doStart(); |
264 | 264 |
265 // start a thread to Select | 265 // start a thread to Select |
266 for (int i=0;i<getSelectSets();i++) | 266 for (int i=0;i<getSelectSets();i++) |
267 { | 267 { |
268 final int id=i; | 268 final int id=i; |
269 boolean selecting=dispatch(new Runnable() | 269 execute(new Runnable() |
270 { | 270 { |
271 public void run() | 271 public void run() |
272 { | 272 { |
273 String name=Thread.currentThread().getName(); | 273 String name=Thread.currentThread().getName(); |
274 int priority=Thread.currentThread().getPriority(); | 274 int priority=Thread.currentThread().getPriority(); |
275 try | 275 try |
276 { | 276 { |
277 SelectSet[] sets=_selectSet; | 277 SelectSet[] sets=_selectSet; |
278 if (sets==null) | 278 if (sets==null) |
279 return; | 279 return; |
280 SelectSet set=sets[id]; | 280 SelectSet set=sets[id]; |
281 | 281 |
282 Thread.currentThread().setName(name+" Selector"+id); | 282 Thread.currentThread().setName(name+" Selector"+id); |
283 if (getSelectorPriorityDelta()!=0) | 283 if (getSelectorPriorityDelta()!=0) |
284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); | 284 Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta()); |
285 LOG.debug("Starting {} on {}",Thread.currentThread(),this); | 285 LOG.debug("Starting {} on {}",Thread.currentThread(),this); |
286 while (isRunning()) | 286 while (isRunning()) |
287 { | 287 { |
288 try | 288 try |
289 { | 289 { |
290 set.doSelect(); | 290 set.doSelect(); |
291 } | 291 } |
292 catch(IOException e) | 292 catch(IOException e) |
293 { | 293 { |
294 LOG.trace("",e); | 294 LOG.trace("",e); |
295 } | 295 } |
296 catch(Exception e) | 296 catch(Exception e) |
297 { | 297 { |
298 LOG.warn("",e); | 298 LOG.warn("",e); |
299 } | 299 } |
300 } | 300 } |
301 } | 301 } |
302 finally | 302 finally |
303 { | 303 { |
304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); | 304 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); |
305 Thread.currentThread().setName(name); | 305 Thread.currentThread().setName(name); |
306 if (getSelectorPriorityDelta()!=0) | 306 if (getSelectorPriorityDelta()!=0) |
307 Thread.currentThread().setPriority(priority); | 307 Thread.currentThread().setPriority(priority); |
308 } | 308 } |
309 } | 309 } |
310 | 310 |
311 }); | 311 }); |
312 | 312 } |
313 if (!selecting) | 313 } |
314 throw new IllegalStateException("!Selecting"); | 314 |
315 } | 315 |
316 } | 316 /* ------------------------------------------------------------------------------- */ |
317 | 317 @Override |
318 | 318 protected void doStop() throws Exception |
319 /* ------------------------------------------------------------------------------- */ | 319 { |
320 @Override | 320 SelectSet[] sets= _selectSet; |
321 protected void doStop() throws Exception | 321 _selectSet=null; |
322 { | 322 if (sets!=null) |
323 SelectSet[] sets= _selectSet; | 323 { |
324 _selectSet=null; | 324 for (SelectSet set : sets) |
325 if (sets!=null) | 325 { |
326 { | 326 if (set!=null) |
327 for (SelectSet set : sets) | 327 set.stop(); |
328 { | 328 } |
329 if (set!=null) | 329 } |
330 set.stop(); | 330 super.doStop(); |
331 } | 331 } |
332 } | 332 |
333 super.doStop(); | 333 /* ------------------------------------------------------------ */ |
334 } | 334 /** |
335 | 335 * @param endpoint |
336 /* ------------------------------------------------------------ */ | 336 */ |
337 /** | 337 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); |
338 * @param endpoint | 338 |
339 */ | 339 /* ------------------------------------------------------------ */ |
340 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); | 340 /** |
341 | 341 * @param endpoint |
342 /* ------------------------------------------------------------ */ | 342 */ |
343 /** | 343 protected abstract void endPointOpened(SelectChannelEndPoint endpoint); |
344 * @param endpoint | 344 |
345 */ | 345 /* ------------------------------------------------------------ */ |
346 protected abstract void endPointOpened(SelectChannelEndPoint endpoint); | 346 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); |
347 | 347 |
348 /* ------------------------------------------------------------ */ | 348 /* ------------------------------------------------------------------------------- */ |
349 protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); | 349 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); |
350 | 350 |
351 /* ------------------------------------------------------------------------------- */ | 351 /* ------------------------------------------------------------ */ |
352 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); | 352 /** |
353 | 353 * Create a new end point |
354 /* ------------------------------------------------------------ */ | 354 * @param channel |
355 /** | 355 * @param selectSet |
356 * Create a new end point | 356 * @param sKey the selection key |
357 * @param channel | 357 * @return the new endpoint {@link SelectChannelEndPoint} |
358 * @param selectSet | 358 * @throws IOException |
359 * @param sKey the selection key | 359 */ |
360 * @return the new endpoint {@link SelectChannelEndPoint} | 360 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; |
361 * @throws IOException | 361 |
362 */ | 362 /* ------------------------------------------------------------------------------- */ |
363 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; | 363 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) |
364 | 364 { |
365 /* ------------------------------------------------------------------------------- */ | 365 LOG.warn(ex+","+channel+","+attachment); |
366 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) | 366 LOG.debug("",ex); |
367 { | 367 } |
368 LOG.warn(ex+","+channel+","+attachment); | 368 |
369 LOG.debug("",ex); | 369 /* ------------------------------------------------------------ */ |
370 } | 370 public String dump() |
371 | 371 { |
372 /* ------------------------------------------------------------ */ | 372 return AggregateLifeCycle.dump(this); |
373 public String dump() | 373 } |
374 { | 374 |
375 return AggregateLifeCycle.dump(this); | 375 /* ------------------------------------------------------------ */ |
376 } | 376 public void dump(Appendable out, String indent) throws IOException |
377 | 377 { |
378 /* ------------------------------------------------------------ */ | 378 AggregateLifeCycle.dumpObject(out,this); |
379 public void dump(Appendable out, String indent) throws IOException | 379 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); |
380 { | 380 } |
381 AggregateLifeCycle.dumpObject(out,this); | 381 |
382 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); | 382 |
383 } | 383 /* ------------------------------------------------------------------------------- */ |
384 | 384 /* ------------------------------------------------------------------------------- */ |
385 | 385 /* ------------------------------------------------------------------------------- */ |
386 /* ------------------------------------------------------------------------------- */ | 386 public class SelectSet implements Dumpable |
387 /* ------------------------------------------------------------------------------- */ | 387 { |
388 /* ------------------------------------------------------------------------------- */ | 388 private final int _setID; |
389 public class SelectSet implements Dumpable | 389 private final Timeout _timeout; |
390 { | 390 |
391 private final int _setID; | 391 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); |
392 private final Timeout _timeout; | 392 |
393 | 393 private volatile Selector _selector; |
394 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); | 394 |
395 | 395 private volatile Thread _selecting; |
396 private volatile Selector _selector; | 396 private int _busySelects; |
397 | 397 private long _monitorNext; |
398 private volatile Thread _selecting; | 398 private boolean _pausing; |
399 private int _busySelects; | 399 private boolean _paused; |
400 private long _monitorNext; | 400 private volatile long _idleTick; |
401 private boolean _pausing; | 401 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); |
402 private boolean _paused; | 402 |
403 private volatile long _idleTick; | 403 /* ------------------------------------------------------------ */ |
404 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | 404 SelectSet(int acceptorID) throws Exception |
405 | 405 { |
406 /* ------------------------------------------------------------ */ | 406 _setID=acceptorID; |
407 SelectSet(int acceptorID) throws Exception | 407 |
408 { | 408 _idleTick = System.currentTimeMillis(); |
409 _setID=acceptorID; | 409 _timeout = new Timeout(this); |
410 | 410 _timeout.setDuration(0L); |
411 _idleTick = System.currentTimeMillis(); | 411 |
412 _timeout = new Timeout(this); | 412 // create a selector; |
413 _timeout.setDuration(0L); | 413 _selector = Selector.open(); |
414 | 414 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; |
415 // create a selector; | 415 } |
416 _selector = Selector.open(); | 416 |
417 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | 417 /* ------------------------------------------------------------ */ |
418 } | 418 public void addChange(Object change) |
419 | 419 { |
420 /* ------------------------------------------------------------ */ | 420 _changes.add(change); |
421 public void addChange(Object change) | 421 } |
422 { | 422 |
423 _changes.add(change); | 423 /* ------------------------------------------------------------ */ |
424 } | 424 public void addChange(SelectableChannel channel, Object att) |
425 | 425 { |
426 /* ------------------------------------------------------------ */ | 426 if (att==null) |
427 public void addChange(SelectableChannel channel, Object att) | 427 addChange(channel); |
428 { | 428 else if (att instanceof EndPoint) |
429 if (att==null) | 429 addChange(att); |
430 addChange(channel); | 430 else |
431 else if (att instanceof EndPoint) | 431 addChange(new ChannelAndAttachment(channel,att)); |
432 addChange(att); | 432 } |
433 else | 433 |
434 addChange(new ChannelAndAttachment(channel,att)); | 434 /* ------------------------------------------------------------ */ |
435 } | 435 /** |
436 | 436 * Select and dispatch tasks found from changes and the selector. |
437 /* ------------------------------------------------------------ */ | 437 * |
438 /** | 438 * @throws IOException |
439 * Select and dispatch tasks found from changes and the selector. | 439 */ |
440 * | 440 public void doSelect() throws IOException |
441 * @throws IOException | 441 { |
442 */ | 442 try |
443 public void doSelect() throws IOException | 443 { |
444 { | 444 _selecting=Thread.currentThread(); |
445 try | 445 final Selector selector=_selector; |
446 { | 446 // Stopped concurrently ? |
447 _selecting=Thread.currentThread(); | 447 if (selector == null) |
448 final Selector selector=_selector; | 448 return; |
449 // Stopped concurrently ? | 449 |
450 if (selector == null) | 450 // Make any key changes required |
451 return; | 451 Object change; |
452 | 452 int changes=_changes.size(); |
453 // Make any key changes required | 453 while (changes-->0 && (change=_changes.poll())!=null) |
454 Object change; | 454 { |
455 int changes=_changes.size(); | 455 Channel ch=null; |
456 while (changes-->0 && (change=_changes.poll())!=null) | 456 SelectionKey key=null; |
457 { | 457 |
458 Channel ch=null; | 458 try |
459 SelectionKey key=null; | 459 { |
460 | 460 if (change instanceof EndPoint) |
461 try | 461 { |
462 { | 462 // Update the operations for a key. |
463 if (change instanceof EndPoint) | 463 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; |
464 { | 464 ch=endpoint.getChannel(); |
465 // Update the operations for a key. | 465 endpoint.doUpdateKey(); |
466 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; | 466 } |
467 ch=endpoint.getChannel(); | 467 else if (change instanceof ChannelAndAttachment) |
468 endpoint.doUpdateKey(); | 468 { |
469 } | 469 // finish accepting/connecting this connection |
470 else if (change instanceof ChannelAndAttachment) | 470 final ChannelAndAttachment asc = (ChannelAndAttachment)change; |
471 { | 471 final SelectableChannel channel=asc._channel; |
472 // finish accepting/connecting this connection | 472 ch=channel; |
473 final ChannelAndAttachment asc = (ChannelAndAttachment)change; | 473 final Object att = asc._attachment; |
474 final SelectableChannel channel=asc._channel; | 474 |
475 ch=channel; | 475 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) |
476 final Object att = asc._attachment; | 476 { |
477 | 477 key = channel.register(selector,SelectionKey.OP_READ,att); |
478 if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) | 478 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); |
479 { | 479 key.attach(endpoint); |
480 key = channel.register(selector,SelectionKey.OP_READ,att); | 480 endpoint.schedule(); |
481 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); | 481 } |
482 key.attach(endpoint); | 482 else if (channel.isOpen()) |
483 endpoint.schedule(); | 483 { |
484 } | 484 key = channel.register(selector,SelectionKey.OP_CONNECT,att); |
485 else if (channel.isOpen()) | 485 } |
486 { | 486 } |
487 key = channel.register(selector,SelectionKey.OP_CONNECT,att); | 487 else if (change instanceof SocketChannel) |
488 } | 488 { |
489 } | 489 // Newly registered channel |
490 else if (change instanceof SocketChannel) | 490 final SocketChannel channel=(SocketChannel)change; |
491 { | 491 ch=channel; |
492 // Newly registered channel | 492 key = channel.register(selector,SelectionKey.OP_READ,null); |
493 final SocketChannel channel=(SocketChannel)change; | 493 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
494 ch=channel; | 494 key.attach(endpoint); |
495 key = channel.register(selector,SelectionKey.OP_READ,null); | 495 endpoint.schedule(); |
496 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | 496 } |
497 key.attach(endpoint); | 497 else if (change instanceof ChangeTask) |
498 endpoint.schedule(); | 498 { |
499 } | 499 ((Runnable)change).run(); |
500 else if (change instanceof ChangeTask) | 500 } |
501 { | 501 else if (change instanceof Runnable) |
502 ((Runnable)change).run(); | 502 { |
503 } | 503 execute((Runnable)change); |
504 else if (change instanceof Runnable) | 504 } |
505 { | 505 else |
506 dispatch((Runnable)change); | 506 throw new IllegalArgumentException(change.toString()); |
507 } | 507 } |
508 else | 508 catch (CancelledKeyException e) |
509 throw new IllegalArgumentException(change.toString()); | 509 { |
510 } | 510 LOG.trace("",e); |
511 catch (CancelledKeyException e) | 511 } |
512 { | 512 catch (Throwable e) |
513 LOG.trace("",e); | 513 { |
514 } | 514 if (isRunning()) |
515 catch (Throwable e) | 515 LOG.warn("",e); |
516 { | 516 else |
517 if (isRunning()) | 517 LOG.debug("",e); |
518 LOG.warn("",e); | 518 |
519 else | 519 try |
520 LOG.debug("",e); | 520 { |
521 | 521 if (ch!=null) |
522 try | 522 ch.close(); |
523 { | 523 } |
524 if (ch!=null) | 524 catch(IOException e2) |
525 ch.close(); | 525 { |
526 } | 526 LOG.debug("",e2); |
527 catch(IOException e2) | 527 } |
528 { | 528 } |
529 LOG.debug("",e2); | 529 } |
530 } | 530 |
531 } | 531 |
532 } | 532 // Do and instant select to see if any connections can be handled. |
533 | 533 int selected=selector.selectNow(); |
534 | 534 |
535 // Do and instant select to see if any connections can be handled. | 535 long now=System.currentTimeMillis(); |
536 int selected=selector.selectNow(); | 536 |
537 | 537 // if no immediate things to do |
538 long now=System.currentTimeMillis(); | 538 if (selected==0 && selector.selectedKeys().isEmpty()) |
539 | 539 { |
540 // if no immediate things to do | 540 // If we are in pausing mode |
541 if (selected==0 && selector.selectedKeys().isEmpty()) | 541 if (_pausing) |
542 { | 542 { |
543 // If we are in pausing mode | 543 try |
544 if (_pausing) | 544 { |
545 { | 545 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop |
546 try | 546 } |
547 { | 547 catch(InterruptedException e) |
548 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop | 548 { |
549 } | 549 LOG.trace("",e); |
550 catch(InterruptedException e) | 550 } |
551 { | 551 now=System.currentTimeMillis(); |
552 LOG.trace("",e); | 552 } |
553 } | 553 |
554 now=System.currentTimeMillis(); | 554 // workout how long to wait in select |
555 } | 555 _timeout.setNow(now); |
556 | 556 long to_next_timeout=_timeout.getTimeToNext(); |
557 // workout how long to wait in select | 557 |
558 _timeout.setNow(now); | 558 long wait = _changes.size()==0?__IDLE_TICK:0L; |
559 long to_next_timeout=_timeout.getTimeToNext(); | 559 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) |
560 | 560 wait = to_next_timeout; |
561 long wait = _changes.size()==0?__IDLE_TICK:0L; | 561 |
562 if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout) | 562 // If we should wait with a select |
563 wait = to_next_timeout; | 563 if (wait>0) |
564 | 564 { |
565 // If we should wait with a select | 565 long before=now; |
566 if (wait>0) | 566 selector.select(wait); |
567 { | 567 now = System.currentTimeMillis(); |
568 long before=now; | 568 _timeout.setNow(now); |
569 selector.select(wait); | 569 |
570 now = System.currentTimeMillis(); | 570 // If we are monitoring for busy selector |
571 _timeout.setNow(now); | 571 // and this select did not wait more than 1ms |
572 | 572 if (__MONITOR_PERIOD>0 && now-before <=1) |
573 // If we are monitoring for busy selector | 573 { |
574 // and this select did not wait more than 1ms | 574 // count this as a busy select and if there have been too many this monitor cycle |
575 if (__MONITOR_PERIOD>0 && now-before <=1) | 575 if (++_busySelects>__MAX_SELECTS) |
576 { | 576 { |
577 // count this as a busy select and if there have been too many this monitor cycle | 577 // Start injecting pauses |
578 if (++_busySelects>__MAX_SELECTS) | 578 _pausing=true; |
579 { | 579 |
580 // Start injecting pauses | 580 // if this is the first pause |
581 _pausing=true; | 581 if (!_paused) |
582 | 582 { |
583 // if this is the first pause | 583 // Log and dump some status |
584 if (!_paused) | 584 _paused=true; |
585 { | 585 LOG.warn("Selector {} is too busy, pausing!",this); |
586 // Log and dump some status | 586 } |
587 _paused=true; | 587 } |
588 LOG.warn("Selector {} is too busy, pausing!",this); | 588 } |
589 } | 589 } |
590 } | 590 } |
591 } | 591 |
592 } | 592 // have we been destroyed while sleeping |
593 } | 593 if (_selector==null || !selector.isOpen()) |
594 | 594 return; |
595 // have we been destroyed while sleeping | 595 |
596 if (_selector==null || !selector.isOpen()) | 596 // Look for things to do |
597 return; | 597 for (SelectionKey key: selector.selectedKeys()) |
598 | 598 { |
599 // Look for things to do | 599 SocketChannel channel=null; |
600 for (SelectionKey key: selector.selectedKeys()) | 600 |
601 { | 601 try |
602 SocketChannel channel=null; | 602 { |
603 | 603 if (!key.isValid()) |
604 try | 604 { |
605 { | 605 key.cancel(); |
606 if (!key.isValid()) | 606 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); |
607 { | 607 if (endpoint != null) |
608 key.cancel(); | 608 endpoint.doUpdateKey(); |
609 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); | 609 continue; |
610 if (endpoint != null) | 610 } |
611 endpoint.doUpdateKey(); | 611 |
612 continue; | 612 Object att = key.attachment(); |
613 } | 613 if (att instanceof SelectChannelEndPoint) |
614 | 614 { |
615 Object att = key.attachment(); | 615 if (key.isReadable()||key.isWritable()) |
616 if (att instanceof SelectChannelEndPoint) | 616 ((SelectChannelEndPoint)att).schedule(); |
617 { | 617 } |
618 if (key.isReadable()||key.isWritable()) | 618 else if (key.isConnectable()) |
619 ((SelectChannelEndPoint)att).schedule(); | 619 { |
620 } | 620 // Complete a connection of a registered channel |
621 else if (key.isConnectable()) | 621 channel = (SocketChannel)key.channel(); |
622 { | 622 boolean connected=false; |
623 // Complete a connection of a registered channel | 623 try |
624 channel = (SocketChannel)key.channel(); | 624 { |
625 boolean connected=false; | 625 connected=channel.finishConnect(); |
626 try | 626 } |
627 { | 627 catch(Exception e) |
628 connected=channel.finishConnect(); | 628 { |
629 } | 629 connectionFailed(channel,e,att); |
630 catch(Exception e) | 630 } |
631 { | 631 finally |
632 connectionFailed(channel,e,att); | 632 { |
633 } | 633 if (connected) |
634 finally | 634 { |
635 { | 635 key.interestOps(SelectionKey.OP_READ); |
636 if (connected) | 636 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
637 { | 637 key.attach(endpoint); |
638 key.interestOps(SelectionKey.OP_READ); | 638 endpoint.schedule(); |
639 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | 639 } |
640 key.attach(endpoint); | 640 else |
641 endpoint.schedule(); | 641 { |
642 } | 642 key.cancel(); |
643 else | 643 channel.close(); |
644 { | 644 } |
645 key.cancel(); | 645 } |
646 channel.close(); | 646 } |
647 } | 647 else |
648 } | 648 { |
649 } | 649 // Wrap readable registered channel in an endpoint |
650 else | 650 channel = (SocketChannel)key.channel(); |
651 { | 651 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
652 // Wrap readable registered channel in an endpoint | 652 key.attach(endpoint); |
653 channel = (SocketChannel)key.channel(); | 653 if (key.isReadable()) |
654 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | 654 endpoint.schedule(); |
655 key.attach(endpoint); | 655 } |
656 if (key.isReadable()) | 656 key = null; |
657 endpoint.schedule(); | 657 } |
658 } | 658 catch (CancelledKeyException e) |
659 key = null; | 659 { |
660 } | 660 LOG.trace("",e); |
661 catch (CancelledKeyException e) | 661 } |
662 { | 662 catch (Exception e) |
663 LOG.trace("",e); | 663 { |
664 } | 664 if (isRunning()) |
665 catch (Exception e) | 665 LOG.warn("",e); |
666 { | 666 else |
667 if (isRunning()) | 667 LOG.trace("",e); |
668 LOG.warn("",e); | 668 |
669 else | 669 try |
670 LOG.trace("",e); | 670 { |
671 | 671 if (channel!=null) |
672 try | 672 channel.close(); |
673 { | 673 } |
674 if (channel!=null) | 674 catch(IOException e2) |
675 channel.close(); | 675 { |
676 } | 676 LOG.debug("",e2); |
677 catch(IOException e2) | 677 } |
678 { | 678 |
679 LOG.debug("",e2); | 679 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) |
680 } | 680 key.cancel(); |
681 | 681 } |
682 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) | 682 } |
683 key.cancel(); | 683 |
684 } | 684 // Everything always handled |
685 } | 685 selector.selectedKeys().clear(); |
686 | 686 |
687 // Everything always handled | 687 now=System.currentTimeMillis(); |
688 selector.selectedKeys().clear(); | 688 _timeout.setNow(now); |
689 | 689 Task task = _timeout.expired(); |
690 now=System.currentTimeMillis(); | 690 while (task!=null) |
691 _timeout.setNow(now); | 691 { |
692 Task task = _timeout.expired(); | 692 if (task instanceof Runnable) |
693 while (task!=null) | 693 execute((Runnable)task); |
694 { | 694 task = _timeout.expired(); |
695 if (task instanceof Runnable) | 695 } |
696 dispatch((Runnable)task); | 696 |
697 task = _timeout.expired(); | 697 // Idle tick |
698 } | 698 if (now-_idleTick>__IDLE_TICK) |
699 | 699 { |
700 // Idle tick | 700 _idleTick=now; |
701 if (now-_idleTick>__IDLE_TICK) | 701 |
702 { | 702 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) |
703 _idleTick=now; | 703 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) |
704 | 704 :now; |
705 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) | 705 |
706 ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) | 706 execute(new Runnable() |
707 :now; | 707 { |
708 | 708 public void run() |
709 dispatch(new Runnable() | 709 { |
710 { | 710 for (SelectChannelEndPoint endp:_endPoints.keySet()) |
711 public void run() | 711 { |
712 { | 712 endp.checkIdleTimestamp(idle_now); |
713 for (SelectChannelEndPoint endp:_endPoints.keySet()) | 713 } |
714 { | 714 } |
715 endp.checkIdleTimestamp(idle_now); | 715 public String toString() {return "Idle-"+super.toString();} |
716 } | 716 }); |
717 } | 717 |
718 public String toString() {return "Idle-"+super.toString();} | 718 } |
719 }); | 719 |
720 | 720 // Reset busy select monitor counts |
721 } | 721 if (__MONITOR_PERIOD>0 && now>_monitorNext) |
722 | 722 { |
723 // Reset busy select monitor counts | 723 _busySelects=0; |
724 if (__MONITOR_PERIOD>0 && now>_monitorNext) | 724 _pausing=false; |
725 { | 725 _monitorNext=now+__MONITOR_PERIOD; |
726 _busySelects=0; | 726 |
727 _pausing=false; | 727 } |
728 _monitorNext=now+__MONITOR_PERIOD; | 728 } |
729 | 729 catch (ClosedSelectorException e) |
730 } | 730 { |
731 } | 731 if (isRunning()) |
732 catch (ClosedSelectorException e) | 732 LOG.warn("",e); |
733 { | 733 else |
734 if (isRunning()) | 734 LOG.trace("",e); |
735 LOG.warn("",e); | 735 } |
736 else | 736 catch (CancelledKeyException e) |
737 LOG.trace("",e); | 737 { |
738 } | 738 LOG.trace("",e); |
739 catch (CancelledKeyException e) | 739 } |
740 { | 740 finally |
741 LOG.trace("",e); | 741 { |
742 } | 742 _selecting=null; |
743 finally | 743 } |
744 { | 744 } |
745 _selecting=null; | 745 |
746 } | 746 |
747 } | 747 /* ------------------------------------------------------------ */ |
748 | 748 private void renewSelector() |
749 | 749 { |
750 /* ------------------------------------------------------------ */ | 750 try |
751 private void renewSelector() | 751 { |
752 { | 752 synchronized (this) |
753 try | 753 { |
754 { | 754 Selector selector=_selector; |
755 synchronized (this) | 755 if (selector==null) |
756 { | 756 return; |
757 Selector selector=_selector; | 757 final Selector new_selector = Selector.open(); |
758 if (selector==null) | 758 for (SelectionKey k: selector.keys()) |
759 return; | 759 { |
760 final Selector new_selector = Selector.open(); | 760 if (!k.isValid() || k.interestOps()==0) |
761 for (SelectionKey k: selector.keys()) | 761 continue; |
762 { | 762 |
763 if (!k.isValid() || k.interestOps()==0) | 763 final SelectableChannel channel = k.channel(); |
764 continue; | 764 final Object attachment = k.attachment(); |
765 | 765 |
766 final SelectableChannel channel = k.channel(); | 766 if (attachment==null) |
767 final Object attachment = k.attachment(); | 767 addChange(channel); |
768 | 768 else |
769 if (attachment==null) | 769 addChange(channel,attachment); |
770 addChange(channel); | 770 } |
771 else | 771 _selector.close(); |
772 addChange(channel,attachment); | 772 _selector=new_selector; |
773 } | 773 } |
774 _selector.close(); | 774 } |
775 _selector=new_selector; | 775 catch(IOException e) |
776 } | 776 { |
777 } | 777 throw new RuntimeException("recreating selector",e); |
778 catch(IOException e) | 778 } |
779 { | 779 } |
780 throw new RuntimeException("recreating selector",e); | 780 |
781 } | 781 /* ------------------------------------------------------------ */ |
782 } | 782 public SelectorManager getManager() |
783 | 783 { |
784 /* ------------------------------------------------------------ */ | 784 return SelectorManager.this; |
785 public SelectorManager getManager() | 785 } |
786 { | 786 |
787 return SelectorManager.this; | 787 /* ------------------------------------------------------------ */ |
788 } | 788 public long getNow() |
789 | 789 { |
790 /* ------------------------------------------------------------ */ | 790 return _timeout.getNow(); |
791 public long getNow() | 791 } |
792 { | 792 |
793 return _timeout.getNow(); | 793 /* ------------------------------------------------------------ */ |
794 } | 794 /** |
795 | 795 * @param task The task to timeout. If it implements Runnable, then |
796 /* ------------------------------------------------------------ */ | 796 * expired will be called from a dispatched thread. |
797 /** | 797 * |
798 * @param task The task to timeout. If it implements Runnable, then | 798 * @param timeoutMs |
799 * expired will be called from a dispatched thread. | 799 */ |
800 * | 800 public void scheduleTimeout(Timeout.Task task, long timeoutMs) |
801 * @param timeoutMs | 801 { |
802 */ | 802 if (!(task instanceof Runnable)) |
803 public void scheduleTimeout(Timeout.Task task, long timeoutMs) | 803 throw new IllegalArgumentException("!Runnable"); |
804 { | 804 _timeout.schedule(task, timeoutMs); |
805 if (!(task instanceof Runnable)) | 805 } |
806 throw new IllegalArgumentException("!Runnable"); | 806 |
807 _timeout.schedule(task, timeoutMs); | 807 /* ------------------------------------------------------------ */ |
808 } | 808 public void cancelTimeout(Timeout.Task task) |
809 | 809 { |
810 /* ------------------------------------------------------------ */ | 810 task.cancel(); |
811 public void cancelTimeout(Timeout.Task task) | 811 } |
812 { | 812 |
813 task.cancel(); | 813 /* ------------------------------------------------------------ */ |
814 } | 814 public void wakeup() |
815 | 815 { |
816 /* ------------------------------------------------------------ */ | 816 try |
817 public void wakeup() | 817 { |
818 { | 818 Selector selector = _selector; |
819 try | 819 if (selector!=null) |
820 { | 820 selector.wakeup(); |
821 Selector selector = _selector; | 821 } |
822 if (selector!=null) | 822 catch(Exception e) |
823 selector.wakeup(); | 823 { |
824 } | 824 addChange(new ChangeTask() |
825 catch(Exception e) | 825 { |
826 { | 826 public void run() |
827 addChange(new ChangeTask() | 827 { |
828 { | 828 renewSelector(); |
829 public void run() | 829 } |
830 { | 830 }); |
831 renewSelector(); | 831 |
832 } | 832 renewSelector(); |
833 }); | 833 } |
834 | 834 } |
835 renewSelector(); | 835 |
836 } | 836 /* ------------------------------------------------------------ */ |
837 } | 837 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException |
838 | 838 { |
839 /* ------------------------------------------------------------ */ | 839 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); |
840 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException | 840 LOG.debug("created {}",endp); |
841 { | 841 endPointOpened(endp); |
842 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); | 842 _endPoints.put(endp,this); |
843 LOG.debug("created {}",endp); | 843 return endp; |
844 endPointOpened(endp); | 844 } |
845 _endPoints.put(endp,this); | 845 |
846 return endp; | 846 /* ------------------------------------------------------------ */ |
847 } | 847 public void destroyEndPoint(SelectChannelEndPoint endp) |
848 | 848 { |
849 /* ------------------------------------------------------------ */ | 849 LOG.debug("destroyEndPoint {}",endp); |
850 public void destroyEndPoint(SelectChannelEndPoint endp) | 850 _endPoints.remove(endp); |
851 { | 851 endPointClosed(endp); |
852 LOG.debug("destroyEndPoint {}",endp); | 852 } |
853 _endPoints.remove(endp); | 853 |
854 endPointClosed(endp); | 854 /* ------------------------------------------------------------ */ |
855 } | 855 Selector getSelector() |
856 | 856 { |
857 /* ------------------------------------------------------------ */ | 857 return _selector; |
858 Selector getSelector() | 858 } |
859 { | 859 |
860 return _selector; | 860 /* ------------------------------------------------------------ */ |
861 } | 861 void stop() throws Exception |
862 | 862 { |
863 /* ------------------------------------------------------------ */ | 863 // Spin for a while waiting for selector to complete |
864 void stop() throws Exception | 864 // to avoid unneccessary closed channel exceptions |
865 { | 865 try |
866 // Spin for a while waiting for selector to complete | 866 { |
867 // to avoid unneccessary closed channel exceptions | 867 for (int i=0;i<100 && _selecting!=null;i++) |
868 try | 868 { |
869 { | 869 wakeup(); |
870 for (int i=0;i<100 && _selecting!=null;i++) | 870 Thread.sleep(10); |
871 { | 871 } |
872 wakeup(); | 872 } |
873 Thread.sleep(10); | 873 catch(Exception e) |
874 } | 874 { |
875 } | 875 LOG.trace("",e); |
876 catch(Exception e) | 876 } |
877 { | 877 |
878 LOG.trace("",e); | 878 // close endpoints and selector |
879 } | 879 synchronized (this) |
880 | 880 { |
881 // close endpoints and selector | 881 Selector selector=_selector; |
882 synchronized (this) | 882 for (SelectionKey key:selector.keys()) |
883 { | 883 { |
884 Selector selector=_selector; | 884 if (key==null) |
885 for (SelectionKey key:selector.keys()) | 885 continue; |
886 { | 886 Object att=key.attachment(); |
887 if (key==null) | 887 if (att instanceof EndPoint) |
888 continue; | 888 { |
889 Object att=key.attachment(); | 889 EndPoint endpoint = (EndPoint)att; |
890 if (att instanceof EndPoint) | 890 try |
891 { | 891 { |
892 EndPoint endpoint = (EndPoint)att; | 892 endpoint.close(); |
893 try | 893 } |
894 { | 894 catch(IOException e) |
895 endpoint.close(); | 895 { |
896 } | 896 LOG.trace("",e); |
897 catch(IOException e) | 897 } |
898 { | 898 } |
899 LOG.trace("",e); | 899 } |
900 } | 900 |
901 } | 901 |
902 } | 902 _timeout.cancelAll(); |
903 | 903 try |
904 | 904 { |
905 _timeout.cancelAll(); | 905 selector=_selector; |
906 try | 906 if (selector != null) |
907 { | 907 selector.close(); |
908 selector=_selector; | 908 } |
909 if (selector != null) | 909 catch (IOException e) |
910 selector.close(); | 910 { |
911 } | 911 LOG.trace("",e); |
912 catch (IOException e) | 912 } |
913 { | 913 _selector=null; |
914 LOG.trace("",e); | 914 } |
915 } | 915 } |
916 _selector=null; | 916 |
917 } | 917 /* ------------------------------------------------------------ */ |
918 } | 918 public String dump() |
919 | 919 { |
920 /* ------------------------------------------------------------ */ | 920 return AggregateLifeCycle.dump(this); |
921 public String dump() | 921 } |
922 { | 922 |
923 return AggregateLifeCycle.dump(this); | 923 /* ------------------------------------------------------------ */ |
924 } | 924 public void dump(Appendable out, String indent) throws IOException |
925 | 925 { |
926 /* ------------------------------------------------------------ */ | 926 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); |
927 public void dump(Appendable out, String indent) throws IOException | 927 |
928 { | 928 Thread selecting = _selecting; |
929 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); | 929 |
930 | 930 Object where = "not selecting"; |
931 Thread selecting = _selecting; | 931 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); |
932 | 932 if (trace!=null) |
933 Object where = "not selecting"; | 933 { |
934 StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace(); | 934 for (StackTraceElement t:trace) |
935 if (trace!=null) | 935 if (t.getClassName().startsWith("org.eclipse.jetty.")) |
936 { | 936 { |
937 for (StackTraceElement t:trace) | 937 where=t; |
938 if (t.getClassName().startsWith("org.eclipse.jetty.")) | 938 break; |
939 { | 939 } |
940 where=t; | 940 } |
941 break; | 941 |
942 } | 942 Selector selector=_selector; |
943 } | 943 if (selector!=null) |
944 | 944 { |
945 Selector selector=_selector; | 945 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); |
946 if (selector!=null) | 946 dump.add(where); |
947 { | 947 |
948 final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2); | 948 final CountDownLatch latch = new CountDownLatch(1); |
949 dump.add(where); | 949 |
950 | 950 addChange(new ChangeTask() |
951 final CountDownLatch latch = new CountDownLatch(1); | 951 { |
952 | 952 public void run() |
953 addChange(new ChangeTask() | 953 { |
954 { | 954 dumpKeyState(dump); |
955 public void run() | 955 latch.countDown(); |
956 { | 956 } |
957 dumpKeyState(dump); | 957 }); |
958 latch.countDown(); | 958 |
959 } | 959 try |
960 }); | 960 { |
961 | 961 latch.await(5,TimeUnit.SECONDS); |
962 try | 962 } |
963 { | 963 catch(InterruptedException e) |
964 latch.await(5,TimeUnit.SECONDS); | 964 { |
965 } | 965 LOG.trace("",e); |
966 catch(InterruptedException e) | 966 } |
967 { | 967 |
968 LOG.trace("",e); | 968 AggregateLifeCycle.dump(out,indent,dump); |
969 } | 969 } |
970 | 970 } |
971 AggregateLifeCycle.dump(out,indent,dump); | 971 |
972 } | 972 /* ------------------------------------------------------------ */ |
973 } | 973 public void dumpKeyState(List<Object> dumpto) |
974 | 974 { |
975 /* ------------------------------------------------------------ */ | 975 Selector selector=_selector; |
976 public void dumpKeyState(List<Object> dumpto) | 976 Set<SelectionKey> keys = selector.keys(); |
977 { | 977 dumpto.add(selector + " keys=" + keys.size()); |
978 Selector selector=_selector; | 978 for (SelectionKey key: keys) |
979 Set<SelectionKey> keys = selector.keys(); | 979 { |
980 dumpto.add(selector + " keys=" + keys.size()); | 980 if (key.isValid()) |
981 for (SelectionKey key: keys) | 981 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); |
982 { | 982 else |
983 if (key.isValid()) | 983 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); |
984 dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps()); | 984 } |
985 else | 985 } |
986 dumpto.add(key.attachment()+" iOps=-1 rOps=-1"); | 986 |
987 } | 987 /* ------------------------------------------------------------ */ |
988 } | 988 public String toString() |
989 | 989 { |
990 /* ------------------------------------------------------------ */ | 990 Selector selector=_selector; |
991 public String toString() | 991 return String.format("%s keys=%d selected=%d", |
992 { | 992 super.toString(), |
993 Selector selector=_selector; | 993 selector != null && selector.isOpen() ? selector.keys().size() : -1, |
994 return String.format("%s keys=%d selected=%d", | 994 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); |
995 super.toString(), | 995 } |
996 selector != null && selector.isOpen() ? selector.keys().size() : -1, | 996 } |
997 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); | 997 |
998 } | 998 /* ------------------------------------------------------------ */ |
999 } | 999 private static class ChannelAndAttachment |
1000 | 1000 { |
1001 /* ------------------------------------------------------------ */ | 1001 final SelectableChannel _channel; |
1002 private static class ChannelAndAttachment | 1002 final Object _attachment; |
1003 { | 1003 |
1004 final SelectableChannel _channel; | 1004 public ChannelAndAttachment(SelectableChannel channel, Object attachment) |
1005 final Object _attachment; | 1005 { |
1006 | 1006 super(); |
1007 public ChannelAndAttachment(SelectableChannel channel, Object attachment) | 1007 _channel = channel; |
1008 { | 1008 _attachment = attachment; |
1009 super(); | 1009 } |
1010 _channel = channel; | 1010 } |
1011 _attachment = attachment; | 1011 |
1012 } | 1012 /* ------------------------------------------------------------ */ |
1013 } | 1013 public boolean isDeferringInterestedOps0() |
1014 | 1014 { |
1015 /* ------------------------------------------------------------ */ | 1015 return _deferringInterestedOps0; |
1016 public boolean isDeferringInterestedOps0() | 1016 } |
1017 { | 1017 |
1018 return _deferringInterestedOps0; | 1018 /* ------------------------------------------------------------ */ |
1019 } | 1019 public void setDeferringInterestedOps0(boolean deferringInterestedOps0) |
1020 | 1020 { |
1021 /* ------------------------------------------------------------ */ | 1021 _deferringInterestedOps0 = deferringInterestedOps0; |
1022 public void setDeferringInterestedOps0(boolean deferringInterestedOps0) | 1022 } |
1023 { | 1023 |
1024 _deferringInterestedOps0 = deferringInterestedOps0; | 1024 |
1025 } | 1025 /* ------------------------------------------------------------ */ |
1026 | 1026 /* ------------------------------------------------------------ */ |
1027 | 1027 /* ------------------------------------------------------------ */ |
1028 /* ------------------------------------------------------------ */ | 1028 private interface ChangeTask extends Runnable |
1029 /* ------------------------------------------------------------ */ | 1029 {} |
1030 /* ------------------------------------------------------------ */ | |
1031 private interface ChangeTask extends Runnable | |
1032 {} | |
1033 | 1030 |
1034 } | 1031 } |