comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 865:6b210bb66c63

remove ThreadPool
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 02 Oct 2016 20:38:06 -0600
parents 8e9db0bbf4f9
children 54308d65265a
comparison
equal deleted inserted replaced
864:e21ca9878a10 865:6b210bb66c63
23 import java.nio.channels.ClosedChannelException; 23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectableChannel; 24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey; 25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.SocketChannel; 26 import java.nio.channels.SocketChannel;
27 import java.util.Locale; 27 import java.util.Locale;
28 import java.util.concurrent.RejectedExecutionException;
28 29
29 import org.eclipse.jetty.io.AsyncEndPoint; 30 import org.eclipse.jetty.io.AsyncEndPoint;
30 import org.eclipse.jetty.io.Buffer; 31 import org.eclipse.jetty.io.Buffer;
31 import org.eclipse.jetty.io.ConnectedEndPoint; 32 import org.eclipse.jetty.io.ConnectedEndPoint;
32 import org.eclipse.jetty.io.Connection; 33 import org.eclipse.jetty.io.Connection;
40 /** 41 /**
41 * An Endpoint that can be scheduled by {@link SelectorManager}. 42 * An Endpoint that can be scheduled by {@link SelectorManager}.
42 */ 43 */
43 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint 44 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
44 { 45 {
45 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 46 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
46 47
47 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win"); 48 private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
48 private final SelectorManager.SelectSet _selectSet; 49 private final SelectorManager.SelectSet _selectSet;
49 private final SelectorManager _manager; 50 private final SelectorManager _manager;
50 private SelectionKey _key; 51 private SelectionKey _key;
51 private final Runnable _handler = new Runnable() 52 private final Runnable _handler = new Runnable()
52 { 53 {
53 public void run() { handle(); } 54 public void run() { handle(); }
54 }; 55 };
55 56
56 /** The desired value for {@link SelectionKey#interestOps()} */ 57 /** The desired value for {@link SelectionKey#interestOps()} */
57 private int _interestOps; 58 private int _interestOps;
58 59
59 /** 60 /**
60 * The connection instance is the handler for any IO activity on the endpoint. 61 * The connection instance is the handler for any IO activity on the endpoint.
61 * There is a different type of connection for HTTP, AJP, WebSocket and 62 * There is a different type of connection for HTTP, AJP, WebSocket and
62 * ProxyConnect. The connection may change for an SCEP as it is upgraded 63 * ProxyConnect. The connection may change for an SCEP as it is upgraded
63 * from HTTP to proxy connect or websocket. 64 * from HTTP to proxy connect or websocket.
64 */ 65 */
65 private volatile AsyncConnection _connection; 66 private volatile AsyncConnection _connection;
66 67
67 private static final int STATE_NEEDS_DISPATCH=-1; 68 private static final int STATE_NEEDS_DISPATCH=-1;
68 private static final int STATE_UNDISPATCHED=0; 69 private static final int STATE_UNDISPATCHED=0;
69 private static final int STATE_DISPATCHED=1; 70 private static final int STATE_DISPATCHED=1;
70 private static final int STATE_ASYNC=2; 71 private static final int STATE_ASYNC=2;
71 private int _state; 72 private int _state;
72 73
73 private boolean _onIdle; 74 private boolean _onIdle;
74 75
75 /** true if the last write operation succeed and wrote all offered bytes */ 76 /** true if the last write operation succeed and wrote all offered bytes */
76 private volatile boolean _writable = true; 77 private volatile boolean _writable = true;
77 78
78 79
79 /** True if a thread has is blocked in {@link #blockReadable(long)} */ 80 /** True if a thread has is blocked in {@link #blockReadable(long)} */
80 private boolean _readBlocked; 81 private boolean _readBlocked;
81 82
82 /** True if a thread has is blocked in {@link #blockWritable(long)} */ 83 /** True if a thread has is blocked in {@link #blockWritable(long)} */
83 private boolean _writeBlocked; 84 private boolean _writeBlocked;
84 85
85 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */ 86 /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
86 private boolean _open; 87 private boolean _open;
87 88
88 private volatile long _idleTimestamp; 89 private volatile long _idleTimestamp;
89 private volatile boolean _checkIdle; 90 private volatile boolean _checkIdle;
90 91
91 private boolean _interruptable; 92 private boolean _interruptable;
92 93
93 private boolean _ishut; 94 private boolean _ishut;
94 95
95 /* ------------------------------------------------------------ */ 96 /* ------------------------------------------------------------ */
96 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) 97 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
97 throws IOException 98 throws IOException
98 { 99 {
99 super(channel, maxIdleTime); 100 super(channel, maxIdleTime);
100 101
101 _manager = selectSet.getManager(); 102 _manager = selectSet.getManager();
102 _selectSet = selectSet; 103 _selectSet = selectSet;
103 _state=STATE_UNDISPATCHED; 104 _state=STATE_UNDISPATCHED;
104 _onIdle=false; 105 _onIdle=false;
105 _open=true; 106 _open=true;
106 _key = key; 107 _key = key;
107 108
108 setCheckForIdle(true); 109 setCheckForIdle(true);
109 } 110 }
110 111
111 /* ------------------------------------------------------------ */ 112 /* ------------------------------------------------------------ */
112 public SelectionKey getSelectionKey() 113 public SelectionKey getSelectionKey()
113 { 114 {
114 synchronized (this) 115 synchronized (this)
115 { 116 {
116 return _key; 117 return _key;
117 } 118 }
118 } 119 }
119 120
120 /* ------------------------------------------------------------ */ 121 /* ------------------------------------------------------------ */
121 public SelectorManager getSelectManager() 122 public SelectorManager getSelectManager()
122 { 123 {
123 return _manager; 124 return _manager;
124 } 125 }
125 126
126 /* ------------------------------------------------------------ */ 127 /* ------------------------------------------------------------ */
127 public Connection getConnection() 128 public Connection getConnection()
128 { 129 {
129 return _connection; 130 return _connection;
130 } 131 }
131 132
132 /* ------------------------------------------------------------ */ 133 /* ------------------------------------------------------------ */
133 public void setConnection(Connection connection) 134 public void setConnection(Connection connection)
134 { 135 {
135 Connection old=_connection; 136 Connection old=_connection;
136 _connection=(AsyncConnection)connection; 137 _connection=(AsyncConnection)connection;
137 if (old!=null && old!=_connection) 138 if (old!=null && old!=_connection)
138 _manager.endPointUpgraded(this,old); 139 _manager.endPointUpgraded(this,old);
139 } 140 }
140 141
141 /* ------------------------------------------------------------ */ 142 /* ------------------------------------------------------------ */
142 public long getIdleTimestamp() 143 public long getIdleTimestamp()
143 { 144 {
144 return _idleTimestamp; 145 return _idleTimestamp;
145 } 146 }
146 147
147 /* ------------------------------------------------------------ */ 148 /* ------------------------------------------------------------ */
148 /** Called by selectSet to schedule handling 149 /** Called by selectSet to schedule handling
149 * 150 *
150 */ 151 */
151 public void schedule() 152 public void schedule()
152 { 153 {
153 synchronized (this) 154 synchronized (this)
154 { 155 {
155 // If there is no key, then do nothing 156 // If there is no key, then do nothing
156 if (_key == null || !_key.isValid()) 157 if (_key == null || !_key.isValid())
157 { 158 {
158 _readBlocked=false; 159 _readBlocked=false;
159 _writeBlocked=false; 160 _writeBlocked=false;
160 this.notifyAll(); 161 this.notifyAll();
161 return; 162 return;
162 } 163 }
163 164
164 // If there are threads dispatched reading and writing 165 // If there are threads dispatched reading and writing
165 if (_readBlocked || _writeBlocked) 166 if (_readBlocked || _writeBlocked)
166 { 167 {
167 // assert _dispatched; 168 // assert _dispatched;
168 if (_readBlocked && _key.isReadable()) 169 if (_readBlocked && _key.isReadable())
169 _readBlocked=false; 170 _readBlocked=false;
170 if (_writeBlocked && _key.isWritable()) 171 if (_writeBlocked && _key.isWritable())
171 _writeBlocked=false; 172 _writeBlocked=false;
172 173
173 // wake them up is as good as a dispatched. 174 // wake them up is as good as a dispatched.
174 this.notifyAll(); 175 this.notifyAll();
175 176
176 // we are not interested in further selecting 177 // we are not interested in further selecting
177 _key.interestOps(0); 178 _key.interestOps(0);
178 if (_state<STATE_DISPATCHED) 179 if (_state<STATE_DISPATCHED)
179 updateKey(); 180 updateKey();
180 return; 181 return;
181 } 182 }
182 183
183 // Remove writeable op 184 // Remove writeable op
184 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) 185 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
185 { 186 {
186 // Remove writeable op 187 // Remove writeable op
187 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE; 188 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
188 _key.interestOps(_interestOps); 189 _key.interestOps(_interestOps);
189 _writable = true; // Once writable is in ops, only removed with dispatch. 190 _writable = true; // Once writable is in ops, only removed with dispatch.
190 } 191 }
191 192
192 // If dispatched, then deregister interest 193 // If dispatched, then deregister interest
193 if (_state>=STATE_DISPATCHED) 194 if (_state>=STATE_DISPATCHED)
194 _key.interestOps(0); 195 _key.interestOps(0);
195 else 196 else
196 { 197 {
197 // other wise do the dispatch 198 // other wise do the dispatch
198 dispatch(); 199 dispatch();
199 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0()) 200 if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
200 { 201 {
201 _key.interestOps(0); 202 _key.interestOps(0);
202 } 203 }
203 } 204 }
204 } 205 }
205 } 206 }
206 207
207 /* ------------------------------------------------------------ */ 208 /* ------------------------------------------------------------ */
208 public void asyncDispatch() 209 public void asyncDispatch()
209 { 210 {
210 synchronized(this) 211 synchronized(this)
211 { 212 {
212 switch(_state) 213 switch(_state)
213 { 214 {
214 case STATE_NEEDS_DISPATCH: 215 case STATE_NEEDS_DISPATCH:
215 case STATE_UNDISPATCHED: 216 case STATE_UNDISPATCHED:
216 dispatch(); 217 dispatch();
217 break; 218 break;
218 219
219 case STATE_DISPATCHED: 220 case STATE_DISPATCHED:
220 case STATE_ASYNC: 221 case STATE_ASYNC:
221 _state=STATE_ASYNC; 222 _state=STATE_ASYNC;
222 break; 223 break;
223 } 224 }
224 } 225 }
225 } 226 }
226 227
227 /* ------------------------------------------------------------ */ 228 /* ------------------------------------------------------------ */
228 public void dispatch() 229 public void dispatch()
229 { 230 {
230 synchronized(this) 231 synchronized(this)
231 { 232 {
232 if (_state<=STATE_UNDISPATCHED) 233 if (_state<=STATE_UNDISPATCHED)
233 { 234 {
234 if (_onIdle) 235 if (_onIdle)
235 _state = STATE_NEEDS_DISPATCH; 236 _state = STATE_NEEDS_DISPATCH;
236 else 237 else
237 { 238 {
238 _state = STATE_DISPATCHED; 239 _state = STATE_DISPATCHED;
239 boolean dispatched = _manager.dispatch(_handler); 240 try {
240 if(!dispatched) 241 _manager.execute(_handler);
241 { 242 } catch(RejectedExecutionException e) {
242 _state = STATE_NEEDS_DISPATCH; 243 _state = STATE_NEEDS_DISPATCH;
243 LOG.warn("Dispatched Failed! "+this+" to "+_manager); 244 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
244 updateKey(); 245 updateKey();
245 } 246 }
246 } 247 }
247 } 248 }
248 } 249 }
249 } 250 }
250 251
251 /* ------------------------------------------------------------ */ 252 /* ------------------------------------------------------------ */
252 /** 253 /**
253 * Called when a dispatched thread is no longer handling the endpoint. 254 * Called when a dispatched thread is no longer handling the endpoint.
254 * The selection key operations are updated. 255 * The selection key operations are updated.
255 * @return If false is returned, the endpoint has been redispatched and 256 * @return If false is returned, the endpoint has been redispatched and
256 * thread must keep handling the endpoint. 257 * thread must keep handling the endpoint.
257 */ 258 */
258 protected boolean undispatch() 259 protected boolean undispatch()
259 { 260 {
260 synchronized (this) 261 synchronized (this)
261 { 262 {
262 switch(_state) 263 switch(_state)
263 { 264 {
264 case STATE_ASYNC: 265 case STATE_ASYNC:
265 _state=STATE_DISPATCHED; 266 _state=STATE_DISPATCHED;
266 return false; 267 return false;
267 268
268 default: 269 default:
269 _state=STATE_UNDISPATCHED; 270 _state=STATE_UNDISPATCHED;
270 updateKey(); 271 updateKey();
271 return true; 272 return true;
272 } 273 }
273 } 274 }
274 } 275 }
275 276
276 /* ------------------------------------------------------------ */ 277 /* ------------------------------------------------------------ */
277 public void cancelTimeout(Task task) 278 public void cancelTimeout(Task task)
278 { 279 {
279 getSelectSet().cancelTimeout(task); 280 getSelectSet().cancelTimeout(task);
280 } 281 }
281 282
282 /* ------------------------------------------------------------ */ 283 /* ------------------------------------------------------------ */
283 public void scheduleTimeout(Task task, long timeoutMs) 284 public void scheduleTimeout(Task task, long timeoutMs)
284 { 285 {
285 getSelectSet().scheduleTimeout(task,timeoutMs); 286 getSelectSet().scheduleTimeout(task,timeoutMs);
286 } 287 }
287 288
288 /* ------------------------------------------------------------ */ 289 /* ------------------------------------------------------------ */
289 public void setCheckForIdle(boolean check) 290 public void setCheckForIdle(boolean check)
290 { 291 {
291 if (check) 292 if (check)
292 { 293 {
293 _idleTimestamp=System.currentTimeMillis(); 294 _idleTimestamp=System.currentTimeMillis();
294 _checkIdle=true; 295 _checkIdle=true;
295 } 296 }
296 else 297 else
297 _checkIdle=false; 298 _checkIdle=false;
298 } 299 }
299 300
300 /* ------------------------------------------------------------ */ 301 /* ------------------------------------------------------------ */
301 public boolean isCheckForIdle() 302 public boolean isCheckForIdle()
302 { 303 {
303 return _checkIdle; 304 return _checkIdle;
304 } 305 }
305 306
306 /* ------------------------------------------------------------ */ 307 /* ------------------------------------------------------------ */
307 protected void notIdle() 308 protected void notIdle()
308 { 309 {
309 _idleTimestamp=System.currentTimeMillis(); 310 _idleTimestamp=System.currentTimeMillis();
310 } 311 }
311 312
312 /* ------------------------------------------------------------ */ 313 /* ------------------------------------------------------------ */
313 public void checkIdleTimestamp(long now) 314 public void checkIdleTimestamp(long now)
314 { 315 {
315 if (isCheckForIdle() && _maxIdleTime>0) 316 if (isCheckForIdle() && _maxIdleTime>0)
316 { 317 {
317 final long idleForMs=now-_idleTimestamp; 318 final long idleForMs=now-_idleTimestamp;
318 319
319 if (idleForMs>_maxIdleTime) 320 if (idleForMs>_maxIdleTime)
320 { 321 {
321 // Don't idle out again until onIdleExpired task completes. 322 // Don't idle out again until onIdleExpired task completes.
322 setCheckForIdle(false); 323 setCheckForIdle(false);
323 _manager.dispatch(new Runnable() 324 _manager.execute(new Runnable()
324 { 325 {
325 public void run() 326 public void run()
326 { 327 {
327 try 328 try
328 { 329 {
329 onIdleExpired(idleForMs); 330 onIdleExpired(idleForMs);
330 } 331 }
331 finally 332 finally
332 { 333 {
333 setCheckForIdle(true); 334 setCheckForIdle(true);
334 } 335 }
335 } 336 }
336 }); 337 });
337 } 338 }
338 } 339 }
339 } 340 }
340 341
341 /* ------------------------------------------------------------ */ 342 /* ------------------------------------------------------------ */
342 public void onIdleExpired(long idleForMs) 343 public void onIdleExpired(long idleForMs)
343 { 344 {
344 try 345 try
345 { 346 {
346 synchronized (this) 347 synchronized (this)
347 { 348 {
348 _onIdle=true; 349 _onIdle=true;
349 } 350 }
350 351
351 _connection.onIdleExpired(idleForMs); 352 _connection.onIdleExpired(idleForMs);
352 } 353 }
353 finally 354 finally
354 { 355 {
355 synchronized (this) 356 synchronized (this)
356 { 357 {
357 _onIdle=false; 358 _onIdle=false;
358 if (_state==STATE_NEEDS_DISPATCH) 359 if (_state==STATE_NEEDS_DISPATCH)
359 dispatch(); 360 dispatch();
360 } 361 }
361 } 362 }
362 } 363 }
363 364
364 /* ------------------------------------------------------------ */ 365 /* ------------------------------------------------------------ */
365 @Override 366 @Override
366 public int fill(Buffer buffer) throws IOException 367 public int fill(Buffer buffer) throws IOException
367 { 368 {
368 int fill=super.fill(buffer); 369 int fill=super.fill(buffer);
369 if (fill>0) 370 if (fill>0)
370 notIdle(); 371 notIdle();
371 return fill; 372 return fill;
372 } 373 }
373 374
374 /* ------------------------------------------------------------ */ 375 /* ------------------------------------------------------------ */
375 @Override 376 @Override
376 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException 377 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
377 { 378 {
378 int l = super.flush(header, buffer, trailer); 379 int l = super.flush(header, buffer, trailer);
379 380
380 // If there was something to write and it wasn't written, then we are not writable. 381 // If there was something to write and it wasn't written, then we are not writable.
381 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) 382 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
382 { 383 {
383 synchronized (this) 384 synchronized (this)
384 { 385 {
385 _writable=false; 386 _writable=false;
386 if (_state<STATE_DISPATCHED) 387 if (_state<STATE_DISPATCHED)
387 updateKey(); 388 updateKey();
388 } 389 }
389 } 390 }
390 else if (l>0) 391 else if (l>0)
391 { 392 {
392 _writable=true; 393 _writable=true;
393 notIdle(); 394 notIdle();
394 } 395 }
395 return l; 396 return l;
396 } 397 }
397 398
398 /* ------------------------------------------------------------ */ 399 /* ------------------------------------------------------------ */
399 /* 400 /*
400 */ 401 */
401 @Override 402 @Override
402 public int flush(Buffer buffer) throws IOException 403 public int flush(Buffer buffer) throws IOException
403 { 404 {
404 int l = super.flush(buffer); 405 int l = super.flush(buffer);
405 406
406 // If there was something to write and it wasn't written, then we are not writable. 407 // If there was something to write and it wasn't written, then we are not writable.
407 if (l==0 && buffer!=null && buffer.hasContent()) 408 if (l==0 && buffer!=null && buffer.hasContent())
408 { 409 {
409 synchronized (this) 410 synchronized (this)
410 { 411 {
411 _writable=false; 412 _writable=false;
412 if (_state<STATE_DISPATCHED) 413 if (_state<STATE_DISPATCHED)
413 updateKey(); 414 updateKey();
414 } 415 }
415 } 416 }
416 else if (l>0) 417 else if (l>0)
417 { 418 {
418 _writable=true; 419 _writable=true;
419 notIdle(); 420 notIdle();
420 } 421 }
421 422
422 return l; 423 return l;
423 } 424 }
424 425
425 /* ------------------------------------------------------------ */ 426 /* ------------------------------------------------------------ */
426 /* 427 /*
427 * Allows thread to block waiting for further events. 428 * Allows thread to block waiting for further events.
428 */ 429 */
429 @Override 430 @Override
430 public boolean blockReadable(long timeoutMs) throws IOException 431 public boolean blockReadable(long timeoutMs) throws IOException
431 { 432 {
432 synchronized (this) 433 synchronized (this)
433 { 434 {
434 if (isInputShutdown()) 435 if (isInputShutdown())
435 throw new EofException(); 436 throw new EofException();
436 437
437 long now=_selectSet.getNow(); 438 long now=_selectSet.getNow();
438 long end=now+timeoutMs; 439 long end=now+timeoutMs;
439 boolean check=isCheckForIdle(); 440 boolean check=isCheckForIdle();
440 setCheckForIdle(true); 441 setCheckForIdle(true);
441 try 442 try
442 { 443 {
443 _readBlocked=true; 444 _readBlocked=true;
444 while (!isInputShutdown() && _readBlocked) 445 while (!isInputShutdown() && _readBlocked)
445 { 446 {
446 try 447 try
447 { 448 {
448 updateKey(); 449 updateKey();
449 this.wait(timeoutMs>0?(end-now):10000); 450 this.wait(timeoutMs>0?(end-now):10000);
450 } 451 }
451 catch (final InterruptedException e) 452 catch (final InterruptedException e)
452 { 453 {
453 LOG.warn("",e); 454 LOG.warn("",e);
454 if (_interruptable) 455 if (_interruptable)
455 throw new InterruptedIOException(){{this.initCause(e);}}; 456 throw new InterruptedIOException(){{this.initCause(e);}};
456 } 457 }
457 finally 458 finally
458 { 459 {
459 now=_selectSet.getNow(); 460 now=_selectSet.getNow();
460 } 461 }
461 462
462 if (_readBlocked && timeoutMs>0 && now>=end) 463 if (_readBlocked && timeoutMs>0 && now>=end)
463 return false; 464 return false;
464 } 465 }
465 } 466 }
466 finally 467 finally
467 { 468 {
468 _readBlocked=false; 469 _readBlocked=false;
469 setCheckForIdle(check); 470 setCheckForIdle(check);
470 } 471 }
471 } 472 }
472 return true; 473 return true;
473 } 474 }
474 475
475 /* ------------------------------------------------------------ */ 476 /* ------------------------------------------------------------ */
476 /* 477 /*
477 * Allows thread to block waiting for further events. 478 * Allows thread to block waiting for further events.
478 */ 479 */
479 @Override 480 @Override
480 public boolean blockWritable(long timeoutMs) throws IOException 481 public boolean blockWritable(long timeoutMs) throws IOException
481 { 482 {
482 synchronized (this) 483 synchronized (this)
483 { 484 {
484 if (isOutputShutdown()) 485 if (isOutputShutdown())
485 throw new EofException(); 486 throw new EofException();
486 487
487 long now=_selectSet.getNow(); 488 long now=_selectSet.getNow();
488 long end=now+timeoutMs; 489 long end=now+timeoutMs;
489 boolean check=isCheckForIdle(); 490 boolean check=isCheckForIdle();
490 setCheckForIdle(true); 491 setCheckForIdle(true);
491 try 492 try
492 { 493 {
493 _writeBlocked=true; 494 _writeBlocked=true;
494 while (_writeBlocked && !isOutputShutdown()) 495 while (_writeBlocked && !isOutputShutdown())
495 { 496 {
496 try 497 try
497 { 498 {
498 updateKey(); 499 updateKey();
499 this.wait(timeoutMs>0?(end-now):10000); 500 this.wait(timeoutMs>0?(end-now):10000);
500 } 501 }
501 catch (final InterruptedException e) 502 catch (final InterruptedException e)
502 { 503 {
503 LOG.warn("",e); 504 LOG.warn("",e);
504 if (_interruptable) 505 if (_interruptable)
505 throw new InterruptedIOException(){{this.initCause(e);}}; 506 throw new InterruptedIOException(){{this.initCause(e);}};
506 } 507 }
507 finally 508 finally
508 { 509 {
509 now=_selectSet.getNow(); 510 now=_selectSet.getNow();
510 } 511 }
511 if (_writeBlocked && timeoutMs>0 && now>=end) 512 if (_writeBlocked && timeoutMs>0 && now>=end)
512 return false; 513 return false;
513 } 514 }
514 } 515 }
515 finally 516 finally
516 { 517 {
517 _writeBlocked=false; 518 _writeBlocked=false;
518 setCheckForIdle(check); 519 setCheckForIdle(check);
519 } 520 }
520 } 521 }
521 return true; 522 return true;
522 } 523 }
523 524
524 /* ------------------------------------------------------------ */ 525 /* ------------------------------------------------------------ */
525 /** Set the interruptable mode of the endpoint. 526 /** Set the interruptable mode of the endpoint.
526 * If set to false (default), then interrupts are assumed to be spurious 527 * If set to false (default), then interrupts are assumed to be spurious
527 * and blocking operations continue unless the endpoint has been closed. 528 * and blocking operations continue unless the endpoint has been closed.
528 * If true, then interrupts of blocking operations result in InterruptedIOExceptions 529 * If true, then interrupts of blocking operations result in InterruptedIOExceptions
529 * being thrown. 530 * being thrown.
530 * @param interupable 531 * @param interupable
531 */ 532 */
532 public void setInterruptable(boolean interupable) 533 public void setInterruptable(boolean interupable)
533 { 534 {
534 synchronized (this) 535 synchronized (this)
535 { 536 {
536 _interruptable=interupable; 537 _interruptable=interupable;
537 } 538 }
538 } 539 }
539 540
540 /* ------------------------------------------------------------ */ 541 /* ------------------------------------------------------------ */
541 public boolean isInterruptable() 542 public boolean isInterruptable()
542 { 543 {
543 return _interruptable; 544 return _interruptable;
544 } 545 }
545 546
546 /* ------------------------------------------------------------ */ 547 /* ------------------------------------------------------------ */
547 /** 548 /**
548 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite() 549 * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
549 */ 550 */
550 public void scheduleWrite() 551 public void scheduleWrite()
551 { 552 {
552 if (_writable) 553 if (_writable)
553 LOG.debug("Required scheduleWrite {}",this); 554 LOG.debug("Required scheduleWrite {}",this);
554 555
555 _writable=false; 556 _writable=false;
556 updateKey(); 557 updateKey();
557 } 558 }
558 559
559 /* ------------------------------------------------------------ */ 560 /* ------------------------------------------------------------ */
560 public boolean isWritable() 561 public boolean isWritable()
561 { 562 {
562 return _writable; 563 return _writable;
563 } 564 }
564 565
565 /* ------------------------------------------------------------ */ 566 /* ------------------------------------------------------------ */
566 public boolean hasProgressed() 567 public boolean hasProgressed()
567 { 568 {
568 return false; 569 return false;
569 } 570 }
570 571
571 /* ------------------------------------------------------------ */ 572 /* ------------------------------------------------------------ */
572 /** 573 /**
573 * Updates selection key. Adds operations types to the selection key as needed. No operations 574 * Updates selection key. Adds operations types to the selection key as needed. No operations
574 * are removed as this is only done during dispatch. This method records the new key and 575 * are removed as this is only done during dispatch. This method records the new key and
575 * schedules a call to doUpdateKey to do the keyChange 576 * schedules a call to doUpdateKey to do the keyChange
576 */ 577 */
577 private void updateKey() 578 private void updateKey()
578 { 579 {
579 final boolean changed; 580 final boolean changed;
580 synchronized (this) 581 synchronized (this)
581 { 582 {
582 int current_ops=-1; 583 int current_ops=-1;
583 if (getChannel().isOpen()) 584 if (getChannel().isOpen())
584 { 585 {
585 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); 586 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
586 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); 587 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
587 588
588 _interestOps = 589 _interestOps =
589 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) 590 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
590 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); 591 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
591 try 592 try
592 { 593 {
593 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1); 594 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
594 } 595 }
595 catch(Exception e) 596 catch(Exception e)
596 { 597 {
597 _key=null; 598 _key=null;
598 LOG.trace("",e); 599 LOG.trace("",e);
599 } 600 }
600 } 601 }
601 changed=_interestOps!=current_ops; 602 changed=_interestOps!=current_ops;
602 } 603 }
603 604
604 if(changed) 605 if(changed)
605 { 606 {
606 _selectSet.addChange(this); 607 _selectSet.addChange(this);
607 _selectSet.wakeup(); 608 _selectSet.wakeup();
608 } 609 }
609 } 610 }
610 611
611 612
612 /* ------------------------------------------------------------ */ 613 /* ------------------------------------------------------------ */
613 /** 614 /**
614 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey 615 * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
615 */ 616 */
616 void doUpdateKey() 617 void doUpdateKey()
617 { 618 {
618 synchronized (this) 619 synchronized (this)
619 { 620 {
620 if (getChannel().isOpen()) 621 if (getChannel().isOpen())
621 { 622 {
622 if (_interestOps>0) 623 if (_interestOps>0)
623 { 624 {
624 if (_key==null || !_key.isValid()) 625 if (_key==null || !_key.isValid())
625 { 626 {
626 SelectableChannel sc = (SelectableChannel)getChannel(); 627 SelectableChannel sc = (SelectableChannel)getChannel();
627 if (sc.isRegistered()) 628 if (sc.isRegistered())
628 { 629 {
629 updateKey(); 630 updateKey();
630 } 631 }
631 else 632 else
632 { 633 {
633 try 634 try
634 { 635 {
635 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this); 636 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
636 } 637 }
637 catch (Exception e) 638 catch (Exception e)
638 { 639 {
639 LOG.trace("",e); 640 LOG.trace("",e);
640 if (_key!=null && _key.isValid()) 641 if (_key!=null && _key.isValid())
641 { 642 {
642 _key.cancel(); 643 _key.cancel();
643 } 644 }
644 645
645 if (_open) 646 if (_open)
646 { 647 {
647 _selectSet.destroyEndPoint(this); 648 _selectSet.destroyEndPoint(this);
648 } 649 }
649 _open=false; 650 _open=false;
650 _key = null; 651 _key = null;
651 } 652 }
652 } 653 }
653 } 654 }
654 else 655 else
655 { 656 {
656 _key.interestOps(_interestOps); 657 _key.interestOps(_interestOps);
657 } 658 }
658 } 659 }
659 else 660 else
660 { 661 {
661 if (_key!=null && _key.isValid()) 662 if (_key!=null && _key.isValid())
662 _key.interestOps(0); 663 _key.interestOps(0);
663 else 664 else
664 _key=null; 665 _key=null;
665 } 666 }
666 } 667 }
667 else 668 else
668 { 669 {
669 if (_key!=null && _key.isValid()) 670 if (_key!=null && _key.isValid())
670 _key.cancel(); 671 _key.cancel();
671 672
672 if (_open) 673 if (_open)
673 { 674 {
674 _open=false; 675 _open=false;
675 _selectSet.destroyEndPoint(this); 676 _selectSet.destroyEndPoint(this);
676 } 677 }
677 _key = null; 678 _key = null;
678 } 679 }
679 } 680 }
680 } 681 }
681 682
682 /* ------------------------------------------------------------ */ 683 /* ------------------------------------------------------------ */
683 /* 684 /*
684 */ 685 */
685 protected void handle() 686 protected void handle()
686 { 687 {
687 boolean dispatched=true; 688 boolean dispatched=true;
688 try 689 try
689 { 690 {
690 while(dispatched) 691 while(dispatched)
691 { 692 {
692 try 693 try
693 { 694 {
694 while(true) 695 while(true)
695 { 696 {
696 final AsyncConnection next = (AsyncConnection)_connection.handle(); 697 final AsyncConnection next = (AsyncConnection)_connection.handle();
697 if (next!=_connection) 698 if (next!=_connection)
698 { 699 {
699 LOG.debug("{} replaced {}",next,_connection); 700 LOG.debug("{} replaced {}",next,_connection);
700 Connection old=_connection; 701 Connection old=_connection;
701 _connection=next; 702 _connection=next;
702 _manager.endPointUpgraded(this,old); 703 _manager.endPointUpgraded(this,old);
703 continue; 704 continue;
704 } 705 }
705 break; 706 break;
706 } 707 }
707 } 708 }
708 catch (ClosedChannelException e) 709 catch (ClosedChannelException e)
709 { 710 {
710 LOG.trace("",e); 711 LOG.trace("",e);
711 } 712 }
712 catch (EofException e) 713 catch (EofException e)
713 { 714 {
714 LOG.debug("EOF", e); 715 LOG.debug("EOF", e);
715 try{close();} 716 try{close();}
716 catch(IOException e2){LOG.trace("",e2);} 717 catch(IOException e2){LOG.trace("",e2);}
717 } 718 }
718 catch (IOException e) 719 catch (IOException e)
719 { 720 {
720 LOG.warn(e.toString()); 721 LOG.warn(e.toString());
721 try{close();} 722 try{close();}
722 catch(IOException e2){LOG.trace("",e2);} 723 catch(IOException e2){LOG.trace("",e2);}
723 } 724 }
724 catch (Throwable e) 725 catch (Throwable e)
725 { 726 {
726 LOG.warn("handle failed", e); 727 LOG.warn("handle failed", e);
727 try{close();} 728 try{close();}
728 catch(IOException e2){LOG.trace("",e2);} 729 catch(IOException e2){LOG.trace("",e2);}
729 } 730 }
730 finally 731 finally
731 { 732 {
732 if (!_ishut && isInputShutdown() && isOpen()) 733 if (!_ishut && isInputShutdown() && isOpen())
733 { 734 {
734 _ishut=true; 735 _ishut=true;
735 try 736 try
736 { 737 {
737 _connection.onInputShutdown(); 738 _connection.onInputShutdown();
738 } 739 }
739 catch(Throwable x) 740 catch(Throwable x)
740 { 741 {
741 LOG.warn("onInputShutdown failed", x); 742 LOG.warn("onInputShutdown failed", x);
742 try{close();} 743 try{close();}
743 catch(IOException e2){LOG.trace("",e2);} 744 catch(IOException e2){LOG.trace("",e2);}
744 } 745 }
745 finally 746 finally
746 { 747 {
747 updateKey(); 748 updateKey();
748 } 749 }
749 } 750 }
750 dispatched=!undispatch(); 751 dispatched=!undispatch();
751 } 752 }
752 } 753 }
753 } 754 }
754 finally 755 finally
755 { 756 {
756 if (dispatched) 757 if (dispatched)
757 { 758 {
758 dispatched=!undispatch(); 759 dispatched=!undispatch();
759 while (dispatched) 760 while (dispatched)
760 { 761 {
761 LOG.warn("SCEP.run() finally DISPATCHED"); 762 LOG.warn("SCEP.run() finally DISPATCHED");
762 dispatched=!undispatch(); 763 dispatched=!undispatch();
763 } 764 }
764 } 765 }
765 } 766 }
766 } 767 }
767 768
768 /* ------------------------------------------------------------ */ 769 /* ------------------------------------------------------------ */
769 /* 770 /*
770 * @see org.eclipse.io.nio.ChannelEndPoint#close() 771 * @see org.eclipse.io.nio.ChannelEndPoint#close()
771 */ 772 */
772 @Override 773 @Override
773 public void close() throws IOException 774 public void close() throws IOException
774 { 775 {
775 // On unix systems there is a JVM issue that if you cancel before closing, it can 776 // On unix systems there is a JVM issue that if you cancel before closing, it can
776 // cause the selector to block waiting for a channel to close and that channel can 777 // cause the selector to block waiting for a channel to close and that channel can
777 // block waiting for the remote end. But on windows, if you don't cancel before a 778 // block waiting for the remote end. But on windows, if you don't cancel before a
778 // close, then the selector can block anyway! 779 // close, then the selector can block anyway!
779 // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318 780 // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
780 if (WORK_AROUND_JVM_BUG_6346658) 781 if (WORK_AROUND_JVM_BUG_6346658)
781 { 782 {
782 try 783 try
783 { 784 {
784 SelectionKey key = _key; 785 SelectionKey key = _key;
785 if (key!=null) 786 if (key!=null)
786 key.cancel(); 787 key.cancel();
787 } 788 }
788 catch (Throwable e) 789 catch (Throwable e)
789 { 790 {
790 LOG.trace("",e); 791 LOG.trace("",e);
791 } 792 }
792 } 793 }
793 794
794 try 795 try
795 { 796 {
796 super.close(); 797 super.close();
797 } 798 }
798 catch (IOException e) 799 catch (IOException e)
799 { 800 {
800 LOG.trace("",e); 801 LOG.trace("",e);
801 } 802 }
802 finally 803 finally
803 { 804 {
804 updateKey(); 805 updateKey();
805 } 806 }
806 } 807 }
807 808
808 /* ------------------------------------------------------------ */ 809 /* ------------------------------------------------------------ */
809 @Override 810 @Override
810 public String toString() 811 public String toString()
811 { 812 {
812 // Do NOT use synchronized (this) 813 // Do NOT use synchronized (this)
813 // because it's very easy to deadlock when debugging is enabled. 814 // because it's very easy to deadlock when debugging is enabled.
814 // We do a best effort to print the right toString() and that's it. 815 // We do a best effort to print the right toString() and that's it.
815 SelectionKey key = _key; 816 SelectionKey key = _key;
816 String keyString = ""; 817 String keyString = "";
817 if (key != null) 818 if (key != null)
818 { 819 {
819 if (key.isValid()) 820 if (key.isValid())
820 { 821 {
821 if (key.isReadable()) 822 if (key.isReadable())
822 keyString += "r"; 823 keyString += "r";
823 if (key.isWritable()) 824 if (key.isWritable())
824 keyString += "w"; 825 keyString += "w";
825 } 826 }
826 else 827 else
827 { 828 {
828 keyString += "!"; 829 keyString += "!";
829 } 830 }
830 } 831 }
831 else 832 else
832 { 833 {
833 keyString += "-"; 834 keyString += "-";
834 } 835 }
835 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}", 836 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
836 hashCode(), 837 hashCode(),
837 _socket.getRemoteSocketAddress(), 838 _socket.getRemoteSocketAddress(),
838 _socket.getLocalSocketAddress(), 839 _socket.getLocalSocketAddress(),
839 _state, 840 _state,
840 isOpen(), 841 isOpen(),
841 isInputShutdown(), 842 isInputShutdown(),
842 isOutputShutdown(), 843 isOutputShutdown(),
843 _readBlocked, 844 _readBlocked,
844 _writeBlocked, 845 _writeBlocked,
845 _writable, 846 _writable,
846 _interestOps, 847 _interestOps,
847 keyString, 848 keyString,
848 _connection); 849 _connection);
849 } 850 }
850 851
851 /* ------------------------------------------------------------ */ 852 /* ------------------------------------------------------------ */
852 public SelectSet getSelectSet() 853 public SelectSet getSelectSet()
853 { 854 {
854 return _selectSet; 855 return _selectSet;
855 } 856 }
856 857
857 /* ------------------------------------------------------------ */ 858 /* ------------------------------------------------------------ */
858 /** 859 /**
859 * Don't set the SoTimeout 860 * Don't set the SoTimeout
860 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) 861 * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
861 */ 862 */
862 @Override 863 @Override
863 public void setMaxIdleTime(int timeMs) throws IOException 864 public void setMaxIdleTime(int timeMs) throws IOException
864 { 865 {
865 _maxIdleTime=timeMs; 866 _maxIdleTime=timeMs;
866 } 867 }
867 868
868 } 869 }