Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 949:e9088af3787f
remove SelectSet._changes
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 11 Oct 2016 23:18:13 -0600 |
parents | f5aefdc4a81a |
children | a778413aefc0 |
comparison
equal
deleted
inserted
replaced
948:f5aefdc4a81a | 949:e9088af3787f |
---|---|
113 set.wakeup(); | 113 set.wakeup(); |
114 } | 114 } |
115 } | 115 } |
116 | 116 |
117 /* ------------------------------------------------------------ */ | 117 /* ------------------------------------------------------------ */ |
118 /** Register a {@link ServerSocketChannel} | |
119 * @param acceptChannel | |
120 */ | |
121 public void register(ServerSocketChannel acceptChannel) | |
122 { | |
123 int s=_set++; | |
124 if (s<0) | |
125 s=-s; | |
126 s=s%_selectSets; | |
127 SelectSet set=_selectSet[s]; | |
128 set.addChange(acceptChannel); | |
129 set.wakeup(); | |
130 } | |
131 | |
132 | |
133 /* ------------------------------------------------------------ */ | |
134 /** | 118 /** |
135 * @return the lowResourcesConnections | 119 * @return the lowResourcesConnections |
136 */ | 120 */ |
137 public long getLowResourcesConnections() | 121 public long getLowResourcesConnections() |
138 { | 122 { |
248 public class SelectSet implements Dumpable | 232 public class SelectSet implements Dumpable |
249 { | 233 { |
250 private final int _setID; | 234 private final int _setID; |
251 private volatile long _now = System.currentTimeMillis(); | 235 private volatile long _now = System.currentTimeMillis(); |
252 | 236 |
253 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); | |
254 | |
255 private volatile Selector _selector; | 237 private volatile Selector _selector; |
256 | 238 |
257 private volatile Thread _selecting; | 239 private volatile Thread _selecting; |
258 private int _busySelects; | 240 private int _busySelects; |
259 private long _monitorNext; | 241 private long _monitorNext; |
271 // create a selector; | 253 // create a selector; |
272 _selector = Selector.open(); | 254 _selector = Selector.open(); |
273 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | 255 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; |
274 } | 256 } |
275 | 257 |
276 public void addChange(Object change) | 258 private void addChange(SocketChannel channel) |
277 { | 259 { |
278 _changes.add(change); | 260 try { |
279 } | 261 SelectionKey key = channel.register(_selector,SelectionKey.OP_READ,null); |
280 | 262 SelectChannelEndPoint endpoint = createEndPoint(channel,key); |
263 key.attach(endpoint); | |
264 endpoint.schedule(); | |
265 } catch(IOException e) { | |
266 LOG.warn("",e); | |
267 try { | |
268 channel.close(); | |
269 } catch(IOException e2) { | |
270 LOG.warn("",e2); | |
271 } | |
272 } | |
273 } | |
281 /* ------------------------------------------------------------ */ | 274 /* ------------------------------------------------------------ */ |
282 /** | 275 /** |
283 * Select and dispatch tasks found from changes and the selector. | 276 * Select and dispatch tasks found from changes and the selector. |
284 * | 277 * |
285 * @throws IOException | 278 * @throws IOException |
292 final Selector selector=_selector; | 285 final Selector selector=_selector; |
293 // Stopped concurrently ? | 286 // Stopped concurrently ? |
294 if (selector == null) | 287 if (selector == null) |
295 return; | 288 return; |
296 | 289 |
297 // Make any key changes required | |
298 Object change; | |
299 int changes=_changes.size(); | |
300 while (changes-->0 && (change=_changes.poll())!=null) | |
301 { | |
302 Channel ch=null; | |
303 SelectionKey key=null; | |
304 | |
305 try | |
306 { | |
307 if (change instanceof EndPoint) | |
308 { | |
309 // Update the operations for a key. | |
310 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; | |
311 ch=endpoint.getChannel(); | |
312 endpoint.doUpdateKey(); | |
313 } | |
314 else if (change instanceof SocketChannel) | |
315 { | |
316 // Newly registered channel | |
317 final SocketChannel channel=(SocketChannel)change; | |
318 ch=channel; | |
319 key = channel.register(selector,SelectionKey.OP_READ,null); | |
320 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | |
321 key.attach(endpoint); | |
322 endpoint.schedule(); | |
323 } | |
324 else if (change instanceof Runnable) | |
325 { | |
326 execute((Runnable)change); | |
327 } | |
328 else | |
329 throw new IllegalArgumentException(change.toString()); | |
330 } | |
331 catch (CancelledKeyException e) | |
332 { | |
333 LOG.trace("",e); | |
334 } | |
335 catch (Throwable e) | |
336 { | |
337 if (isRunning()) | |
338 LOG.warn("",e); | |
339 else | |
340 LOG.debug("",e); | |
341 | |
342 try | |
343 { | |
344 if (ch!=null) | |
345 ch.close(); | |
346 } | |
347 catch(IOException e2) | |
348 { | |
349 LOG.debug("",e2); | |
350 } | |
351 } | |
352 } | |
353 | |
354 | |
355 // Do an instant select to see if any connections can be handled. | 290 // Do an instant select to see if any connections can be handled. |
356 int selected=selector.selectNow(); | 291 int selected=selector.selectNow(); |
357 | 292 |
358 _now = System.currentTimeMillis(); | 293 _now = System.currentTimeMillis(); |
359 | 294 |
373 } | 308 } |
374 _now = System.currentTimeMillis(); | 309 _now = System.currentTimeMillis(); |
375 } | 310 } |
376 | 311 |
377 // work out how long to wait in select | 312 // work out how long to wait in select |
378 long wait = _changes.size()==0?__IDLE_TICK:0L; | 313 long wait = __IDLE_TICK; |
379 | 314 |
380 // If we should wait with a select | 315 // If we should wait with a select |
381 if (wait>0) | 316 if (wait>0) |
382 { | 317 { |
383 long before = _now; | 318 long before = _now; |