Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 955:6f49b8dfffe6
simplify SelectChannelEndPoint
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 13 Oct 2016 16:55:53 -0600 |
parents | 7db4a488fc82 |
children | 1094975d013b |
comparison
equal
deleted
inserted
replaced
954:a021c4c9c244 | 955:6f49b8dfffe6 |
---|---|
39 | 39 |
40 /* ------------------------------------------------------------ */ | 40 /* ------------------------------------------------------------ */ |
41 /** | 41 /** |
42 * An Endpoint that can be scheduled by {@link SelectorManager}. | 42 * An Endpoint that can be scheduled by {@link SelectorManager}. |
43 */ | 43 */ |
44 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint | 44 public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint |
45 { | 45 { |
46 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); | 46 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); |
47 | 47 |
48 private final SelectorManager.SelectSet _selectSet; | 48 private final SelectorManager.SelectSet _selectSet; |
49 private final SelectorManager _manager; | 49 private final SelectorManager _manager; |
67 private static final int STATE_NEEDS_DISPATCH = -1; | 67 private static final int STATE_NEEDS_DISPATCH = -1; |
68 private static final int STATE_UNDISPATCHED = 0; | 68 private static final int STATE_UNDISPATCHED = 0; |
69 private static final int STATE_DISPATCHED = 1; | 69 private static final int STATE_DISPATCHED = 1; |
70 private int _state; | 70 private int _state; |
71 | 71 |
72 private boolean _onIdle; | |
73 | |
74 /** true if the last write operation succeeded and wrote all offered bytes */ | 72 /** true if the last write operation succeeded and wrote all offered bytes */ |
75 private volatile boolean _writable = true; | 73 private volatile boolean _writable = true; |
76 | 74 |
77 | |
78 /** True if a thread is blocked in {@link #blockReadable(long)} */ | 75 /** True if a thread is blocked in {@link #blockReadable(long)} */ |
79 private boolean _readBlocked; | 76 private boolean _readBlocked; |
80 | 77 |
81 /** True if a thread is blocked in {@link #blockWritable(long)} */ | 78 /** True if a thread is blocked in {@link #blockWritable(long)} */ |
82 private boolean _writeBlocked; | 79 private boolean _writeBlocked; |
83 | 80 |
84 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ | 81 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ |
85 private boolean _open; | 82 private boolean _open; |
86 | 83 |
87 private volatile long _idleTimestamp; | |
88 private volatile boolean _checkIdle; | 84 private volatile boolean _checkIdle; |
89 | 85 |
90 private boolean _ishut; | 86 private boolean _ishut; |
91 | 87 |
92 /* ------------------------------------------------------------ */ | |
93 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) | 88 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) |
94 throws IOException | 89 throws IOException |
95 { | 90 { |
96 super(channel, maxIdleTime); | 91 super(channel, maxIdleTime); |
97 | 92 |
98 _manager = selectSet.getManager(); | 93 _manager = selectSet.getManager(); |
99 _selectSet = selectSet; | 94 _selectSet = selectSet; |
100 _state=STATE_UNDISPATCHED; | 95 _state = STATE_UNDISPATCHED; |
101 _onIdle=false; | 96 _open = true; |
102 _open=true; | |
103 _key = key; | 97 _key = key; |
104 | 98 |
105 setCheckForIdle(true); | 99 setCheckForIdle(true); |
106 } | 100 } |
107 | 101 |
108 /* ------------------------------------------------------------ */ | 102 @Override |
109 public SelectionKey getSelectionKey() | |
110 { | |
111 synchronized (this) | |
112 { | |
113 return _key; | |
114 } | |
115 } | |
116 | |
117 public SelectorManager getSelectManager() | |
118 { | |
119 return _manager; | |
120 } | |
121 | |
122 public Connection getConnection() | 103 public Connection getConnection() |
123 { | 104 { |
124 return _connection; | 105 return _connection; |
125 } | 106 } |
126 | 107 |
108 @Override | |
127 public void setConnection(Connection connection) | 109 public void setConnection(Connection connection) |
128 { | 110 { |
129 _connection = (AsyncConnection)connection; | 111 _connection = (AsyncConnection)connection; |
130 } | 112 } |
131 | 113 |
136 public synchronized void schedule() | 118 public synchronized void schedule() |
137 { | 119 { |
138 // If there is no key, then do nothing | 120 // If there is no key, then do nothing |
139 if (_key == null || !_key.isValid()) | 121 if (_key == null || !_key.isValid()) |
140 { | 122 { |
141 _readBlocked=false; | 123 _readBlocked = false; |
142 _writeBlocked=false; | 124 _writeBlocked = false; |
143 this.notifyAll(); | 125 this.notifyAll(); |
144 return; | 126 return; |
145 } | 127 } |
146 | 128 |
147 // If there are threads dispatched reading and writing | 129 // If there are threads dispatched reading and writing |
149 { | 131 { |
150 // assert _dispatched; | 132 // assert _dispatched; |
151 if (_readBlocked && _key.isReadable()) | 133 if (_readBlocked && _key.isReadable()) |
152 _readBlocked=false; | 134 _readBlocked=false; |
153 if (_writeBlocked && _key.isWritable()) | 135 if (_writeBlocked && _key.isWritable()) |
154 _writeBlocked=false; | 136 _writeBlocked = false; |
155 | 137 |
156 // waking them up is as good as a dispatch. | 138 // waking them up is as good as a dispatch. |
157 this.notifyAll(); | 139 this.notifyAll(); |
158 | 140 |
159 // we are not interested in further selecting | 141 // we are not interested in further selecting |
180 // otherwise do the dispatch | 162 // otherwise do the dispatch |
181 dispatch(); | 163 dispatch(); |
182 } | 164 } |
183 } | 165 } |
184 | 166 |
167 @Override | |
185 public synchronized void dispatch() | 168 public synchronized void dispatch() |
186 { | 169 { |
187 if (_state<=STATE_UNDISPATCHED) | 170 if (_state<=STATE_UNDISPATCHED) |
188 { | 171 { |
189 if (_onIdle) | 172 _state = STATE_DISPATCHED; |
173 try { | |
174 _manager.execute(_handler); | |
175 } catch(RejectedExecutionException e) { | |
190 _state = STATE_NEEDS_DISPATCH; | 176 _state = STATE_NEEDS_DISPATCH; |
191 else | 177 LOG.warn("Dispatched Failed! "+this+" to "+_manager); |
192 { | 178 updateKey(); |
193 _state = STATE_DISPATCHED; | |
194 try { | |
195 _manager.execute(_handler); | |
196 } catch(RejectedExecutionException e) { | |
197 _state = STATE_NEEDS_DISPATCH; | |
198 LOG.warn("Dispatched Failed! "+this+" to "+_manager); | |
199 updateKey(); | |
200 } | |
201 } | 179 } |
202 } | 180 } |
203 } | 181 } |
204 | 182 |
205 /* ------------------------------------------------------------ */ | 183 /* ------------------------------------------------------------ */ |
207 * Called when a dispatched thread is no longer handling the endpoint. | 185 * Called when a dispatched thread is no longer handling the endpoint. |
208 * The selection key operations are updated. | 186 * The selection key operations are updated. |
209 * @return If false is returned, the endpoint has been redispatched and | 187 * @return If false is returned, the endpoint has been redispatched and |
210 * thread must keep handling the endpoint. | 188 * thread must keep handling the endpoint. |
211 */ | 189 */ |
212 protected synchronized void undispatch() | 190 private synchronized void undispatch() |
213 { | 191 { |
214 _state = STATE_UNDISPATCHED; | 192 _state = STATE_UNDISPATCHED; |
215 updateKey(); | 193 updateKey(); |
216 } | 194 } |
217 | 195 |
196 @Override | |
218 public void setCheckForIdle(boolean check) | 197 public void setCheckForIdle(boolean check) |
219 { | 198 { |
220 if (check) | 199 if (check) |
221 { | 200 { |
222 _idleTimestamp=System.currentTimeMillis(); | 201 _checkIdle = true; |
223 _checkIdle=true; | |
224 } | 202 } |
225 else | 203 else |
226 _checkIdle=false; | 204 _checkIdle = false; |
227 } | |
228 | |
229 private boolean isCheckForIdle() | |
230 { | |
231 return _checkIdle; | |
232 } | |
233 | |
234 protected void notIdle() | |
235 { | |
236 _idleTimestamp=System.currentTimeMillis(); | |
237 } | |
238 | |
239 public void checkIdleTimestamp(long now) | |
240 { | |
241 if (isCheckForIdle() && _maxIdleTime>0) | |
242 { | |
243 final long idleForMs=now-_idleTimestamp; | |
244 | |
245 if (idleForMs>_maxIdleTime) | |
246 { | |
247 // Don't idle out again until onIdleExpired task completes. | |
248 setCheckForIdle(false); | |
249 _manager.execute(new Runnable() | |
250 { | |
251 public void run() | |
252 { | |
253 try | |
254 { | |
255 onIdleExpired(idleForMs); | |
256 } | |
257 finally | |
258 { | |
259 setCheckForIdle(true); | |
260 } | |
261 } | |
262 }); | |
263 } | |
264 } | |
265 } | |
266 | |
267 private void onIdleExpired(long idleForMs) | |
268 { | |
269 try | |
270 { | |
271 synchronized (this) | |
272 { | |
273 _onIdle=true; | |
274 } | |
275 | |
276 _connection.onIdleExpired(idleForMs); | |
277 } | |
278 finally | |
279 { | |
280 synchronized (this) | |
281 { | |
282 _onIdle=false; | |
283 if (_state==STATE_NEEDS_DISPATCH) | |
284 dispatch(); | |
285 } | |
286 } | |
287 } | 205 } |
288 | 206 |
289 @Override | 207 @Override |
290 public int fill(Buffer buffer) throws IOException | 208 public int fill(Buffer buffer) throws IOException |
291 { | 209 { |
292 int fill=super.fill(buffer); | 210 int fill=super.fill(buffer); |
293 if (fill>0) | |
294 notIdle(); | |
295 return fill; | 211 return fill; |
296 } | 212 } |
297 | 213 |
298 @Override | 214 @Override |
299 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException | 215 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException |
303 // If there was something to write and it wasn't written, then we are not writable. | 219 // If there was something to write and it wasn't written, then we are not writable. |
304 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) | 220 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) |
305 { | 221 { |
306 synchronized (this) | 222 synchronized (this) |
307 { | 223 { |
308 _writable=false; | 224 _writable = false; |
309 if (_state<STATE_DISPATCHED) | 225 if (_state<STATE_DISPATCHED) |
310 updateKey(); | 226 updateKey(); |
311 } | 227 } |
312 } | 228 } |
313 else if (l>0) | 229 else if (l>0) |
314 { | 230 { |
315 _writable=true; | 231 _writable = true; |
316 notIdle(); | |
317 } | 232 } |
318 return l; | 233 return l; |
319 } | 234 } |
320 | 235 |
321 @Override | 236 @Override |
326 // If there was something to write and it wasn't written, then we are not writable. | 241 // If there was something to write and it wasn't written, then we are not writable. |
327 if (l==0 && buffer!=null && buffer.hasContent()) | 242 if (l==0 && buffer!=null && buffer.hasContent()) |
328 { | 243 { |
329 synchronized (this) | 244 synchronized (this) |
330 { | 245 { |
331 _writable=false; | 246 _writable = false; |
332 if (_state<STATE_DISPATCHED) | 247 if (_state<STATE_DISPATCHED) |
333 updateKey(); | 248 updateKey(); |
334 } | 249 } |
335 } | 250 } |
336 else if (l>0) | 251 else if (l>0) |
337 { | 252 { |
338 _writable=true; | 253 _writable = true; |
339 notIdle(); | |
340 } | 254 } |
341 | 255 |
342 return l; | 256 return l; |
343 } | 257 } |
344 | 258 |
352 synchronized (this) | 266 synchronized (this) |
353 { | 267 { |
354 if (isInputShutdown()) | 268 if (isInputShutdown()) |
355 throw new EofException(); | 269 throw new EofException(); |
356 | 270 |
271 long now = _selectSet.getNow(); | |
272 long end = now+timeoutMs; | |
273 boolean check = _checkIdle; | |
274 setCheckForIdle(true); | |
275 try | |
276 { | |
277 _readBlocked = true; | |
278 while (!isInputShutdown() && _readBlocked) | |
279 { | |
280 try | |
281 { | |
282 updateKey(); | |
283 this.wait(timeoutMs>0?(end-now):10000); | |
284 } | |
285 catch (final InterruptedException e) | |
286 { | |
287 LOG.warn("",e); | |
288 } | |
289 finally | |
290 { | |
291 now=_selectSet.getNow(); | |
292 } | |
293 | |
294 if (_readBlocked && timeoutMs>0 && now>=end) | |
295 return false; | |
296 } | |
297 } | |
298 finally | |
299 { | |
300 _readBlocked = false; | |
301 setCheckForIdle(check); | |
302 } | |
303 } | |
304 return true; | |
305 } | |
306 | |
307 /* ------------------------------------------------------------ */ | |
308 /* | |
309 * Allows thread to block waiting for further events. | |
310 */ | |
311 @Override | |
312 public boolean blockWritable(long timeoutMs) throws IOException | |
313 { | |
314 synchronized (this) | |
315 { | |
316 if (isOutputShutdown()) | |
317 throw new EofException(); | |
318 | |
357 long now=_selectSet.getNow(); | 319 long now=_selectSet.getNow(); |
358 long end=now+timeoutMs; | 320 long end=now+timeoutMs; |
359 boolean check=isCheckForIdle(); | 321 boolean check = _checkIdle; |
360 setCheckForIdle(true); | 322 setCheckForIdle(true); |
361 try | 323 try |
362 { | 324 { |
363 _readBlocked=true; | 325 _writeBlocked = true; |
364 while (!isInputShutdown() && _readBlocked) | 326 while (_writeBlocked && !isOutputShutdown()) |
365 { | 327 { |
366 try | 328 try |
367 { | 329 { |
368 updateKey(); | 330 updateKey(); |
369 this.wait(timeoutMs>0?(end-now):10000); | 331 this.wait(timeoutMs>0?(end-now):10000); |
374 } | 336 } |
375 finally | 337 finally |
376 { | 338 { |
377 now=_selectSet.getNow(); | 339 now=_selectSet.getNow(); |
378 } | 340 } |
379 | |
380 if (_readBlocked && timeoutMs>0 && now>=end) | |
381 return false; | |
382 } | |
383 } | |
384 finally | |
385 { | |
386 _readBlocked=false; | |
387 setCheckForIdle(check); | |
388 } | |
389 } | |
390 return true; | |
391 } | |
392 | |
393 /* ------------------------------------------------------------ */ | |
394 /* | |
395 * Allows thread to block waiting for further events. | |
396 */ | |
397 @Override | |
398 public boolean blockWritable(long timeoutMs) throws IOException | |
399 { | |
400 synchronized (this) | |
401 { | |
402 if (isOutputShutdown()) | |
403 throw new EofException(); | |
404 | |
405 long now=_selectSet.getNow(); | |
406 long end=now+timeoutMs; | |
407 boolean check=isCheckForIdle(); | |
408 setCheckForIdle(true); | |
409 try | |
410 { | |
411 _writeBlocked=true; | |
412 while (_writeBlocked && !isOutputShutdown()) | |
413 { | |
414 try | |
415 { | |
416 updateKey(); | |
417 this.wait(timeoutMs>0?(end-now):10000); | |
418 } | |
419 catch (final InterruptedException e) | |
420 { | |
421 LOG.warn("",e); | |
422 } | |
423 finally | |
424 { | |
425 now=_selectSet.getNow(); | |
426 } | |
427 if (_writeBlocked && timeoutMs>0 && now>=end) | 341 if (_writeBlocked && timeoutMs>0 && now>=end) |
428 return false; | 342 return false; |
429 } | 343 } |
430 } | 344 } |
431 finally | 345 finally |
432 { | 346 { |
433 _writeBlocked=false; | 347 _writeBlocked = false; |
434 setCheckForIdle(check); | 348 setCheckForIdle(check); |
435 } | 349 } |
436 } | 350 } |
437 return true; | 351 return true; |
438 } | 352 } |
439 | 353 |
354 @Override | |
440 public boolean hasProgressed() | 355 public boolean hasProgressed() |
441 { | 356 { |
442 return false; | 357 return false; |
443 } | 358 } |
444 | 359 |
470 { | 385 { |
471 _key = null; | 386 _key = null; |
472 LOG.trace("",e); | 387 LOG.trace("",e); |
473 } | 388 } |
474 } | 389 } |
475 changed=_interestOps!=current_ops; | 390 changed = _interestOps!=current_ops; |
476 } | 391 } |
477 | 392 |
478 if(changed) | 393 if(changed) |
479 { | 394 { |
480 doUpdateKey(); | 395 doUpdateKey(); |
516 | 431 |
517 if (_open) | 432 if (_open) |
518 { | 433 { |
519 _selectSet.destroyEndPoint(this); | 434 _selectSet.destroyEndPoint(this); |
520 } | 435 } |
521 _open=false; | 436 _open = false; |
522 _key = null; | 437 _key = null; |
523 } | 438 } |
524 } | 439 } |
525 } | 440 } |
526 else | 441 else |
541 if (_key!=null && _key.isValid()) | 456 if (_key!=null && _key.isValid()) |
542 _key.cancel(); | 457 _key.cancel(); |
543 | 458 |
544 if (_open) | 459 if (_open) |
545 { | 460 { |
546 _open=false; | 461 _open = false; |
547 _selectSet.destroyEndPoint(this); | 462 _selectSet.destroyEndPoint(this); |
548 } | 463 } |
549 _key = null; | 464 _key = null; |
550 } | 465 } |
551 } | 466 } |
552 | 467 |
553 /* ------------------------------------------------------------ */ | 468 private void handle() |
554 /* | |
555 */ | |
556 protected void handle() | |
557 { | 469 { |
558 boolean dispatched = true; | 470 boolean dispatched = true; |
559 try | 471 try |
560 { | 472 { |
561 try | 473 try |
646 { | 558 { |
647 updateKey(); | 559 updateKey(); |
648 } | 560 } |
649 } | 561 } |
650 | 562 |
651 /* ------------------------------------------------------------ */ | |
652 @Override | 563 @Override |
653 public String toString() | 564 public String toString() |
654 { | 565 { |
655 // Do NOT use synchronized (this) | 566 // Do NOT use synchronized (this) |
656 // because it's very easy to deadlock when debugging is enabled. | 567 // because it's very easy to deadlock when debugging is enabled. |
697 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) | 608 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) |
698 */ | 609 */ |
699 @Override | 610 @Override |
700 public void setMaxIdleTime(int timeMs) throws IOException | 611 public void setMaxIdleTime(int timeMs) throws IOException |
701 { | 612 { |
702 _maxIdleTime=timeMs; | 613 _maxIdleTime = timeMs; |
703 } | 614 } |
704 | 615 |
705 } | 616 } |