comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 955:6f49b8dfffe6

simplify SelectChannelEndPoint
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 13 Oct 2016 16:55:53 -0600
parents 7db4a488fc82
children 1094975d013b
comparison
equal deleted inserted replaced
954:a021c4c9c244 955:6f49b8dfffe6
39 39
40 /* ------------------------------------------------------------ */ 40 /* ------------------------------------------------------------ */
41 /** 41 /**
42 * An Endpoint that can be scheduled by {@link SelectorManager}. 42 * An Endpoint that can be scheduled by {@link SelectorManager}.
43 */ 43 */
44 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint 44 public final class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
45 { 45 {
46 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 46 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
47 47
48 private final SelectorManager.SelectSet _selectSet; 48 private final SelectorManager.SelectSet _selectSet;
49 private final SelectorManager _manager; 49 private final SelectorManager _manager;
67 private static final int STATE_NEEDS_DISPATCH = -1; 67 private static final int STATE_NEEDS_DISPATCH = -1;
68 private static final int STATE_UNDISPATCHED = 0; 68 private static final int STATE_UNDISPATCHED = 0;
69 private static final int STATE_DISPATCHED = 1; 69 private static final int STATE_DISPATCHED = 1;
70 private int _state; 70 private int _state;
71 71
72 private boolean _onIdle;
73
74 /** true if the last write operation succeeded and wrote all offered bytes */ 72 /** true if the last write operation succeeded and wrote all offered bytes */
75 private volatile boolean _writable = true; 73 private volatile boolean _writable = true;
76 74
77
78 /** True if a thread is blocked in {@link #blockReadable(long)} */ 75 /** True if a thread is blocked in {@link #blockReadable(long)} */
79 private boolean _readBlocked; 76 private boolean _readBlocked;
80 77
81 /** True if a thread is blocked in {@link #blockWritable(long)} */ 78 /** True if a thread is blocked in {@link #blockWritable(long)} */
82 private boolean _writeBlocked; 79 private boolean _writeBlocked;
83 80
84 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ 81 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
85 private boolean _open; 82 private boolean _open;
86 83
87 private volatile long _idleTimestamp;
88 private volatile boolean _checkIdle; 84 private volatile boolean _checkIdle;
89 85
90 private boolean _ishut; 86 private boolean _ishut;
91 87
92 /* ------------------------------------------------------------ */
93 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) 88 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
94 throws IOException 89 throws IOException
95 { 90 {
96 super(channel, maxIdleTime); 91 super(channel, maxIdleTime);
97 92
98 _manager = selectSet.getManager(); 93 _manager = selectSet.getManager();
99 _selectSet = selectSet; 94 _selectSet = selectSet;
100 _state=STATE_UNDISPATCHED; 95 _state = STATE_UNDISPATCHED;
101 _onIdle=false; 96 _open = true;
102 _open=true;
103 _key = key; 97 _key = key;
104 98
105 setCheckForIdle(true); 99 setCheckForIdle(true);
106 } 100 }
107 101
108 /* ------------------------------------------------------------ */ 102 @Override
109 public SelectionKey getSelectionKey()
110 {
111 synchronized (this)
112 {
113 return _key;
114 }
115 }
116
117 public SelectorManager getSelectManager()
118 {
119 return _manager;
120 }
121
122 public Connection getConnection() 103 public Connection getConnection()
123 { 104 {
124 return _connection; 105 return _connection;
125 } 106 }
126 107
108 @Override
127 public void setConnection(Connection connection) 109 public void setConnection(Connection connection)
128 { 110 {
129 _connection = (AsyncConnection)connection; 111 _connection = (AsyncConnection)connection;
130 } 112 }
131 113
136 public synchronized void schedule() 118 public synchronized void schedule()
137 { 119 {
138 // If there is no key, then do nothing 120 // If there is no key, then do nothing
139 if (_key == null || !_key.isValid()) 121 if (_key == null || !_key.isValid())
140 { 122 {
141 _readBlocked=false; 123 _readBlocked = false;
142 _writeBlocked=false; 124 _writeBlocked = false;
143 this.notifyAll(); 125 this.notifyAll();
144 return; 126 return;
145 } 127 }
146 128
147 // If there are threads dispatched reading and writing 129 // If there are threads dispatched reading and writing
149 { 131 {
150 // assert _dispatched; 132 // assert _dispatched;
151 if (_readBlocked && _key.isReadable()) 133 if (_readBlocked && _key.isReadable())
152 _readBlocked=false; 134 _readBlocked=false;
153 if (_writeBlocked && _key.isWritable()) 135 if (_writeBlocked && _key.isWritable())
154 _writeBlocked=false; 136 _writeBlocked = false;
155 137
156 // waking them up is as good as a dispatch. 138 // waking them up is as good as a dispatch.
157 this.notifyAll(); 139 this.notifyAll();
158 140
159 // we are not interested in further selecting 141 // we are not interested in further selecting
180 // otherwise do the dispatch 162 // otherwise do the dispatch
181 dispatch(); 163 dispatch();
182 } 164 }
183 } 165 }
184 166
167 @Override
185 public synchronized void dispatch() 168 public synchronized void dispatch()
186 { 169 {
187 if (_state<=STATE_UNDISPATCHED) 170 if (_state<=STATE_UNDISPATCHED)
188 { 171 {
189 if (_onIdle) 172 _state = STATE_DISPATCHED;
173 try {
174 _manager.execute(_handler);
175 } catch(RejectedExecutionException e) {
190 _state = STATE_NEEDS_DISPATCH; 176 _state = STATE_NEEDS_DISPATCH;
191 else 177 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
192 { 178 updateKey();
193 _state = STATE_DISPATCHED;
194 try {
195 _manager.execute(_handler);
196 } catch(RejectedExecutionException e) {
197 _state = STATE_NEEDS_DISPATCH;
198 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
199 updateKey();
200 }
201 } 179 }
202 } 180 }
203 } 181 }
204 182
205 /* ------------------------------------------------------------ */ 183 /* ------------------------------------------------------------ */
207 * Called when a dispatched thread is no longer handling the endpoint. 185 * Called when a dispatched thread is no longer handling the endpoint.
208 * The selection key operations are updated. 186 * The selection key operations are updated.
209 * @return If false is returned, the endpoint has been redispatched and 187 * @return If false is returned, the endpoint has been redispatched and
210 * thread must keep handling the endpoint. 188 * thread must keep handling the endpoint.
211 */ 189 */
212 protected synchronized void undispatch() 190 private synchronized void undispatch()
213 { 191 {
214 _state = STATE_UNDISPATCHED; 192 _state = STATE_UNDISPATCHED;
215 updateKey(); 193 updateKey();
216 } 194 }
217 195
196 @Override
218 public void setCheckForIdle(boolean check) 197 public void setCheckForIdle(boolean check)
219 { 198 {
220 if (check) 199 if (check)
221 { 200 {
222 _idleTimestamp=System.currentTimeMillis(); 201 _checkIdle = true;
223 _checkIdle=true;
224 } 202 }
225 else 203 else
226 _checkIdle=false; 204 _checkIdle = false;
227 }
228
229 private boolean isCheckForIdle()
230 {
231 return _checkIdle;
232 }
233
234 protected void notIdle()
235 {
236 _idleTimestamp=System.currentTimeMillis();
237 }
238
239 public void checkIdleTimestamp(long now)
240 {
241 if (isCheckForIdle() && _maxIdleTime>0)
242 {
243 final long idleForMs=now-_idleTimestamp;
244
245 if (idleForMs>_maxIdleTime)
246 {
247 // Don't idle out again until onIdleExpired task completes.
248 setCheckForIdle(false);
249 _manager.execute(new Runnable()
250 {
251 public void run()
252 {
253 try
254 {
255 onIdleExpired(idleForMs);
256 }
257 finally
258 {
259 setCheckForIdle(true);
260 }
261 }
262 });
263 }
264 }
265 }
266
267 private void onIdleExpired(long idleForMs)
268 {
269 try
270 {
271 synchronized (this)
272 {
273 _onIdle=true;
274 }
275
276 _connection.onIdleExpired(idleForMs);
277 }
278 finally
279 {
280 synchronized (this)
281 {
282 _onIdle=false;
283 if (_state==STATE_NEEDS_DISPATCH)
284 dispatch();
285 }
286 }
287 } 205 }
288 206
289 @Override 207 @Override
290 public int fill(Buffer buffer) throws IOException 208 public int fill(Buffer buffer) throws IOException
291 { 209 {
292 int fill=super.fill(buffer); 210 int fill=super.fill(buffer);
293 if (fill>0)
294 notIdle();
295 return fill; 211 return fill;
296 } 212 }
297 213
298 @Override 214 @Override
299 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException 215 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
303 // If there was something to write and it wasn't written, then we are not writable. 219 // If there was something to write and it wasn't written, then we are not writable.
304 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) 220 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
305 { 221 {
306 synchronized (this) 222 synchronized (this)
307 { 223 {
308 _writable=false; 224 _writable = false;
309 if (_state<STATE_DISPATCHED) 225 if (_state<STATE_DISPATCHED)
310 updateKey(); 226 updateKey();
311 } 227 }
312 } 228 }
313 else if (l>0) 229 else if (l>0)
314 { 230 {
315 _writable=true; 231 _writable = true;
316 notIdle();
317 } 232 }
318 return l; 233 return l;
319 } 234 }
320 235
321 @Override 236 @Override
326 // If there was something to write and it wasn't written, then we are not writable. 241 // If there was something to write and it wasn't written, then we are not writable.
327 if (l==0 && buffer!=null && buffer.hasContent()) 242 if (l==0 && buffer!=null && buffer.hasContent())
328 { 243 {
329 synchronized (this) 244 synchronized (this)
330 { 245 {
331 _writable=false; 246 _writable = false;
332 if (_state<STATE_DISPATCHED) 247 if (_state<STATE_DISPATCHED)
333 updateKey(); 248 updateKey();
334 } 249 }
335 } 250 }
336 else if (l>0) 251 else if (l>0)
337 { 252 {
338 _writable=true; 253 _writable = true;
339 notIdle();
340 } 254 }
341 255
342 return l; 256 return l;
343 } 257 }
344 258
352 synchronized (this) 266 synchronized (this)
353 { 267 {
354 if (isInputShutdown()) 268 if (isInputShutdown())
355 throw new EofException(); 269 throw new EofException();
356 270
271 long now = _selectSet.getNow();
272 long end = now+timeoutMs;
273 boolean check = _checkIdle;
274 setCheckForIdle(true);
275 try
276 {
277 _readBlocked = true;
278 while (!isInputShutdown() && _readBlocked)
279 {
280 try
281 {
282 updateKey();
283 this.wait(timeoutMs>0?(end-now):10000);
284 }
285 catch (final InterruptedException e)
286 {
287 LOG.warn("",e);
288 }
289 finally
290 {
291 now=_selectSet.getNow();
292 }
293
294 if (_readBlocked && timeoutMs>0 && now>=end)
295 return false;
296 }
297 }
298 finally
299 {
300 _readBlocked = false;
301 setCheckForIdle(check);
302 }
303 }
304 return true;
305 }
306
307 /* ------------------------------------------------------------ */
308 /*
309 * Allows thread to block waiting for further events.
310 */
311 @Override
312 public boolean blockWritable(long timeoutMs) throws IOException
313 {
314 synchronized (this)
315 {
316 if (isOutputShutdown())
317 throw new EofException();
318
357 long now=_selectSet.getNow(); 319 long now=_selectSet.getNow();
358 long end=now+timeoutMs; 320 long end=now+timeoutMs;
359 boolean check=isCheckForIdle(); 321 boolean check = _checkIdle;
360 setCheckForIdle(true); 322 setCheckForIdle(true);
361 try 323 try
362 { 324 {
363 _readBlocked=true; 325 _writeBlocked = true;
364 while (!isInputShutdown() && _readBlocked) 326 while (_writeBlocked && !isOutputShutdown())
365 { 327 {
366 try 328 try
367 { 329 {
368 updateKey(); 330 updateKey();
369 this.wait(timeoutMs>0?(end-now):10000); 331 this.wait(timeoutMs>0?(end-now):10000);
374 } 336 }
375 finally 337 finally
376 { 338 {
377 now=_selectSet.getNow(); 339 now=_selectSet.getNow();
378 } 340 }
379
380 if (_readBlocked && timeoutMs>0 && now>=end)
381 return false;
382 }
383 }
384 finally
385 {
386 _readBlocked=false;
387 setCheckForIdle(check);
388 }
389 }
390 return true;
391 }
392
393 /* ------------------------------------------------------------ */
394 /*
395 * Allows thread to block waiting for further events.
396 */
397 @Override
398 public boolean blockWritable(long timeoutMs) throws IOException
399 {
400 synchronized (this)
401 {
402 if (isOutputShutdown())
403 throw new EofException();
404
405 long now=_selectSet.getNow();
406 long end=now+timeoutMs;
407 boolean check=isCheckForIdle();
408 setCheckForIdle(true);
409 try
410 {
411 _writeBlocked=true;
412 while (_writeBlocked && !isOutputShutdown())
413 {
414 try
415 {
416 updateKey();
417 this.wait(timeoutMs>0?(end-now):10000);
418 }
419 catch (final InterruptedException e)
420 {
421 LOG.warn("",e);
422 }
423 finally
424 {
425 now=_selectSet.getNow();
426 }
427 if (_writeBlocked && timeoutMs>0 && now>=end) 341 if (_writeBlocked && timeoutMs>0 && now>=end)
428 return false; 342 return false;
429 } 343 }
430 } 344 }
431 finally 345 finally
432 { 346 {
433 _writeBlocked=false; 347 _writeBlocked = false;
434 setCheckForIdle(check); 348 setCheckForIdle(check);
435 } 349 }
436 } 350 }
437 return true; 351 return true;
438 } 352 }
439 353
354 @Override
440 public boolean hasProgressed() 355 public boolean hasProgressed()
441 { 356 {
442 return false; 357 return false;
443 } 358 }
444 359
470 { 385 {
471 _key = null; 386 _key = null;
472 LOG.trace("",e); 387 LOG.trace("",e);
473 } 388 }
474 } 389 }
475 changed=_interestOps!=current_ops; 390 changed = _interestOps!=current_ops;
476 } 391 }
477 392
478 if(changed) 393 if(changed)
479 { 394 {
480 doUpdateKey(); 395 doUpdateKey();
516 431
517 if (_open) 432 if (_open)
518 { 433 {
519 _selectSet.destroyEndPoint(this); 434 _selectSet.destroyEndPoint(this);
520 } 435 }
521 _open=false; 436 _open = false;
522 _key = null; 437 _key = null;
523 } 438 }
524 } 439 }
525 } 440 }
526 else 441 else
541 if (_key!=null && _key.isValid()) 456 if (_key!=null && _key.isValid())
542 _key.cancel(); 457 _key.cancel();
543 458
544 if (_open) 459 if (_open)
545 { 460 {
546 _open=false; 461 _open = false;
547 _selectSet.destroyEndPoint(this); 462 _selectSet.destroyEndPoint(this);
548 } 463 }
549 _key = null; 464 _key = null;
550 } 465 }
551 } 466 }
552 467
553 /* ------------------------------------------------------------ */ 468 private void handle()
554 /*
555 */
556 protected void handle()
557 { 469 {
558 boolean dispatched = true; 470 boolean dispatched = true;
559 try 471 try
560 { 472 {
561 try 473 try
646 { 558 {
647 updateKey(); 559 updateKey();
648 } 560 }
649 } 561 }
650 562
651 /* ------------------------------------------------------------ */
652 @Override 563 @Override
653 public String toString() 564 public String toString()
654 { 565 {
655 // Do NOT use synchronized (this) 566 // Do NOT use synchronized (this)
656 // because it's very easy to deadlock when debugging is enabled. 567 // because it's very easy to deadlock when debugging is enabled.
697 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) 608 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
698 */ 609 */
699 @Override 610 @Override
700 public void setMaxIdleTime(int timeMs) throws IOException 611 public void setMaxIdleTime(int timeMs) throws IOException
701 { 612 {
702 _maxIdleTime=timeMs; 613 _maxIdleTime = timeMs;
703 } 614 }
704 615
705 } 616 }