Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 963:4b6216fa9cec
replace SelectChannelEndPoint._state with isDispatched
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Fri, 14 Oct 2016 00:15:28 -0600 |
| parents | 94498d6daf5b |
| children | 768414c16e10 |
comparison (legend: equal, deleted, inserted, replaced)
| 962:94498d6daf5b | 963:4b6216fa9cec |
|---|---|
| 58 * There is a different type of connection for HTTP, AJP, WebSocket and | 58 * There is a different type of connection for HTTP, AJP, WebSocket and |
| 59 * ProxyConnect. The connection may change for an SCEP as it is upgraded | 59 * ProxyConnect. The connection may change for an SCEP as it is upgraded |
| 60 * from HTTP to proxy connect or websocket. | 60 * from HTTP to proxy connect or websocket. |
| 61 */ | 61 */ |
| 62 private volatile AsyncConnection _connection; | 62 private volatile AsyncConnection _connection; |
| 63 | 63 /* |
| 64 private static final int STATE_NEEDS_DISPATCH = -1; | 64 private static final int STATE_NEEDS_DISPATCH = -1; |
| 65 private static final int STATE_UNDISPATCHED = 0; | 65 private static final int STATE_UNDISPATCHED = 0; |
| 66 private static final int STATE_DISPATCHED = 1; | 66 private static final int STATE_DISPATCHED = 1; |
| 67 private int _state; | 67 private int _state; |
| 68 | 68 */ |
| 69 private boolean isDispatched = false; | |
| 70 | |
| 69 /** true if the last write operation succeed and wrote all offered bytes */ | 71 /** true if the last write operation succeed and wrote all offered bytes */ |
| 70 private volatile boolean _writable = true; | 72 private volatile boolean _writable = true; |
| 71 | 73 |
| 72 /** True if a thread has is blocked in {@link #blockReadable(long)} */ | 74 /** True if a thread has is blocked in {@link #blockReadable(long)} */ |
| 73 private boolean _readBlocked; | 75 private boolean _readBlocked; |
| 74 | 76 |
| 75 /** True if a thread has is blocked in {@link #blockWritable(long)} */ | 77 /** True if a thread has is blocked in {@link #blockWritable(long)} */ |
| 76 private boolean _writeBlocked; | 78 private boolean _writeBlocked; |
| 77 | 79 |
| 78 private boolean _ishut; | 80 private boolean _ishut = false; |
| 79 | 81 |
| 80 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) | 82 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) |
| 81 throws IOException | 83 throws IOException |
| 82 { | 84 { |
| 83 super(channel, maxIdleTime); | 85 super(channel, maxIdleTime); |
| 84 | 86 |
| 85 _manager = selectSet.getManager(); | 87 _manager = selectSet.getManager(); |
| 86 _selectSet = selectSet; | 88 _selectSet = selectSet; |
| 87 _state = STATE_UNDISPATCHED; | |
| 88 _key = key; | 89 _key = key; |
| 89 } | 90 } |
| 90 | 91 |
| 91 @Override | 92 @Override |
| 92 public Connection getConnection() | 93 public Connection getConnection() |
| 104 /** Called by selectSet to schedule handling | 105 /** Called by selectSet to schedule handling |
| 105 * | 106 * |
| 106 */ | 107 */ |
| 107 public synchronized void schedule() | 108 public synchronized void schedule() |
| 108 { | 109 { |
| 109 // If there is no key, then do nothing | |
| 110 if (!_key.isValid()) | 110 if (!_key.isValid()) |
| 111 { | 111 { |
| 112 /* | |
| 112 _readBlocked = false; | 113 _readBlocked = false; |
| 113 _writeBlocked = false; | 114 _writeBlocked = false; |
| 114 this.notifyAll(); | 115 this.notifyAll(); |
| 116 */ | |
| 117 _key.cancel(); | |
| 115 return; | 118 return; |
| 116 } | 119 } |
| 117 | 120 |
| 118 // If there are threads dispatched reading and writing | 121 // If there are threads dispatched reading and writing |
| 119 if (_readBlocked || _writeBlocked) | 122 if (_readBlocked || _writeBlocked) |
| 120 { | 123 { |
| 121 // assert _dispatched; | 124 // assert _dispatched; |
| 122 if (_readBlocked && _key.isReadable()) | 125 if (_readBlocked && _key.isReadable()) |
| 123 _readBlocked=false; | 126 _readBlocked = false; |
| 124 if (_writeBlocked && _key.isWritable()) | 127 if (_writeBlocked && _key.isWritable()) |
| 125 _writeBlocked = false; | 128 _writeBlocked = false; |
| 126 | 129 |
| 127 // wake them up is as good as a dispatched. | 130 // wake them up is as good as a dispatched. |
| 128 this.notifyAll(); | 131 this.notifyAll(); |
| 129 | 132 |
| 130 // we are not interested in further selecting | 133 // we are not interested in further selecting |
| 131 _key.interestOps(0); | 134 _key.interestOps(0); |
| 132 if (_state<STATE_DISPATCHED) | 135 if( !isDispatched ) |
| 133 updateKey(); | 136 updateKey(); |
| 134 return; | 137 return; |
| 135 } | 138 } |
| 136 | 139 |
| 137 // Remove writeable op | 140 // Remove writeable op |
| 142 _key.interestOps(interestOps); | 145 _key.interestOps(interestOps); |
| 143 _writable = true; // Once writable is in ops, only removed with dispatch. | 146 _writable = true; // Once writable is in ops, only removed with dispatch. |
| 144 } | 147 } |
| 145 | 148 |
| 146 // If dispatched, then deregister interest | 149 // If dispatched, then deregister interest |
| 147 if (_state>=STATE_DISPATCHED) | 150 if (isDispatched) |
| 148 _key.interestOps(0); | 151 _key.interestOps(0); |
| 149 else | 152 else |
| 150 { | 153 { |
| 151 // other wise do the dispatch | 154 // other wise do the dispatch |
| 152 dispatch(); | 155 dispatch(); |
| 154 } | 157 } |
| 155 | 158 |
| 156 @Override | 159 @Override |
| 157 public synchronized void dispatch() | 160 public synchronized void dispatch() |
| 158 { | 161 { |
| 159 if (_state<=STATE_UNDISPATCHED) | 162 if( !isDispatched ) |
| 160 { | 163 { |
| 161 _state = STATE_DISPATCHED; | 164 isDispatched = true; |
| 162 try { | 165 try { |
| 163 _manager.execute(_handler); | 166 _manager.execute(_handler); |
| 164 } catch(RejectedExecutionException e) { | 167 } catch(RejectedExecutionException e) { |
| 165 _state = STATE_NEEDS_DISPATCH; | 168 isDispatched = false; |
| 166 LOG.warn("Dispatched Failed! "+this+" to "+_manager); | 169 LOG.warn("Dispatched Failed! "+this+" to "+_manager); |
| 167 updateKey(); | 170 // updateKey(); |
| 168 } | 171 } |
| 169 } | 172 } |
| 170 } | 173 } |
| 171 | 174 |
| 172 @Override | 175 @Override |
| 185 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) | 188 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) |
| 186 { | 189 { |
| 187 synchronized (this) | 190 synchronized (this) |
| 188 { | 191 { |
| 189 _writable = false; | 192 _writable = false; |
| 190 if (_state<STATE_DISPATCHED) | 193 if( !isDispatched ) |
| 191 updateKey(); | 194 updateKey(); |
| 192 } | 195 } |
| 193 } | 196 } |
| 194 else if (l>0) | 197 else if (l>0) |
| 195 { | 198 { |
| 207 if (l==0 && buffer!=null && buffer.hasContent()) | 210 if (l==0 && buffer!=null && buffer.hasContent()) |
| 208 { | 211 { |
| 209 synchronized (this) | 212 synchronized (this) |
| 210 { | 213 { |
| 211 _writable = false; | 214 _writable = false; |
| 212 if (_state<STATE_DISPATCHED) | 215 if( !isDispatched ) |
| 213 updateKey(); | 216 updateKey(); |
| 214 } | 217 } |
| 215 } | 218 } |
| 216 else if (l>0) | 219 else if (l>0) |
| 217 { | 220 { |
| 224 /* ------------------------------------------------------------ */ | 227 /* ------------------------------------------------------------ */ |
| 225 /* | 228 /* |
| 226 * Allows thread to block waiting for further events. | 229 * Allows thread to block waiting for further events. |
| 227 */ | 230 */ |
| 228 @Override | 231 @Override |
| 229 public boolean blockReadable(long timeoutMs) throws IOException | 232 public synchronized boolean blockReadable(long timeoutMs) throws IOException |
| 230 { | 233 { |
| 231 synchronized (this) | 234 if (isInputShutdown()) |
| 232 { | 235 throw new EofException(); |
| 233 if (isInputShutdown()) | 236 |
| 234 throw new EofException(); | 237 long now = _selectSet.getNow(); |
| 235 | 238 long end = now+timeoutMs; |
| 236 long now = _selectSet.getNow(); | 239 try |
| 237 long end = now+timeoutMs; | 240 { |
| 238 try | 241 _readBlocked = true; |
| 239 { | 242 while (!isInputShutdown() && _readBlocked) |
| 240 _readBlocked = true; | 243 { |
| 241 while (!isInputShutdown() && _readBlocked) | 244 try |
| 242 { | 245 { |
| 243 try | 246 updateKey(); |
| 244 { | 247 this.wait(timeoutMs>0?(end-now):10000); |
| 245 updateKey(); | 248 } |
| 246 this.wait(timeoutMs>0?(end-now):10000); | 249 catch (final InterruptedException e) |
| 247 } | 250 { |
| 248 catch (final InterruptedException e) | 251 LOG.warn("",e); |
| 249 { | 252 } |
| 250 LOG.warn("",e); | 253 finally |
| 251 } | 254 { |
| 252 finally | 255 now = _selectSet.getNow(); |
| 253 { | 256 } |
| 254 now=_selectSet.getNow(); | 257 |
| 255 } | 258 if (_readBlocked && timeoutMs>0 && now>=end) |
| 256 | 259 return false; |
| 257 if (_readBlocked && timeoutMs>0 && now>=end) | 260 } |
| 258 return false; | 261 } |
| 259 } | 262 finally |
| 260 } | 263 { |
| 261 finally | 264 _readBlocked = false; |
| 262 { | |
| 263 _readBlocked = false; | |
| 264 } | |
| 265 } | 265 } |
| 266 return true; | 266 return true; |
| 267 } | 267 } |
| 268 | 268 |
| 269 /* ------------------------------------------------------------ */ | 269 /* ------------------------------------------------------------ */ |
| 270 /* | 270 /* |
| 271 * Allows thread to block waiting for further events. | 271 * Allows thread to block waiting for further events. |
| 272 */ | 272 */ |
| 273 @Override | 273 @Override |
| 274 public boolean blockWritable(long timeoutMs) throws IOException | 274 public synchronized boolean blockWritable(long timeoutMs) throws IOException |
| 275 { | 275 { |
| 276 synchronized (this) | 276 if (isOutputShutdown()) |
| 277 { | 277 throw new EofException(); |
| 278 if (isOutputShutdown()) | 278 |
| 279 throw new EofException(); | 279 long now=_selectSet.getNow(); |
| 280 | 280 long end=now+timeoutMs; |
| 281 long now=_selectSet.getNow(); | 281 try |
| 282 long end=now+timeoutMs; | 282 { |
| 283 try | 283 _writeBlocked = true; |
| 284 { | 284 while (_writeBlocked && !isOutputShutdown()) |
| 285 _writeBlocked = true; | 285 { |
| 286 while (_writeBlocked && !isOutputShutdown()) | 286 try |
| 287 { | 287 { |
| 288 try | 288 updateKey(); |
| 289 { | 289 this.wait(timeoutMs>0?(end-now):10000); |
| 290 updateKey(); | 290 } |
| 291 this.wait(timeoutMs>0?(end-now):10000); | 291 catch (final InterruptedException e) |
| 292 } | 292 { |
| 293 catch (final InterruptedException e) | 293 LOG.warn("",e); |
| 294 { | 294 } |
| 295 LOG.warn("",e); | 295 finally |
| 296 } | 296 { |
| 297 finally | 297 now = _selectSet.getNow(); |
| 298 { | 298 } |
| 299 now=_selectSet.getNow(); | 299 if (_writeBlocked && timeoutMs>0 && now>=end) |
| 300 } | 300 return false; |
| 301 if (_writeBlocked && timeoutMs>0 && now>=end) | 301 } |
| 302 return false; | 302 } |
| 303 } | 303 finally |
| 304 } | 304 { |
| 305 finally | 305 _writeBlocked = false; |
| 306 { | |
| 307 _writeBlocked = false; | |
| 308 } | |
| 309 } | 306 } |
| 310 return true; | 307 return true; |
| 311 } | 308 } |
| 312 | 309 |
| 313 @Override | 310 @Override |
| 324 */ | 321 */ |
| 325 private synchronized void updateKey() | 322 private synchronized void updateKey() |
| 326 { | 323 { |
| 327 if( getChannel().isOpen() && _key.isValid()) | 324 if( getChannel().isOpen() && _key.isValid()) |
| 328 { | 325 { |
| 329 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); | 326 boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended()); |
| 330 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); | 327 boolean write_interest = _writeBlocked || (!isDispatched && !_writable); |
| 328 // boolean write_interest = _writeBlocked || !isDispatched; | |
| 329 // boolean write_interest = true; | |
| 331 | 330 |
| 332 int interestOps = | 331 int interestOps = |
| 333 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) | 332 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) |
| 334 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); | 333 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); |
| 335 if( _key.interestOps() != interestOps ) { | 334 if( _key.interestOps() != interestOps ) { |
| 398 } | 397 } |
| 399 } | 398 } |
| 400 } | 399 } |
| 401 finally | 400 finally |
| 402 { | 401 { |
| 403 _state = STATE_UNDISPATCHED; | 402 isDispatched = false; |
| 404 updateKey(); | 403 updateKey(); |
| 405 } | 404 } |
| 406 } | 405 } |
| 407 | 406 |
| 408 /* ------------------------------------------------------------ */ | 407 /* ------------------------------------------------------------ */ |
| 443 } | 442 } |
| 444 else | 443 else |
| 445 { | 444 { |
| 446 keyString += "!"; | 445 keyString += "!"; |
| 447 } | 446 } |
| 448 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", | 447 return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", |
| 449 hashCode(), | 448 hashCode(), |
| 450 _socket.getRemoteSocketAddress(), | 449 _socket.getRemoteSocketAddress(), |
| 451 _socket.getLocalSocketAddress(), | 450 _socket.getLocalSocketAddress(), |
| 452 _state, | 451 isDispatched, |
| 453 isOpen(), | 452 isOpen(), |
| 454 isInputShutdown(), | 453 isInputShutdown(), |
| 455 isOutputShutdown(), | 454 isOutputShutdown(), |
| 456 _readBlocked, | 455 _readBlocked, |
| 457 _writeBlocked, | 456 _writeBlocked, |
