comparison src/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @ 963:4b6216fa9cec

replace SelectChannelEndPoint._state with isDispatched
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 14 Oct 2016 00:15:28 -0600
parents 94498d6daf5b
children 768414c16e10
comparison
equal deleted inserted replaced
962:94498d6daf5b 963:4b6216fa9cec
58 * There is a different type of connection for HTTP, AJP, WebSocket and 58 * There is a different type of connection for HTTP, AJP, WebSocket and
59 * ProxyConnect. The connection may change for an SCEP as it is upgraded 59 * ProxyConnect. The connection may change for an SCEP as it is upgraded
60 * from HTTP to proxy connect or websocket. 60 * from HTTP to proxy connect or websocket.
61 */ 61 */
62 private volatile AsyncConnection _connection; 62 private volatile AsyncConnection _connection;
63 63 /*
64 private static final int STATE_NEEDS_DISPATCH = -1; 64 private static final int STATE_NEEDS_DISPATCH = -1;
65 private static final int STATE_UNDISPATCHED = 0; 65 private static final int STATE_UNDISPATCHED = 0;
66 private static final int STATE_DISPATCHED = 1; 66 private static final int STATE_DISPATCHED = 1;
67 private int _state; 67 private int _state;
68 68 */
69 private boolean isDispatched = false;
70
69 /** true if the last write operation succeed and wrote all offered bytes */ 71 /** true if the last write operation succeed and wrote all offered bytes */
70 private volatile boolean _writable = true; 72 private volatile boolean _writable = true;
71 73
72 /** True if a thread has is blocked in {@link #blockReadable(long)} */ 74 /** True if a thread has is blocked in {@link #blockReadable(long)} */
73 private boolean _readBlocked; 75 private boolean _readBlocked;
74 76
75 /** True if a thread has is blocked in {@link #blockWritable(long)} */ 77 /** True if a thread has is blocked in {@link #blockWritable(long)} */
76 private boolean _writeBlocked; 78 private boolean _writeBlocked;
77 79
78 private boolean _ishut; 80 private boolean _ishut = false;
79 81
80 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) 82 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
81 throws IOException 83 throws IOException
82 { 84 {
83 super(channel, maxIdleTime); 85 super(channel, maxIdleTime);
84 86
85 _manager = selectSet.getManager(); 87 _manager = selectSet.getManager();
86 _selectSet = selectSet; 88 _selectSet = selectSet;
87 _state = STATE_UNDISPATCHED;
88 _key = key; 89 _key = key;
89 } 90 }
90 91
91 @Override 92 @Override
92 public Connection getConnection() 93 public Connection getConnection()
104 /** Called by selectSet to schedule handling 105 /** Called by selectSet to schedule handling
105 * 106 *
106 */ 107 */
107 public synchronized void schedule() 108 public synchronized void schedule()
108 { 109 {
109 // If there is no key, then do nothing
110 if (!_key.isValid()) 110 if (!_key.isValid())
111 { 111 {
112 /*
112 _readBlocked = false; 113 _readBlocked = false;
113 _writeBlocked = false; 114 _writeBlocked = false;
114 this.notifyAll(); 115 this.notifyAll();
116 */
117 _key.cancel();
115 return; 118 return;
116 } 119 }
117 120
118 // If there are threads dispatched reading and writing 121 // If there are threads dispatched reading and writing
119 if (_readBlocked || _writeBlocked) 122 if (_readBlocked || _writeBlocked)
120 { 123 {
121 // assert _dispatched; 124 // assert _dispatched;
122 if (_readBlocked && _key.isReadable()) 125 if (_readBlocked && _key.isReadable())
123 _readBlocked=false; 126 _readBlocked = false;
124 if (_writeBlocked && _key.isWritable()) 127 if (_writeBlocked && _key.isWritable())
125 _writeBlocked = false; 128 _writeBlocked = false;
126 129
127 // wake them up is as good as a dispatched. 130 // wake them up is as good as a dispatched.
128 this.notifyAll(); 131 this.notifyAll();
129 132
130 // we are not interested in further selecting 133 // we are not interested in further selecting
131 _key.interestOps(0); 134 _key.interestOps(0);
132 if (_state<STATE_DISPATCHED) 135 if( !isDispatched )
133 updateKey(); 136 updateKey();
134 return; 137 return;
135 } 138 }
136 139
137 // Remove writeable op 140 // Remove writeable op
142 _key.interestOps(interestOps); 145 _key.interestOps(interestOps);
143 _writable = true; // Once writable is in ops, only removed with dispatch. 146 _writable = true; // Once writable is in ops, only removed with dispatch.
144 } 147 }
145 148
146 // If dispatched, then deregister interest 149 // If dispatched, then deregister interest
147 if (_state>=STATE_DISPATCHED) 150 if (isDispatched)
148 _key.interestOps(0); 151 _key.interestOps(0);
149 else 152 else
150 { 153 {
151 // other wise do the dispatch 154 // other wise do the dispatch
152 dispatch(); 155 dispatch();
154 } 157 }
155 158
156 @Override 159 @Override
157 public synchronized void dispatch() 160 public synchronized void dispatch()
158 { 161 {
159 if (_state<=STATE_UNDISPATCHED) 162 if( !isDispatched )
160 { 163 {
161 _state = STATE_DISPATCHED; 164 isDispatched = true;
162 try { 165 try {
163 _manager.execute(_handler); 166 _manager.execute(_handler);
164 } catch(RejectedExecutionException e) { 167 } catch(RejectedExecutionException e) {
165 _state = STATE_NEEDS_DISPATCH; 168 isDispatched = false;
166 LOG.warn("Dispatched Failed! "+this+" to "+_manager); 169 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
167 updateKey(); 170 // updateKey();
168 } 171 }
169 } 172 }
170 } 173 }
171 174
172 @Override 175 @Override
185 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) 188 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
186 { 189 {
187 synchronized (this) 190 synchronized (this)
188 { 191 {
189 _writable = false; 192 _writable = false;
190 if (_state<STATE_DISPATCHED) 193 if( !isDispatched )
191 updateKey(); 194 updateKey();
192 } 195 }
193 } 196 }
194 else if (l>0) 197 else if (l>0)
195 { 198 {
207 if (l==0 && buffer!=null && buffer.hasContent()) 210 if (l==0 && buffer!=null && buffer.hasContent())
208 { 211 {
209 synchronized (this) 212 synchronized (this)
210 { 213 {
211 _writable = false; 214 _writable = false;
212 if (_state<STATE_DISPATCHED) 215 if( !isDispatched )
213 updateKey(); 216 updateKey();
214 } 217 }
215 } 218 }
216 else if (l>0) 219 else if (l>0)
217 { 220 {
224 /* ------------------------------------------------------------ */ 227 /* ------------------------------------------------------------ */
225 /* 228 /*
226 * Allows thread to block waiting for further events. 229 * Allows thread to block waiting for further events.
227 */ 230 */
228 @Override 231 @Override
229 public boolean blockReadable(long timeoutMs) throws IOException 232 public synchronized boolean blockReadable(long timeoutMs) throws IOException
230 { 233 {
231 synchronized (this) 234 if (isInputShutdown())
232 { 235 throw new EofException();
233 if (isInputShutdown()) 236
234 throw new EofException(); 237 long now = _selectSet.getNow();
235 238 long end = now+timeoutMs;
236 long now = _selectSet.getNow(); 239 try
237 long end = now+timeoutMs; 240 {
238 try 241 _readBlocked = true;
239 { 242 while (!isInputShutdown() && _readBlocked)
240 _readBlocked = true; 243 {
241 while (!isInputShutdown() && _readBlocked) 244 try
242 { 245 {
243 try 246 updateKey();
244 { 247 this.wait(timeoutMs>0?(end-now):10000);
245 updateKey(); 248 }
246 this.wait(timeoutMs>0?(end-now):10000); 249 catch (final InterruptedException e)
247 } 250 {
248 catch (final InterruptedException e) 251 LOG.warn("",e);
249 { 252 }
250 LOG.warn("",e); 253 finally
251 } 254 {
252 finally 255 now = _selectSet.getNow();
253 { 256 }
254 now=_selectSet.getNow(); 257
255 } 258 if (_readBlocked && timeoutMs>0 && now>=end)
256 259 return false;
257 if (_readBlocked && timeoutMs>0 && now>=end) 260 }
258 return false; 261 }
259 } 262 finally
260 } 263 {
261 finally 264 _readBlocked = false;
262 {
263 _readBlocked = false;
264 }
265 } 265 }
266 return true; 266 return true;
267 } 267 }
268 268
269 /* ------------------------------------------------------------ */ 269 /* ------------------------------------------------------------ */
270 /* 270 /*
271 * Allows thread to block waiting for further events. 271 * Allows thread to block waiting for further events.
272 */ 272 */
273 @Override 273 @Override
274 public boolean blockWritable(long timeoutMs) throws IOException 274 public synchronized boolean blockWritable(long timeoutMs) throws IOException
275 { 275 {
276 synchronized (this) 276 if (isOutputShutdown())
277 { 277 throw new EofException();
278 if (isOutputShutdown()) 278
279 throw new EofException(); 279 long now=_selectSet.getNow();
280 280 long end=now+timeoutMs;
281 long now=_selectSet.getNow(); 281 try
282 long end=now+timeoutMs; 282 {
283 try 283 _writeBlocked = true;
284 { 284 while (_writeBlocked && !isOutputShutdown())
285 _writeBlocked = true; 285 {
286 while (_writeBlocked && !isOutputShutdown()) 286 try
287 { 287 {
288 try 288 updateKey();
289 { 289 this.wait(timeoutMs>0?(end-now):10000);
290 updateKey(); 290 }
291 this.wait(timeoutMs>0?(end-now):10000); 291 catch (final InterruptedException e)
292 } 292 {
293 catch (final InterruptedException e) 293 LOG.warn("",e);
294 { 294 }
295 LOG.warn("",e); 295 finally
296 } 296 {
297 finally 297 now = _selectSet.getNow();
298 { 298 }
299 now=_selectSet.getNow(); 299 if (_writeBlocked && timeoutMs>0 && now>=end)
300 } 300 return false;
301 if (_writeBlocked && timeoutMs>0 && now>=end) 301 }
302 return false; 302 }
303 } 303 finally
304 } 304 {
305 finally 305 _writeBlocked = false;
306 {
307 _writeBlocked = false;
308 }
309 } 306 }
310 return true; 307 return true;
311 } 308 }
312 309
313 @Override 310 @Override
324 */ 321 */
325 private synchronized void updateKey() 322 private synchronized void updateKey()
326 { 323 {
327 if( getChannel().isOpen() && _key.isValid()) 324 if( getChannel().isOpen() && _key.isValid())
328 { 325 {
329 boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended()); 326 boolean read_interest = _readBlocked || (!isDispatched && !_connection.isSuspended());
330 boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable); 327 boolean write_interest = _writeBlocked || (!isDispatched && !_writable);
328 // boolean write_interest = _writeBlocked || !isDispatched;
329 // boolean write_interest = true;
331 330
332 int interestOps = 331 int interestOps =
333 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0) 332 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
334 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0); 333 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
335 if( _key.interestOps() != interestOps ) { 334 if( _key.interestOps() != interestOps ) {
398 } 397 }
399 } 398 }
400 } 399 }
401 finally 400 finally
402 { 401 {
403 _state = STATE_UNDISPATCHED; 402 isDispatched = false;
404 updateKey(); 403 updateKey();
405 } 404 }
406 } 405 }
407 406
408 /* ------------------------------------------------------------ */ 407 /* ------------------------------------------------------------ */
443 } 442 }
444 else 443 else
445 { 444 {
446 keyString += "!"; 445 keyString += "!";
447 } 446 }
448 return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}", 447 return String.format("SCEP@%x{l(%s)<->r(%s),dispatched=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%s}-{%s}",
449 hashCode(), 448 hashCode(),
450 _socket.getRemoteSocketAddress(), 449 _socket.getRemoteSocketAddress(),
451 _socket.getLocalSocketAddress(), 450 _socket.getLocalSocketAddress(),
452 _state, 451 isDispatched,
453 isOpen(), 452 isOpen(),
454 isInputShutdown(), 453 isInputShutdown(),
455 isOutputShutdown(), 454 isOutputShutdown(),
456 _readBlocked, 455 _readBlocked,
457 _writeBlocked, 456 _writeBlocked,