Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 963:4b6216fa9cec
replace SelectChannelEndPoint._state with isDispatched
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 14 Oct 2016 00:15:28 -0600 |
parents | 94498d6daf5b |
children | 768414c16e10 |
comparison
equal
deleted
inserted
replaced
962:94498d6daf5b | 963:4b6216fa9cec |
---|---|
58 * There is a different type of connection for HTTP, AJP, WebSocket and | 58 * There is a different type of connection for HTTP, AJP, WebSocket and |
59 * ProxyConnect. The connection may change for an SCEP as it is upgraded | 59 * ProxyConnect. The connection may change for an SCEP as it is upgraded |
60 * from HTTP to proxy connect or websocket. | 60 * from HTTP to proxy connect or websocket. |
61 */ | 61 */ |
62 private volatile AsyncConnection _connection; | 62 private volatile AsyncConnection _connection; |
63 | 63 /* |
64 private static final int STATE_NEEDS_DISPATCH = -1; | 64 private static final int STATE_NEEDS_DISPATCH = -1; |
65 private static final int STATE_UNDISPATCHED = 0; | 65 private static final int STATE_UNDISPATCHED = 0; |
66 private static final int STATE_DISPATCHED = 1; | 66 private static final int STATE_DISPATCHED = 1; |
67 private int _state; | 67 private int _state; |
68 | 68 */ |
69 private boolean isDispatched = false; | |
70 | |
69 /** true if the last write operation succeeded and wrote all offered bytes */ | 71 /** true if the last write operation succeeded and wrote all offered bytes */ |
70 private volatile boolean _writable = true; | 72 private volatile boolean _writable = true; |
71 | 73 |
72 /** True if a thread is blocked in {@link #blockReadable(long)} */ | 74 /** True if a thread is blocked in {@link #blockReadable(long)} */ |
73 private boolean _readBlocked; | 75 private boolean _readBlocked; |
74 | 76 |
75 /** True if a thread is blocked in {@link #blockWritable(long)} */ | 77 /** True if a thread is blocked in {@link #blockWritable(long)} */ |
76 private boolean _writeBlocked; | 78 private boolean _writeBlocked; |
77 | 79 |
78 private boolean _ishut; | 80 private boolean _ishut = false; |
79 | 81 |
80 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) | 82 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) |
81 throws IOException | 83 throws IOException |
82 { | 84 { |
83 super(channel, maxIdleTime); | 85 super(channel, maxIdleTime); |
84 | 86 |
85 _manager = selectSet.getManager(); | 87 _manager = selectSet.getManager(); |
86 _selectSet = selectSet; | 88 _selectSet = selectSet; |
87 _state = STATE_UNDISPATCHED; | |
88 _key = key; | 89 _key = key; |
89 } | 90 } |
90 | 91 |
91 @Override | 92 @Override |
92 public Connection getConnection() | 93 public Connection getConnection() |
104 /** Called by selectSet to schedule handling | 105 /** Called by selectSet to schedule handling |
105 * | 106 * |
106 */ | 107 */ |
107 public synchronized void schedule() | 108 public synchronized void schedule() |
108 { | 109 { |
109 // If there is no key, then do nothing | |
110 if (!_key.isValid()) | 110 if (!_key.isValid()) |
111 { | 111 { |
112 /* | |
112 _readBlocked = false; | 113 _readBlocked = false; |
113 _writeBlocked = false; | 114 _writeBlocked = false; |
114 this.notifyAll(); | 115 this.notifyAll(); |
116 */ | |
117 _key.cancel(); | |
115 return; | 118 return; |
116 } | 119 } |
117 | 120 |
118 // If there are threads dispatched reading and writing | 121 // If there are threads dispatched reading and writing |
119 if (_readBlocked || _writeBlocked) | 122 if (_readBlocked || _writeBlocked) |
120 { | 123 { |
121 // assert _dispatched; | 124 // assert _dispatched; |
122 if (_readBlocked && _key.isReadable()) | 125 if (_readBlocked && _key.isReadable()) |
123 _readBlocked=false; | 126 _readBlocked = false; |
124 if (_writeBlocked && _key.isWritable()) | 127 if (_writeBlocked && _key.isWritable()) |
125 _writeBlocked = false; | 128 _writeBlocked = false; |
126 | 129 |
127 // waking them up is as good as a dispatch. | 130 // waking them up is as good as a dispatch. |
128 this.notifyAll(); | 131 this.notifyAll(); |
129 | 132 |
130 // we are not interested in further selecting | 133 // we are not interested in further selecting |
131 _key.interestOps(0); | 134 _key.interestOps(0); |
132 if (_state<STATE_DISPATCHED) | 135 if( !isDispatched ) |
133 updateKey(); | 136 updateKey(); |
134 return; | 137 return; |
135 } | 138 } |
136 | 139 |
137 // Remove writeable op | 140 // Remove writeable op |
142 _key.interestOps(interestOps); | 145 _key.interestOps(interestOps); |
143 _writable = true; // Once writable is in ops, only removed with dispatch. | 146 _writable = true; // Once writable is in ops, only removed with dispatch. |
144 } | 147 } |
145 | 148 |
146 // If dispatched, then deregister interest | 149 // If dispatched, then deregister interest |
147 if (_state>=STATE_DISPATCHED) | 150 if (isDispatched) |
148 _key.interestOps(0); | 151 _key.interestOps(0); |
149 else | 152 else |
150 { | 153 { |
151 // otherwise do the dispatch | 154 // otherwise do the dispatch |
152 dispatch(); | 155 dispatch(); |
154 } | 157 } |
155 | 158 |
156 @Override | 159 @Override |
157 public synchronized void dispatch() | 160 public synchronized void dispatch() |
158 { | 161 { |
159 if (_state<=STATE_UNDISPATCHED) | 162 if( !isDispatched ) |
160 { | 163 { |
161 _state = STATE_DISPATCHED; | 164 isDispatched = true; |
162 try { | 165 try { |
163 _manager.execute(_handler); | 166 _manager.execute(_handler); |
164 } catch(RejectedExecutionException e) { | 167 } catch(RejectedExecutionException e) { |
165 _state = STATE_NEEDS_DISPATCH; | 168 isDispatched = false; |
166 LOG.warn("Dispatched Failed! "+this+" to "+_manager); | 169 LOG.warn("Dispatched Failed! "+this+" to "+_manager); |
167 updateKey(); | 170 // updateKey(); |
168 } | 171 } |
169 } | 172 } |
170 } | 173 } |
171 | 174 |
172 @Override | 175 @Override |
185 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) | 188 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) |
186 { | 189 { |
187 synchronized (this) | 190 synchronized (this) |
188 { | 191 { |
189 _writable = false; | 192 _writable = false; |
190 if (_state<STATE_DISPATCHED) | 193 if( !isDispatched ) |
191 updateKey(); | 194 updateKey(); |
192 } | 195 } |
193 } | 196 } |
194 else if (l>0) | 197 else if (l>0) |
195 { | 198 { |
207 if (l==0 && buffer!=null && buffer.hasContent()) | 210 if (l==0 && buffer!=null && buffer.hasContent()) |
208 { | 211 { |
209 synchronized (this) | 212 synchronized (this) |
210 { | 213 { |
211 _writable = false; | 214 _writable = false; |
212 if (_state<STATE_DISPATCHED) | 215 if( !isDispatched ) |
213 updateKey(); | 216 updateKey(); |
214 } | 217 } |
215 } | 218 } |
216 else if (l>0) | 219 else if (l>0) |
217 { | 220 { |
224 /* ------------------------------------------------------------ */ | 227 /* ------------------------------------------------------------ */ |
225 /* | 228 /* |
226 * Allows thread to block waiting for further events. | 229 * Allows thread to block waiting for further events. |
227 */ | 230 */ |
228 @Override | 231 @Override |
229 public boolean blockReadable(long timeoutMs) throws IOException | 232 public synchronized boolean blockReadable(long timeoutMs) throws IOException |
230 { | 233 { |
231 synchronized (this) | 234 if (isInputShutdown()) |
232 { | 235 throw new EofException(); |
233 if (isInputShutdown()) | 236 |
234 throw new EofException(); | 237 long now = _selectSet.getNow(); |
235 | 238 long end = now+timeoutMs; |
236 long now = _selectSet.getNow(); | 239 try |
237 long end = now+timeoutMs; | 240 { |
238 try | 241 _readBlocked = true; |
239 { | 242 while (!isInputShutdown() && _readBlocked) |
240 _readBlocked = true; | 243 { |
241 while (!isInputShutdown() && _readBlocked) | 244 try |
242 { | 245 { |
243 try | 246 updateKey(); |
244 { | 247 this.wait(timeoutMs>0?(end-now):10000); |
245 updateKey(); | 248 } |
246 this.wait(timeoutMs>0?(end-now):10000); | 249 catch (final InterruptedException e) |
247 } | 250 { |
248 catch (final InterruptedException e) | 251 LOG.warn("",e); |
249 { | 252 } |
250 LOG.warn("",e); | 253 finally |
251 } | 254 { |
252 finally | 255 now = _selectSet.getNow(); |
253 { | 256 } |
254 now=_selectSet.getNow(); | 257 |
255 } | 258 if (_readBlocked && timeoutMs>0 && now>=end) |
256 | 259 return false; |
257 if (_readBlocked && timeoutMs>0 && now>=end) | 260 } |
258 return false; | 261 } |
259 } | 262 finally |
260 } | 263 { |
261 finally | 264 _readBlocked = false; |
262 { | |
263 _readBlocked = false; | |
264 } | |
265 } | 265 } |
266 return true; | 266 return true; |
267 } | 267 } |
268 | 268 |
269 /* ------------------------------------------------------------ */ | 269 /* ------------------------------------------------------------ */ |
270 /* | 270 /* |
271 * Allows thread to block waiting for further events. | 271 * Allows thread to block waiting for further events. |
272 */ | 272 */ |
273 @Override | 273 @Override |
274 public boolean blockWritable(long timeoutMs) throws IOException | 274 public synchronized boolean blockWritable(long timeoutMs) throws IOException |
275 { | 275 { |
276 synchronized (this) | 276 if (isOutputShutdown()) |
277 { | 277 throw new EofException(); |
278 if (isOutputShutdown()) | 278 |
279 throw new EofException(); | 279 long now=_selectSet.getNow(); |
280 | 280 long end=now+timeoutMs; |
281 long now=_selectSet.getNow(); | 281 try |
282 long end=now+timeoutMs; | 282 { |
283 try | 283 _writeBlocked = true; |
284 { | 284 while (_writeBlocked && !isOutputShutdown()) |
285 _writeBlocked = true; | 285 { |
286 while (_writeBlocked && !isOutputShutdown()) | 286 try |
287 { | 287 { |
288 try | 288 updateKey(); |
289 { | 289 this.wait(timeoutMs>0?(end-now):10000); |
290 updateKey(); | 290 } |
291 this.wait(timeoutMs>0?(end-now):10000); | 291 catch (final InterruptedException e) |
292 } | 292 { |
293 catch (final InterruptedException e) | 293 LOG.warn("",e); |
294 { | 294 } |
295 LOG.warn("",e); | 295 finally |
296 } | 296 { |
297 finally | 297 now = _selectSet.getNow(); |
298 { | 298 } |
299 now=_selectSet.getNow(); | 299 if (_writeBlocked && timeoutMs>0 && now>=end) |
300 } | 300 return false; |
301 if (_writeBlocked && timeoutMs>0 && now>=end) | 301 } |
302 return false; | 302 } |
303 } | 303 finally |
304 } | 304 { |
305 finally | 305 _writeBlocked = false; |
306 { | |
307 _writeBlocked = false; | |
308 } | |
309 } | 306 } |
310 return true; | 307 return true; |
311 } | 308 } |
312 | 309 |
313 @Override | 310 @Override |
324 */ | 321 */ |
325 private synchronized void updateKey() | 322 private synchronized void updateKey() |
326 { | 323 { |
327 if( getChannel().isOpen() && _key.isValid()) | 324 if( getChannel().isOpen() && _key.isValid()) |
328 { | 325 { |
329 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); | 326 boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended()); |
330 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); | 327 boolean write_interest = _writeBlocked || (!isDispatched && !_writable); |
328 // boolean write_interest = _writeBlocked || !isDispatched; | |
329 // boolean write_interest = true; | |
331 | 330 |
332 int interestOps = | 331 int interestOps = |
333 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) | 332 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) |
334 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); | 333 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); |
335 if( _key.interestOps() != interestOps ) { | 334 if( _key.interestOps() != interestOps ) { |
398 } | 397 } |
399 } | 398 } |
400 } | 399 } |
401 finally | 400 finally |
402 { | 401 { |
403 _state = STATE_UNDISPATCHED; | 402 isDispatched = false; |
404 updateKey(); | 403 updateKey(); |
405 } | 404 } |
406 } | 405 } |
407 | 406 |
408 /* ------------------------------------------------------------ */ | 407 /* ------------------------------------------------------------ */ |
443 } | 442 } |
444 else | 443 else |
445 { | 444 { |
446 keyString += "!"; | 445 keyString += "!"; |
447 } | 446 } |
448 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", | 447 return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", |
449 hashCode(), | 448 hashCode(), |
450 _socket.getRemoteSocketAddress(), | 449 _socket.getRemoteSocketAddress(), |
451 _socket.getLocalSocketAddress(), | 450 _socket.getLocalSocketAddress(), |
452 _state, | 451 isDispatched, |
453 isOpen(), | 452 isOpen(), |
454 isInputShutdown(), | 453 isInputShutdown(), |
455 isOutputShutdown(), | 454 isOutputShutdown(), |
456 _readBlocked, | 455 _readBlocked, |
457 _writeBlocked, | 456 _writeBlocked, |